diff options
Diffstat (limited to 'plugin/forward')
-rw-r--r-- | plugin/forward/connect.go | 152 | ||||
-rw-r--r-- | plugin/forward/dnstap.go | 7 | ||||
-rw-r--r-- | plugin/forward/forward.go | 49 | ||||
-rw-r--r-- | plugin/forward/forward_test.go | 20 | ||||
-rw-r--r-- | plugin/forward/health.go | 106 | ||||
-rw-r--r-- | plugin/forward/health_test.go | 64 | ||||
-rw-r--r-- | plugin/forward/metrics.go | 37 | ||||
-rw-r--r-- | plugin/forward/persistent.go | 162 | ||||
-rw-r--r-- | plugin/forward/persistent_test.go | 109 | ||||
-rw-r--r-- | plugin/forward/policy.go | 15 | ||||
-rw-r--r-- | plugin/forward/proxy.go | 82 | ||||
-rw-r--r-- | plugin/forward/proxy_test.go | 29 | ||||
-rw-r--r-- | plugin/forward/setup.go | 25 | ||||
-rw-r--r-- | plugin/forward/setup_test.go | 55 | ||||
-rw-r--r-- | plugin/forward/type.go | 37 |
15 files changed, 128 insertions, 821 deletions
diff --git a/plugin/forward/connect.go b/plugin/forward/connect.go deleted file mode 100644 index 3d53044e5..000000000 --- a/plugin/forward/connect.go +++ /dev/null @@ -1,152 +0,0 @@ -// Package forward implements a forwarding proxy. It caches an upstream net.Conn for some time, so if the same -// client returns the upstream's Conn will be precached. Depending on how you benchmark this looks to be -// 50% faster than just opening a new connection for every client. It works with UDP and TCP and uses -// inband healthchecking. -package forward - -import ( - "context" - "io" - "strconv" - "sync/atomic" - "time" - - "github.com/coredns/coredns/request" - - "github.com/miekg/dns" -) - -// limitTimeout is a utility function to auto-tune timeout values -// average observed time is moved towards the last observed delay moderated by a weight -// next timeout to use will be the double of the computed average, limited by min and max frame. -func limitTimeout(currentAvg *int64, minValue time.Duration, maxValue time.Duration) time.Duration { - rt := time.Duration(atomic.LoadInt64(currentAvg)) - if rt < minValue { - return minValue - } - if rt < maxValue/2 { - return 2 * rt - } - return maxValue -} - -func averageTimeout(currentAvg *int64, observedDuration time.Duration, weight int64) { - dt := time.Duration(atomic.LoadInt64(currentAvg)) - atomic.AddInt64(currentAvg, int64(observedDuration-dt)/weight) -} - -func (t *Transport) dialTimeout() time.Duration { - return limitTimeout(&t.avgDialTime, minDialTimeout, maxDialTimeout) -} - -func (t *Transport) updateDialTimeout(newDialTime time.Duration) { - averageTimeout(&t.avgDialTime, newDialTime, cumulativeAvgWeight) -} - -// Dial dials the address configured in transport, potentially reusing a connection or creating a new one. -func (t *Transport) Dial(proto string) (*persistConn, bool, error) { - // If tls has been configured; use it. - if t.tlsConfig != nil { - proto = "tcp-tls" - } - - t.dial <- proto - pc := <-t.ret - - if pc != nil { - ConnCacheHitsCount.WithLabelValues(t.addr, proto).Add(1) - return pc, true, nil - } - ConnCacheMissesCount.WithLabelValues(t.addr, proto).Add(1) - - reqTime := time.Now() - timeout := t.dialTimeout() - if proto == "tcp-tls" { - conn, err := dns.DialTimeoutWithTLS("tcp", t.addr, t.tlsConfig, timeout) - t.updateDialTimeout(time.Since(reqTime)) - return &persistConn{c: conn}, false, err - } - conn, err := dns.DialTimeout(proto, t.addr, timeout) - t.updateDialTimeout(time.Since(reqTime)) - return &persistConn{c: conn}, false, err -} - -// Connect selects an upstream, sends the request and waits for a response. -func (p *Proxy) Connect(ctx context.Context, state request.Request, opts options) (*dns.Msg, error) { - start := time.Now() - - proto := "" - switch { - case opts.forceTCP: // TCP flag has precedence over UDP flag - proto = "tcp" - case opts.preferUDP: - proto = "udp" - default: - proto = state.Proto() - } - - pc, cached, err := p.transport.Dial(proto) - if err != nil { - return nil, err - } - - // Set buffer size correctly for this client. - pc.c.UDPSize = uint16(state.Size()) - if pc.c.UDPSize < 512 { - pc.c.UDPSize = 512 - } - - pc.c.SetWriteDeadline(time.Now().Add(maxTimeout)) - // records the origin Id before upstream. - originId := state.Req.Id - state.Req.Id = dns.Id() - defer func() { - state.Req.Id = originId - }() - - if err := pc.c.WriteMsg(state.Req); err != nil { - pc.c.Close() // not giving it back - if err == io.EOF && cached { - return nil, ErrCachedClosed - } - return nil, err - } - - var ret *dns.Msg - pc.c.SetReadDeadline(time.Now().Add(readTimeout)) - for { - ret, err = pc.c.ReadMsg() - if err != nil { - pc.c.Close() // not giving it back - if err == io.EOF && cached { - return nil, ErrCachedClosed - } - // recovery the origin Id after upstream. - if ret != nil { - ret.Id = originId - } - return ret, err - } - // drop out-of-order responses - if state.Req.Id == ret.Id { - break - } - } - // recovery the origin Id after upstream. - ret.Id = originId - - p.transport.Yield(pc) - - rc, ok := dns.RcodeToString[ret.Rcode] - if !ok { - rc = strconv.Itoa(ret.Rcode) - } - - RequestCount.WithLabelValues(p.addr).Add(1) - RcodeCount.WithLabelValues(rc, p.addr).Add(1) - RequestDuration.WithLabelValues(p.addr, rc).Observe(time.Since(start).Seconds()) - - return ret, nil -} - -const cumulativeAvgWeight = 4 diff --git a/plugin/forward/dnstap.go b/plugin/forward/dnstap.go index edbee8715..e9962d268 100644 --- a/plugin/forward/dnstap.go +++ b/plugin/forward/dnstap.go @@ -6,6 +6,7 @@ import ( "time" "github.com/coredns/coredns/plugin/dnstap/msg" + "github.com/coredns/coredns/plugin/pkg/proxy" "github.com/coredns/coredns/request" tap "github.com/dnstap/golang-dnstap" @@ -13,7 +14,7 @@ import ( ) // toDnstap will send the forward and received message to the dnstap plugin. -func toDnstap(f *Forward, host string, state request.Request, opts options, reply *dns.Msg, start time.Time) { +func toDnstap(f *Forward, host string, state request.Request, opts proxy.Options, reply *dns.Msg, start time.Time) { h, p, _ := net.SplitHostPort(host) // this is preparsed and can't err here port, _ := strconv.ParseUint(p, 10, 32) // same here ip := net.ParseIP(h) @@ -21,9 +22,9 @@ func toDnstap(f *Forward, host string, state request.Request, opts options, repl var ta net.Addr = &net.UDPAddr{IP: ip, Port: int(port)} t := state.Proto() switch { - case opts.forceTCP: + case opts.ForceTCP: t = "tcp" - case opts.preferUDP: + case opts.PreferUDP: t = "udp" } diff --git a/plugin/forward/forward.go b/plugin/forward/forward.go index 223d7e398..927a6e21f 100644 --- a/plugin/forward/forward.go +++ b/plugin/forward/forward.go @@ -16,6 +16,7 @@ import ( "github.com/coredns/coredns/plugin/dnstap" "github.com/coredns/coredns/plugin/metadata" clog "github.com/coredns/coredns/plugin/pkg/log" + "github.com/coredns/coredns/plugin/pkg/proxy" "github.com/coredns/coredns/request" "github.com/miekg/dns" @@ -25,12 +26,17 @@ import ( var log = clog.NewWithPlugin("forward") +const ( + defaultExpire = 10 * time.Second + hcInterval = 500 * time.Millisecond +) + // Forward represents a plugin instance that can proxy requests to another (DNS) server. It has a list // of proxies each representing one upstream proxy. type Forward struct { concurrent int64 // atomic counters need to be first in struct for proper alignment - proxies []*Proxy + proxies []*proxy.Proxy p Policy hcInterval time.Duration @@ -43,7 +49,7 @@ type Forward struct { expire time.Duration maxConcurrent int64 - opts options // also here for testing + opts proxy.Options // also here for testing // ErrLimitExceeded indicates that a query was rejected because the number of concurrent queries has exceeded // the maximum allowed (maxConcurrent) @@ -56,14 +62,14 @@ type Forward struct { // New returns a new Forward. func New() *Forward { - f := &Forward{maxfails: 2, tlsConfig: new(tls.Config), expire: defaultExpire, p: new(random), from: ".", hcInterval: hcInterval, opts: options{forceTCP: false, preferUDP: false, hcRecursionDesired: true, hcDomain: "."}} + f := &Forward{maxfails: 2, tlsConfig: new(tls.Config), expire: defaultExpire, p: new(random), from: ".", hcInterval: hcInterval, opts: proxy.Options{ForceTCP: false, PreferUDP: false, HCRecursionDesired: true, HCDomain: "."}} return f } // SetProxy appends p to the proxy list and starts healthchecking. -func (f *Forward) SetProxy(p *Proxy) { +func (f *Forward) SetProxy(p *proxy.Proxy) { f.proxies = append(f.proxies, p) - p.start(f.hcInterval) + p.Start(f.hcInterval) } // SetTapPlugin appends one or more dnstap plugins to the tap plugin list. @@ -128,12 +134,12 @@ func (f *Forward) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg if span != nil { child = span.Tracer().StartSpan("connect", ot.ChildOf(span.Context())) - otext.PeerAddress.Set(child, proxy.addr) + otext.PeerAddress.Set(child, proxy.Addr()) ctx = ot.ContextWithSpan(ctx, child) } metadata.SetValueFunc(ctx, "forward/upstream", func() string { - return proxy.addr + return proxy.Addr() }) var ( @@ -141,14 +147,15 @@ func (f *Forward) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg err error ) opts := f.opts + for { ret, err = proxy.Connect(ctx, state, opts) if err == ErrCachedClosed { // Remote side closed conn, can only happen with TCP. continue } // Retry with TCP if truncated and prefer_udp configured. - if ret != nil && ret.Truncated && !opts.forceTCP && opts.preferUDP { - opts.forceTCP = true + if ret != nil && ret.Truncated && !opts.ForceTCP && opts.PreferUDP { + opts.ForceTCP = true continue } break @@ -159,7 +166,7 @@ func (f *Forward) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg } if len(f.tapPlugins) != 0 { - toDnstap(f, proxy.addr, state, opts, ret, start) + toDnstap(f, proxy.Addr(), state, opts, ret, start) } upstreamErr = err @@ -219,13 +226,13 @@ func (f *Forward) isAllowedDomain(name string) bool { } // ForceTCP returns if TCP is forced to be used even when the request comes in over UDP. -func (f *Forward) ForceTCP() bool { return f.opts.forceTCP } +func (f *Forward) ForceTCP() bool { return f.opts.ForceTCP } // PreferUDP returns if UDP is preferred to be used even when the request comes in over TCP. -func (f *Forward) PreferUDP() bool { return f.opts.preferUDP } +func (f *Forward) PreferUDP() bool { return f.opts.PreferUDP } // List returns a set of proxies to be used for this client depending on the policy in f. -func (f *Forward) List() []*Proxy { return f.p.List(f.proxies) } +func (f *Forward) List() []*proxy.Proxy { return f.p.List(f.proxies) } var ( // ErrNoHealthy means no healthy proxies left. @@ -236,12 +243,16 @@ var ( ErrCachedClosed = errors.New("cached connection was closed by peer") ) -// options holds various options that can be set. -type options struct { - forceTCP bool - preferUDP bool - hcRecursionDesired bool - hcDomain string +// Options holds various Options that can be set. +type Options struct { + // ForceTCP use TCP protocol for upstream DNS request. Has precedence over PreferUDP flag + ForceTCP bool + // PreferUDP use UDP protocol for upstream DNS request. + PreferUDP bool + // HCRecursionDesired sets recursion desired flag for Proxy healthcheck requests + HCRecursionDesired bool + // HCDomain sets domain for Proxy healthcheck requests + HCDomain string } var defaultTimeout = 5 * time.Second diff --git a/plugin/forward/forward_test.go b/plugin/forward/forward_test.go index b50f4ff22..9ea859826 100644 --- a/plugin/forward/forward_test.go +++ b/plugin/forward/forward_test.go @@ -8,23 +8,33 @@ import ( "github.com/coredns/caddy/caddyfile" "github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/plugin/dnstap" + "github.com/coredns/coredns/plugin/pkg/proxy" + "github.com/coredns/coredns/plugin/pkg/transport" ) func TestList(t *testing.T) { f := Forward{ - proxies: []*Proxy{{addr: "1.1.1.1:53"}, {addr: "2.2.2.2:53"}, {addr: "3.3.3.3:53"}}, - p: &roundRobin{}, + proxies: []*proxy.Proxy{ + proxy.NewProxy("1.1.1.1:53", transport.DNS), + proxy.NewProxy("2.2.2.2:53", transport.DNS), + proxy.NewProxy("3.3.3.3:53", transport.DNS), + }, + p: &roundRobin{}, } - expect := []*Proxy{{addr: "2.2.2.2:53"}, {addr: "1.1.1.1:53"}, {addr: "3.3.3.3:53"}} + expect := []*proxy.Proxy{ + proxy.NewProxy("2.2.2.2:53", transport.DNS), + proxy.NewProxy("1.1.1.1:53", transport.DNS), + proxy.NewProxy("3.3.3.3:53", transport.DNS), + } got := f.List() if len(got) != len(expect) { t.Fatalf("Expected: %v results, got: %v", len(expect), len(got)) } for i, p := range got { - if p.addr != expect[i].addr { - t.Fatalf("Expected proxy %v to be '%v', got: '%v'", i, expect[i].addr, p.addr) + if p.Addr() != expect[i].Addr() { + t.Fatalf("Expected proxy %v to be '%v', got: '%v'", i, expect[i].Addr(), p.Addr()) } } } diff --git a/plugin/forward/health.go b/plugin/forward/health.go deleted file mode 100644 index ec0b48143..000000000 --- a/plugin/forward/health.go +++ /dev/null @@ -1,106 +0,0 @@ -package forward - -import ( - "crypto/tls" - "sync/atomic" - "time" - - "github.com/coredns/coredns/plugin/pkg/transport" - - "github.com/miekg/dns" -) - -// HealthChecker checks the upstream health. -type HealthChecker interface { - Check(*Proxy) error - SetTLSConfig(*tls.Config) - SetRecursionDesired(bool) - GetRecursionDesired() bool - SetDomain(domain string) - GetDomain() string - SetTCPTransport() -} - -// dnsHc is a health checker for a DNS endpoint (DNS, and DoT). -type dnsHc struct { - c *dns.Client - recursionDesired bool - domain string -} - -var ( - hcReadTimeout = 1 * time.Second - hcWriteTimeout = 1 * time.Second -) - -// NewHealthChecker returns a new HealthChecker based on transport. -func NewHealthChecker(trans string, recursionDesired bool, domain string) HealthChecker { - switch trans { - case transport.DNS, transport.TLS: - c := new(dns.Client) - c.Net = "udp" - c.ReadTimeout = hcReadTimeout - c.WriteTimeout = hcWriteTimeout - - return &dnsHc{c: c, recursionDesired: recursionDesired, domain: domain} - } - - log.Warningf("No healthchecker for transport %q", trans) - return nil -} - -func (h *dnsHc) SetTLSConfig(cfg *tls.Config) { - h.c.Net = "tcp-tls" - h.c.TLSConfig = cfg -} - -func (h *dnsHc) SetRecursionDesired(recursionDesired bool) { - h.recursionDesired = recursionDesired -} -func (h *dnsHc) GetRecursionDesired() bool { - return h.recursionDesired -} - -func (h *dnsHc) SetDomain(domain string) { - h.domain = domain -} -func (h *dnsHc) GetDomain() string { - return h.domain -} - -func (h *dnsHc) SetTCPTransport() { - h.c.Net = "tcp" -} - -// For HC we send to . IN NS +[no]rec message to the upstream. Dial timeouts and empty -// replies are considered fails, basically anything else constitutes a healthy upstream. - -// Check is used as the up.Func in the up.Probe. -func (h *dnsHc) Check(p *Proxy) error { - err := h.send(p.addr) - if err != nil { - HealthcheckFailureCount.WithLabelValues(p.addr).Add(1) - atomic.AddUint32(&p.fails, 1) - return err - } - - atomic.StoreUint32(&p.fails, 0) - return nil -} - -func (h *dnsHc) send(addr string) error { - ping := new(dns.Msg) - ping.SetQuestion(h.domain, dns.TypeNS) - ping.MsgHdr.RecursionDesired = h.recursionDesired - - m, _, err := h.c.Exchange(ping, addr) - // If we got a header, we're alright, basically only care about I/O errors 'n stuff. - if err != nil && m != nil { - // Silly check, something sane came back. - if m.Response || m.Opcode == dns.OpcodeQuery { - err = nil - } - } - - return err -} diff --git a/plugin/forward/health_test.go b/plugin/forward/health_test.go index 9917b3a37..7cb928d22 100644 --- a/plugin/forward/health_test.go +++ b/plugin/forward/health_test.go @@ -7,6 +7,7 @@ import ( "time" "github.com/coredns/coredns/plugin/pkg/dnstest" + "github.com/coredns/coredns/plugin/pkg/proxy" "github.com/coredns/coredns/plugin/pkg/transport" "github.com/coredns/coredns/plugin/test" @@ -14,9 +15,6 @@ import ( ) func TestHealth(t *testing.T) { - hcReadTimeout = 10 * time.Millisecond - hcWriteTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond defaultTimeout = 10 * time.Millisecond i := uint32(0) @@ -35,7 +33,9 @@ func TestHealth(t *testing.T) { }) defer s.Close() - p := NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy(s.Addr, transport.DNS) + p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) f := New() f.SetProxy(p) defer f.OnShutdown() @@ -53,9 +53,6 @@ func TestHealth(t *testing.T) { } func TestHealthTCP(t *testing.T) { - hcReadTimeout = 10 * time.Millisecond - hcWriteTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond defaultTimeout = 10 * time.Millisecond i := uint32(0) @@ -74,8 +71,10 @@ func TestHealthTCP(t *testing.T) { }) defer s.Close() - p := NewProxy(s.Addr, transport.DNS) - p.health.SetTCPTransport() + p := proxy.NewProxy(s.Addr, transport.DNS) + p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetTCPTransport() f := New() f.SetProxy(p) defer f.OnShutdown() @@ -93,10 +92,7 @@ func TestHealthTCP(t *testing.T) { } func TestHealthNoRecursion(t *testing.T) { - hcReadTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond defaultTimeout = 10 * time.Millisecond - hcWriteTimeout = 10 * time.Millisecond i := uint32(0) q := uint32(0) @@ -114,8 +110,10 @@ func TestHealthNoRecursion(t *testing.T) { }) defer s.Close() - p := NewProxy(s.Addr, transport.DNS) - p.health.SetRecursionDesired(false) + p := proxy.NewProxy(s.Addr, transport.DNS) + p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetRecursionDesired(false) f := New() f.SetProxy(p) defer f.OnShutdown() @@ -133,9 +131,6 @@ func TestHealthNoRecursion(t *testing.T) { } func TestHealthTimeout(t *testing.T) { - hcReadTimeout = 10 * time.Millisecond - hcWriteTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond defaultTimeout = 10 * time.Millisecond i := uint32(0) @@ -159,7 +154,9 @@ func TestHealthTimeout(t *testing.T) { }) defer s.Close() - p := NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy(s.Addr, transport.DNS) + p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) f := New() f.SetProxy(p) defer f.OnShutdown() @@ -177,19 +174,20 @@ func TestHealthTimeout(t *testing.T) { } func TestHealthMaxFails(t *testing.T) { - hcReadTimeout = 10 * time.Millisecond - hcWriteTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond defaultTimeout = 10 * time.Millisecond - hcInterval = 10 * time.Millisecond + //,hcInterval = 10 * time.Millisecond s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) { // timeout }) defer s.Close() - p := NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy(s.Addr, transport.DNS) + p.SetReadTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) f := New() + f.hcInterval = 10 * time.Millisecond f.maxfails = 2 f.SetProxy(p) defer f.OnShutdown() @@ -200,18 +198,14 @@ func TestHealthMaxFails(t *testing.T) { f.ServeDNS(context.TODO(), &test.ResponseWriter{}, req) time.Sleep(100 * time.Millisecond) - fails := atomic.LoadUint32(&p.fails) + fails := p.Fails() if !p.Down(f.maxfails) { t.Errorf("Expected Proxy fails to be greater than %d, got %d", f.maxfails, fails) } } func TestHealthNoMaxFails(t *testing.T) { - hcReadTimeout = 10 * time.Millisecond - hcWriteTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond defaultTimeout = 10 * time.Millisecond - hcInterval = 10 * time.Millisecond i := uint32(0) s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) { @@ -225,7 +219,9 @@ func TestHealthNoMaxFails(t *testing.T) { }) defer s.Close() - p := NewProxy(s.Addr, transport.DNS) + p := proxy.NewProxy(s.Addr, transport.DNS) + p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) f := New() f.maxfails = 0 f.SetProxy(p) @@ -244,10 +240,8 @@ func TestHealthNoMaxFails(t *testing.T) { } func TestHealthDomain(t *testing.T) { - hcReadTimeout = 10 * time.Millisecond - readTimeout = 10 * time.Millisecond defaultTimeout = 10 * time.Millisecond - hcWriteTimeout = 10 * time.Millisecond + hcDomain := "example.org." i := uint32(0) q := uint32(0) @@ -264,8 +258,10 @@ func TestHealthDomain(t *testing.T) { w.WriteMsg(ret) }) defer s.Close() - p := NewProxy(s.Addr, transport.DNS) - p.health.SetDomain(hcDomain) + p := proxy.NewProxy(s.Addr, transport.DNS) + p.GetHealthchecker().SetReadTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetWriteTimeout(10 * time.Millisecond) + p.GetHealthchecker().SetDomain(hcDomain) f := New() f.SetProxy(p) defer f.OnShutdown() diff --git a/plugin/forward/metrics.go b/plugin/forward/metrics.go index f1f0c48d6..da0905525 100644 --- a/plugin/forward/metrics.go +++ b/plugin/forward/metrics.go @@ -9,31 +9,6 @@ import ( // Variables declared for monitoring. var ( - RequestCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: plugin.Namespace, - Subsystem: "forward", - Name: "requests_total", - Help: "Counter of requests made per upstream.", - }, []string{"to"}) - RcodeCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: plugin.Namespace, - Subsystem: "forward", - Name: "responses_total", - Help: "Counter of responses received per upstream.", - }, []string{"rcode", "to"}) - RequestDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: plugin.Namespace, - Subsystem: "forward", - Name: "request_duration_seconds", - Buckets: plugin.TimeBuckets, - Help: "Histogram of the time each request took.", - }, []string{"to", "rcode"}) - HealthcheckFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: plugin.Namespace, - Subsystem: "forward", - Name: "healthcheck_failures_total", - Help: "Counter of the number of failed healthchecks.", - }, []string{"to"}) HealthcheckBrokenCount = promauto.NewCounter(prometheus.CounterOpts{ Namespace: plugin.Namespace, Subsystem: "forward", @@ -46,16 +21,4 @@ var ( Name: "max_concurrent_rejects_total", Help: "Counter of the number of queries rejected because the concurrent queries were at maximum.", }) - ConnCacheHitsCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: plugin.Namespace, - Subsystem: "forward", - Name: "conn_cache_hits_total", - Help: "Counter of connection cache hits per upstream and protocol.", - }, []string{"to", "proto"}) - ConnCacheMissesCount = promauto.NewCounterVec(prometheus.CounterOpts{ - Namespace: plugin.Namespace, - Subsystem: "forward", - Name: "conn_cache_misses_total", - Help: "Counter of connection cache misses per upstream and protocol.", - }, []string{"to", "proto"}) ) diff --git a/plugin/forward/persistent.go b/plugin/forward/persistent.go deleted file mode 100644 index c53dea82f..000000000 --- a/plugin/forward/persistent.go +++ /dev/null @@ -1,162 +0,0 @@ -package forward - -import ( - "crypto/tls" - "sort" - "time" - - "github.com/miekg/dns" -) - -// a persistConn hold the dns.Conn and the last used time. -type persistConn struct { - c *dns.Conn - used time.Time -} - -// Transport hold the persistent cache. -type Transport struct { - avgDialTime int64 // kind of average time of dial time - conns [typeTotalCount][]*persistConn // Buckets for udp, tcp and tcp-tls. - expire time.Duration // After this duration a connection is expired. - addr string - tlsConfig *tls.Config - - dial chan string - yield chan *persistConn - ret chan *persistConn - stop chan bool -} - -func newTransport(addr string) *Transport { - t := &Transport{ - avgDialTime: int64(maxDialTimeout / 2), - conns: [typeTotalCount][]*persistConn{}, - expire: defaultExpire, - addr: addr, - dial: make(chan string), - yield: make(chan *persistConn), - ret: make(chan *persistConn), - stop: make(chan bool), - } - return t -} - -// connManagers manages the persistent connection cache for UDP and TCP. -func (t *Transport) connManager() { - ticker := time.NewTicker(defaultExpire) - defer ticker.Stop() -Wait: - for { - select { - case proto := <-t.dial: - transtype := stringToTransportType(proto) - // take the last used conn - complexity O(1) - if stack := t.conns[transtype]; len(stack) > 0 { - pc := stack[len(stack)-1] - if time.Since(pc.used) < t.expire { - // Found one, remove from pool and return this conn. - t.conns[transtype] = stack[:len(stack)-1] - t.ret <- pc - continue Wait - } - // clear entire cache if the last conn is expired - t.conns[transtype] = nil - // now, the connections being passed to closeConns() are not reachable from - // transport methods anymore. So, it's safe to close them in a separate goroutine - go closeConns(stack) - } - t.ret <- nil - - case pc := <-t.yield: - transtype := t.transportTypeFromConn(pc) - t.conns[transtype] = append(t.conns[transtype], pc) - - case <-ticker.C: - t.cleanup(false) - - case <-t.stop: - t.cleanup(true) - close(t.ret) - return - } - } -} - -// closeConns closes connections. -func closeConns(conns []*persistConn) { - for _, pc := range conns { - pc.c.Close() - } -} - -// cleanup removes connections from cache. -func (t *Transport) cleanup(all bool) { - staleTime := time.Now().Add(-t.expire) - for transtype, stack := range t.conns { - if len(stack) == 0 { - continue - } - if all { - t.conns[transtype] = nil - // now, the connections being passed to closeConns() are not reachable from - // transport methods anymore. So, it's safe to close them in a separate goroutine - go closeConns(stack) - continue - } - if stack[0].used.After(staleTime) { - continue - } - - // connections in stack are sorted by "used" - good := sort.Search(len(stack), func(i int) bool { - return stack[i].used.After(staleTime) - }) - t.conns[transtype] = stack[good:] - // now, the connections being passed to closeConns() are not reachable from - // transport methods anymore. So, it's safe to close them in a separate goroutine - go closeConns(stack[:good]) - } -} - -// It is hard to pin a value to this, the import thing is to no block forever, losing at cached connection is not terrible. -const yieldTimeout = 25 * time.Millisecond - -// Yield returns the connection to transport for reuse. -func (t *Transport) Yield(pc *persistConn) { - pc.used = time.Now() // update used time - - // Make this non-blocking, because in the case of a very busy forwarder we will *block* on this yield. This - // blocks the outer go-routine and stuff will just pile up. We timeout when the send fails to as returning - // these connection is an optimization anyway. - select { - case t.yield <- pc: - return - case <-time.After(yieldTimeout): - return - } -} - -// Start starts the transport's connection manager. -func (t *Transport) Start() { go t.connManager() } - -// Stop stops the transport's connection manager. -func (t *Transport) Stop() { close(t.stop) } - -// SetExpire sets the connection expire time in transport. -func (t *Transport) SetExpire(expire time.Duration) { t.expire = expire } - -// SetTLSConfig sets the TLS config in transport. -func (t *Transport) SetTLSConfig(cfg *tls.Config) { t.tlsConfig = cfg } - -const ( - defaultExpire = 10 * time.Second - minDialTimeout = 1 * time.Second - maxDialTimeout = 30 * time.Second -) - -// Make a var for minimizing this value in tests. -var ( - // Some resolves might take quite a while, usually (cached) responses are fast. Set to 2s to give us some time to retry a different upstream. - readTimeout = 2 * time.Second -) diff --git a/plugin/forward/persistent_test.go b/plugin/forward/persistent_test.go deleted file mode 100644 index 633696ac0..000000000 --- a/plugin/forward/persistent_test.go +++ /dev/null @@ -1,109 +0,0 @@ -package forward - -import ( - "testing" - "time" - - "github.com/coredns/coredns/plugin/pkg/dnstest" - - "github.com/miekg/dns" -) - -func TestCached(t *testing.T) { - s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) { - ret := new(dns.Msg) - ret.SetReply(r) - w.WriteMsg(ret) - }) - defer s.Close() - - tr := newTransport(s.Addr) - tr.Start() - defer tr.Stop() - - c1, cache1, _ := tr.Dial("udp") - c2, cache2, _ := tr.Dial("udp") - - if cache1 || cache2 { - t.Errorf("Expected non-cached connection") - } - - tr.Yield(c1) - tr.Yield(c2) - c3, cached3, _ := tr.Dial("udp") - if !cached3 { - t.Error("Expected cached connection (c3)") - } - if c2 != c3 { - t.Error("Expected c2 == c3") - } - - tr.Yield(c3) - - // dial another protocol - c4, cached4, _ := tr.Dial("tcp") - if cached4 { - t.Errorf("Expected non-cached connection (c4)") - } - tr.Yield(c4) -} - -func TestCleanupByTimer(t *testing.T) { - s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) { - ret := new(dns.Msg) - ret.SetReply(r) - w.WriteMsg(ret) - }) - defer s.Close() - - tr := newTransport(s.Addr) - tr.SetExpire(100 * time.Millisecond) - tr.Start() - defer tr.Stop() - - c1, _, _ := tr.Dial("udp") - c2, _, _ := tr.Dial("udp") - tr.Yield(c1) - time.Sleep(10 * time.Millisecond) - tr.Yield(c2) - - time.Sleep(120 * time.Millisecond) - c3, cached, _ := tr.Dial("udp") - if cached { - t.Error("Expected non-cached connection (c3)") - } - tr.Yield(c3) - - time.Sleep(120 * time.Millisecond) - c4, cached, _ := tr.Dial("udp") - if cached { - t.Error("Expected non-cached connection (c4)") - } - tr.Yield(c4) -} - -func TestCleanupAll(t *testing.T) { - s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) { - ret := new(dns.Msg) - ret.SetReply(r) - w.WriteMsg(ret) - }) - defer s.Close() - - tr := newTransport(s.Addr) - - c1, _ := dns.DialTimeout("udp", tr.addr, maxDialTimeout) - c2, _ := dns.DialTimeout("udp", tr.addr, maxDialTimeout) - c3, _ := dns.DialTimeout("udp", tr.addr, maxDialTimeout) - - tr.conns[typeUDP] = []*persistConn{{c1, time.Now()}, {c2, time.Now()}, {c3, time.Now()}} - - if len(tr.conns[typeUDP]) != 3 { - t.Error("Expected 3 connections") - } - tr.cleanup(true) - - if len(tr.conns[typeUDP]) > 0 { - t.Error("Expected no cached connections") - } -} diff --git a/plugin/forward/policy.go b/plugin/forward/policy.go index e81e4ab91..7bd1f316a 100644 --- a/plugin/forward/policy.go +++ b/plugin/forward/policy.go @@ -4,12 +4,13 @@ import ( "sync/atomic" "time" + "github.com/coredns/coredns/plugin/pkg/proxy" "github.com/coredns/coredns/plugin/pkg/rand" ) // Policy defines a policy we use for selecting upstreams. type Policy interface { - List([]*Proxy) []*Proxy + List([]*proxy.Proxy) []*proxy.Proxy String() string } @@ -18,19 +19,19 @@ type random struct{} func (r *random) String() string { return "random" } -func (r *random) List(p []*Proxy) []*Proxy { +func (r *random) List(p []*proxy.Proxy) []*proxy.Proxy { switch len(p) { case 1: return p case 2: if rn.Int()%2 == 0 { - return []*Proxy{p[1], p[0]} // swap + return []*proxy.Proxy{p[1], p[0]} // swap } return p } perms := rn.Perm(len(p)) - rnd := make([]*Proxy, len(p)) + rnd := make([]*proxy.Proxy, len(p)) for i, p1 := range perms { rnd[i] = p[p1] @@ -45,11 +46,11 @@ type roundRobin struct { func (r *roundRobin) String() string { return "round_robin" } -func (r *roundRobin) List(p []*Proxy) []*Proxy { +func (r *roundRobin) List(p []*proxy.Proxy) []*proxy.Proxy { poolLen := uint32(len(p)) i := atomic.AddUint32(&r.robin, 1) % poolLen - robin := []*Proxy{p[i]} + robin := []*proxy.Proxy{p[i]} robin = append(robin, p[:i]...) robin = append(robin, p[i+1:]...) @@ -61,7 +62,7 @@ type sequential struct{} func (r *sequential) String() string { return "sequential" } -func (r *sequential) List(p []*Proxy) []*Proxy { +func (r *sequential) List(p []*proxy.Proxy) []*proxy.Proxy { return p } diff --git a/plugin/forward/proxy.go b/plugin/forward/proxy.go deleted file mode 100644 index 6a4b5693e..000000000 --- a/plugin/forward/proxy.go +++ /dev/null @@ -1,82 +0,0 @@ -package forward - -import ( - "crypto/tls" - "runtime" - "sync/atomic" - "time" - - "github.com/coredns/coredns/plugin/pkg/up" -) - -// Proxy defines an upstream host. -type Proxy struct { - fails uint32 - addr string - - transport *Transport - - // health checking - probe *up.Probe - health HealthChecker -} - -// NewProxy returns a new proxy. -func NewProxy(addr, trans string) *Proxy { - p := &Proxy{ - addr: addr, - fails: 0, - probe: up.New(), - transport: newTransport(addr), - } - p.health = NewHealthChecker(trans, true, ".") - runtime.SetFinalizer(p, (*Proxy).finalizer) - return p -} - -// SetTLSConfig sets the TLS config in the lower p.transport and in the healthchecking client. -func (p *Proxy) SetTLSConfig(cfg *tls.Config) { - p.transport.SetTLSConfig(cfg) - p.health.SetTLSConfig(cfg) -} - -// SetExpire sets the expire duration in the lower p.transport. -func (p *Proxy) SetExpire(expire time.Duration) { p.transport.SetExpire(expire) } - -// Healthcheck kicks of a round of health checks for this proxy. -func (p *Proxy) Healthcheck() { - if p.health == nil { - log.Warning("No healthchecker") - return - } - - p.probe.Do(func() error { - return p.health.Check(p) - }) -} - -// Down returns true if this proxy is down, i.e. has *more* fails than maxfails. -func (p *Proxy) Down(maxfails uint32) bool { - if maxfails == 0 { - return false - } - - fails := atomic.LoadUint32(&p.fails) - return fails > maxfails -} - -// close stops the health checking goroutine. -func (p *Proxy) stop() { p.probe.Stop() } -func (p *Proxy) finalizer() { p.transport.Stop() } - -// start starts the proxy's healthchecking. -func (p *Proxy) start(duration time.Duration) { - p.probe.Start(duration) - p.transport.Start() -} - -const ( - maxTimeout = 2 * time.Second -) - -var hcInterval = 500 * time.Millisecond diff --git a/plugin/forward/proxy_test.go b/plugin/forward/proxy_test.go index 74a0b5c4b..daf5f964c 100644 --- a/plugin/forward/proxy_test.go +++ b/plugin/forward/proxy_test.go @@ -6,9 +6,7 @@ import ( "github.com/coredns/caddy" "github.com/coredns/coredns/plugin/pkg/dnstest" - "github.com/coredns/coredns/plugin/pkg/transport" "github.com/coredns/coredns/plugin/test" - "github.com/coredns/coredns/request" "github.com/miekg/dns" ) @@ -70,30 +68,3 @@ func TestProxyTLSFail(t *testing.T) { t.Fatal("Expected *not* to receive reply, but got one") } } - -func TestProtocolSelection(t *testing.T) { - p := NewProxy("bad_address", transport.DNS) - - stateUDP := request.Request{W: &test.ResponseWriter{}, Req: new(dns.Msg)} - stateTCP := request.Request{W: &test.ResponseWriter{TCP: true}, Req: new(dns.Msg)} - ctx := context.TODO() - - go func() { - p.Connect(ctx, stateUDP, options{}) - p.Connect(ctx, stateUDP, options{forceTCP: true}) - p.Connect(ctx, stateUDP, options{preferUDP: true}) - p.Connect(ctx, stateUDP, options{preferUDP: true, forceTCP: true}) - p.Connect(ctx, stateTCP, options{}) - p.Connect(ctx, stateTCP, options{forceTCP: true}) - p.Connect(ctx, stateTCP, options{preferUDP: true}) - p.Connect(ctx, stateTCP, options{preferUDP: true, forceTCP: true}) - }() - - for i, exp := range []string{"udp", "tcp", "udp", "tcp", "tcp", "tcp", "udp", "tcp"} { - proto := <-p.transport.dial - p.transport.ret <- nil - if proto != exp { - t.Errorf("Unexpected protocol in case %d, expected %q, actual %q", i, exp, proto) - } - } -} diff --git a/plugin/forward/setup.go b/plugin/forward/setup.go index 7ca24df4d..6de0c870f 100644 --- a/plugin/forward/setup.go +++ b/plugin/forward/setup.go @@ -12,6 +12,7 @@ import ( "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/dnstap" "github.com/coredns/coredns/plugin/pkg/parse" + "github.com/coredns/coredns/plugin/pkg/proxy" pkgtls "github.com/coredns/coredns/plugin/pkg/tls" "github.com/coredns/coredns/plugin/pkg/transport" @@ -67,7 +68,7 @@ func setup(c *caddy.Controller) error { // OnStartup starts a goroutines for all proxies. func (f *Forward) OnStartup() (err error) { for _, p := range f.proxies { - p.start(f.hcInterval) + p.Start(f.hcInterval) } return nil } @@ -75,7 +76,7 @@ func (f *Forward) OnStartup() (err error) { // OnShutdown stops all configured proxies. func (f *Forward) OnShutdown() error { for _, p := range f.proxies { - p.stop() + p.Stop() } return nil } @@ -127,7 +128,7 @@ func parseStanza(c *caddy.Controller) (*Forward, error) { if !allowedTrans[trans] { return f, fmt.Errorf("'%s' is not supported as a destination protocol in forward: %s", trans, host) } - p := NewProxy(h, trans) + p := proxy.NewProxy(h, trans) f.proxies = append(f.proxies, p) transports[i] = trans } @@ -152,12 +153,12 @@ func parseStanza(c *caddy.Controller) (*Forward, error) { f.proxies[i].SetTLSConfig(f.tlsConfig) } f.proxies[i].SetExpire(f.expire) - f.proxies[i].health.SetRecursionDesired(f.opts.hcRecursionDesired) + f.proxies[i].GetHealthchecker().SetRecursionDesired(f.opts.HCRecursionDesired) // when TLS is used, checks are set to tcp-tls - if f.opts.forceTCP && transports[i] != transport.TLS { - f.proxies[i].health.SetTCPTransport() + if f.opts.ForceTCP && transports[i] != transport.TLS { + f.proxies[i].GetHealthchecker().SetTCPTransport() } - f.proxies[i].health.SetDomain(f.opts.hcDomain) + f.proxies[i].GetHealthchecker().SetDomain(f.opts.HCDomain) } return f, nil @@ -194,12 +195,12 @@ func parseBlock(c *caddy.Controller, f *Forward) error { return fmt.Errorf("health_check can't be negative: %d", dur) } f.hcInterval = dur - f.opts.hcDomain = "." + f.opts.HCDomain = "." for c.NextArg() { switch hcOpts := c.Val(); hcOpts { case "no_rec": - f.opts.hcRecursionDesired = false + f.opts.HCRecursionDesired = false case "domain": if !c.NextArg() { return c.ArgErr() @@ -208,7 +209,7 @@ func parseBlock(c *caddy.Controller, f *Forward) error { if _, ok := dns.IsDomainName(hcDomain); !ok { return fmt.Errorf("health_check: invalid domain name %s", hcDomain) } - f.opts.hcDomain = plugin.Name(hcDomain).Normalize() + f.opts.HCDomain = plugin.Name(hcDomain).Normalize() default: return fmt.Errorf("health_check: unknown option %s", hcOpts) } @@ -218,12 +219,12 @@ func parseBlock(c *caddy.Controller, f *Forward) error { if c.NextArg() { return c.ArgErr() } - f.opts.forceTCP = true + f.opts.ForceTCP = true case "prefer_udp": if c.NextArg() { return c.ArgErr() } - f.opts.preferUDP = true + f.opts.PreferUDP = true case "tls": args := c.RemainingArgs() if len(args) > 3 { diff --git a/plugin/forward/setup_test.go b/plugin/forward/setup_test.go index 4b1743098..cf046b486 100644 --- a/plugin/forward/setup_test.go +++ b/plugin/forward/setup_test.go @@ -8,6 +8,7 @@ import ( "github.com/coredns/caddy" "github.com/coredns/coredns/core/dnsserver" + "github.com/coredns/coredns/plugin/pkg/proxy" "github.com/miekg/dns" ) @@ -19,31 +20,31 @@ func TestSetup(t *testing.T) { expectedFrom string expectedIgnored []string expectedFails uint32 - expectedOpts options + expectedOpts proxy.Options expectedErr string }{ // positive - {"forward . 127.0.0.1", false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . 127.0.0.1 {\nhealth_check 0.5s domain example.org\n}\n", false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "example.org."}, ""}, - {"forward . 127.0.0.1 {\nexcept miek.nl\n}\n", false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . 127.0.0.1 {\nmax_fails 3\n}\n", false, ".", nil, 3, options{hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . 127.0.0.1 {\nforce_tcp\n}\n", false, ".", nil, 2, options{forceTCP: true, hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . 127.0.0.1 {\nprefer_udp\n}\n", false, ".", nil, 2, options{preferUDP: true, hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . 127.0.0.1 {\nforce_tcp\nprefer_udp\n}\n", false, ".", nil, 2, options{preferUDP: true, forceTCP: true, hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . 127.0.0.1:53", false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . 127.0.0.1:8080", false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . [::1]:53", false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . [2003::1]:53", false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward . 127.0.0.1 \n", false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, ""}, - {"forward 10.9.3.0/18 127.0.0.1", false, "0.9.10.in-addr.arpa.", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, ""}, + {"forward . 127.0.0.1", false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . 127.0.0.1 {\nhealth_check 0.5s domain example.org\n}\n", false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "example.org."}, ""}, + {"forward . 127.0.0.1 {\nexcept miek.nl\n}\n", false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . 127.0.0.1 {\nmax_fails 3\n}\n", false, ".", nil, 3, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . 127.0.0.1 {\nforce_tcp\n}\n", false, ".", nil, 2, proxy.Options{ForceTCP: true, HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . 127.0.0.1 {\nprefer_udp\n}\n", false, ".", nil, 2, proxy.Options{PreferUDP: true, HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . 127.0.0.1 {\nforce_tcp\nprefer_udp\n}\n", false, ".", nil, 2, proxy.Options{PreferUDP: true, ForceTCP: true, HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . 127.0.0.1:53", false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . 127.0.0.1:8080", false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . [::1]:53", false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . [2003::1]:53", false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward . 127.0.0.1 \n", false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, + {"forward 10.9.3.0/18 127.0.0.1", false, "0.9.10.in-addr.arpa.", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, ""}, {`forward . ::1 - forward com ::2`, false, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, "plugin"}, + forward com ::2`, false, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, "plugin"}, // negative - {"forward . a27.0.0.1", true, "", nil, 0, options{hcRecursionDesired: true, hcDomain: "."}, "not an IP"}, - {"forward . 127.0.0.1 {\nblaatl\n}\n", true, "", nil, 0, options{hcRecursionDesired: true, hcDomain: "."}, "unknown property"}, - {"forward . 127.0.0.1 {\nhealth_check 0.5s domain\n}\n", true, "", nil, 0, options{hcRecursionDesired: true, hcDomain: "."}, "Wrong argument count or unexpected line ending after 'domain'"}, - {"forward . https://127.0.0.1 \n", true, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, "'https' is not supported as a destination protocol in forward: https://127.0.0.1"}, - {"forward xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 127.0.0.1 \n", true, ".", nil, 2, options{hcRecursionDesired: true, hcDomain: "."}, "unable to normalize 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'"}, + {"forward . a27.0.0.1", true, "", nil, 0, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, "not an IP"}, + {"forward . 127.0.0.1 {\nblaatl\n}\n", true, "", nil, 0, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, "unknown property"}, + {"forward . 127.0.0.1 {\nhealth_check 0.5s domain\n}\n", true, "", nil, 0, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, "Wrong argument count or unexpected line ending after 'domain'"}, + {"forward . https://127.0.0.1 \n", true, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, "'https' is not supported as a destination protocol in forward: https://127.0.0.1"}, + {"forward xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 127.0.0.1 \n", true, ".", nil, 2, proxy.Options{HCRecursionDesired: true, HCDomain: "."}, "unable to normalize 'xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx'"}, } for i, test := range tests { @@ -127,8 +128,8 @@ func TestSetupTLS(t *testing.T) { t.Errorf("Test %d: expected: %q, actual: %q", i, test.expectedServerName, f.tlsConfig.ServerName) } - if !test.shouldErr && test.expectedServerName != "" && test.expectedServerName != f.proxies[0].health.(*dnsHc).c.TLSConfig.ServerName { - t.Errorf("Test %d: expected: %q, actual: %q", i, test.expectedServerName, f.proxies[0].health.(*dnsHc).c.TLSConfig.ServerName) + if !test.shouldErr && test.expectedServerName != "" && test.expectedServerName != f.proxies[0].GetHealthchecker().GetTLSConfig().ServerName { + t.Errorf("Test %d: expected: %q, actual: %q", i, test.expectedServerName, f.proxies[0].GetHealthchecker().GetTLSConfig().ServerName) } } } @@ -179,14 +180,14 @@ nameserver 10.10.255.253`), 0666); err != nil { f := fs[0] for j, n := range test.expectedNames { - addr := f.proxies[j].addr + addr := f.proxies[j].Addr() if n != addr { t.Errorf("Test %d, expected %q, got %q", j, n, addr) } } for _, p := range f.proxies { - p.health.Check(p) // this should almost always err, we don't care it shouldn't crash + p.Healthcheck() // this should almost always err, we don't care it shouldn't crash } } } @@ -279,9 +280,9 @@ func TestSetupHealthCheck(t *testing.T) { } f := fs[0] - if f.opts.hcRecursionDesired != test.expectedRecVal || f.proxies[0].health.GetRecursionDesired() != test.expectedRecVal || - f.opts.hcDomain != test.expectedDomain || f.proxies[0].health.GetDomain() != test.expectedDomain || !dns.IsFqdn(f.proxies[0].health.GetDomain()) { - t.Errorf("Test %d: expectedRec: %v, got: %v. expectedDomain: %s, got: %s. ", i, test.expectedRecVal, f.opts.hcRecursionDesired, test.expectedDomain, f.opts.hcDomain) + if f.opts.HCRecursionDesired != test.expectedRecVal || f.proxies[0].GetHealthchecker().GetRecursionDesired() != test.expectedRecVal || + f.opts.HCDomain != test.expectedDomain || f.proxies[0].GetHealthchecker().GetDomain() != test.expectedDomain || !dns.IsFqdn(f.proxies[0].GetHealthchecker().GetDomain()) { + t.Errorf("Test %d: expectedRec: %v, got: %v. expectedDomain: %s, got: %s. ", i, test.expectedRecVal, f.opts.HCRecursionDesired, test.expectedDomain, f.opts.HCDomain) } } } diff --git a/plugin/forward/type.go b/plugin/forward/type.go deleted file mode 100644 index 9de842fbe..000000000 --- a/plugin/forward/type.go +++ /dev/null @@ -1,37 +0,0 @@ -package forward - -import "net" - -type transportType int - -const ( - typeUDP transportType = iota - typeTCP - typeTLS - typeTotalCount // keep this last -) - -func stringToTransportType(s string) transportType { - switch s { - case "udp": - return typeUDP - case "tcp": - return typeTCP - case "tcp-tls": - return typeTLS - } - - return typeUDP -} - -func (t *Transport) transportTypeFromConn(pc *persistConn) transportType { - if _, ok := pc.c.Conn.(*net.UDPConn); ok { - return typeUDP - } - - if t.tlsConfig == nil { - return typeTCP - } - - return typeTLS -} |