author    Miek Gieben <miek@miek.nl>    2018-02-15 10:21:57 +0100
committer GitHub <noreply@github.com>  2018-02-15 10:21:57 +0100
commit    16504234e5f9f01c3b079be719082da4abb781a3 (patch)
tree      eefd5eb76a4ae6c7a77135371e7fff3b122da5fe
parent    8b035fa938f94ed9bff3f44ee14daba34b13eafb (diff)
plugin/forward using pkg/up (#1493)
* plugin/forward: on demand health checking

  Only start doing health checks when we encounter an error (any error). This uses the new
  plugin/pkg/up package to abstract away the actual checking. This reduces the LOC quite a bit;
  does need more testing, unit testing and tcpdumping a bit.

* fix tests
* Fix readme
* Use pkg/up for healthchecks
* remove unused channel
* more cleanups
* update readme
* Again do go generate and go build; still referencing the wrong forward repo? Anyway fixed.
* Use pkg/up for doing the health checks to cut back on unwanted queries
* Change up.Func to return an error instead of a boolean.
* Drop the string target argument as it doesn't make sense.
* Add healthcheck test on failing to get an upstream answer.
  TODO(miek): double check Forward and Lookup and how they interact with HC, and if we correctly call close() on those
* actual test
* Tests here
* more tests
* try getting rid of host
* Get rid of the host indirection
* Finish removing hosts
* moar testing
* import fmt
* field is not used
* docs
* move some stuff
* bring back health_check
* maxfails=0 test
* git and merging, bah
* review
-rw-r--r--  plugin/forward/README.md          |  57
-rw-r--r--  plugin/forward/connect.go         |   9
-rw-r--r--  plugin/forward/forward.go         |  36
-rw-r--r--  plugin/forward/forward_test.go    |   3
-rw-r--r--  plugin/forward/health.go          |  47
-rw-r--r--  plugin/forward/health_test.go     | 136
-rw-r--r--  plugin/forward/host.go            |  44
-rw-r--r--  plugin/forward/lookup.go          |  11
-rw-r--r--  plugin/forward/lookup_test.go     |   3
-rw-r--r--  plugin/forward/persistent.go      |  43
-rw-r--r--  plugin/forward/persistent_test.go |   3
-rw-r--r--  plugin/forward/proxy.go           |  87
-rw-r--r--  plugin/forward/setup.go           |  27
-rw-r--r--  plugin/pkg/up/up.go               |  11
-rw-r--r--  plugin/pkg/up/up_test.go          |   8
15 files changed, 305 insertions, 220 deletions
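
The gist of the change: health checking is no longer a permanent per-upstream ticker; it starts when an exchange fails and stops once the upstream answers again. A simplified view of the new error path in `ServeDNS` (condensed from the forward.go hunk below; not verbatim):

~~~ go
// Sketch only: on any exchange error, kick off on-demand health checking
// via pkg/up, unless max_fails is 0 (checking disabled). The probe keeps
// querying the upstream every 0.5s until it answers, then goes idle.
if err != nil {
	if f.maxfails != 0 {
		proxy.Healthcheck() // non-blocking; starts a round of checks
	}
	if fails < len(f.proxies) {
		continue // try the next upstream in the list
	}
	break
}
~~~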
diff --git a/plugin/forward/README.md b/plugin/forward/README.md
index bbef305db..72367b3cc 100644
--- a/plugin/forward/README.md
+++ b/plugin/forward/README.md
@@ -6,10 +6,17 @@
## Description
-The *forward* plugin is generally faster (~30+%) than *proxy* as it re-uses already opened sockets
-to the upstreams. It supports UDP, TCP and DNS-over-TLS and uses inband health checking that is
-enabled by default.
-When *all* upstreams are down it assumes healtchecking as a mechanism has failed and will try to
+The *forward* plugin re-uses already opened sockets to the upstreams. It supports UDP, TCP and
+DNS-over-TLS and uses in-band health checking.
+
+When it detects an error, a health check is performed. This check runs in a loop, every *0.5s*, for
+as long as the upstream reports unhealthy. Once healthy we stop health checking (until the next
+error). The health checks use a non-recursive DNS query (`. IN NS`) to get upstream health. Any
+response that is not a network error counts as a healthy upstream (REFUSED, NOTIMPL, SERVFAIL, etc. are fine). The
+health check uses the same protocol as specified in **TO**. If `max_fails` is set to 0, no checking
+is performed and upstreams will always be considered healthy.
+
+When *all* upstreams are down it assumes health checking as a mechanism has failed and will try to
connect to a random upstream (which may or may not work).
## Syntax
@@ -22,16 +29,11 @@ forward FROM TO...
* **FROM** is the base domain to match for the request to be forwarded.
* **TO...** are the destination endpoints to forward to. The **TO** syntax allows you to specify
- a protocol, `tls://9.9.9.9` or `dns://` for plain DNS. The number of upstreams is limited to 15.
-
-The health checks are done every *0.5s*. After *two* failed checks the upstream is considered
-unhealthy. The health checks use a recursive DNS query (`. IN NS`) to get upstream health. Any
-response that is not an error (REFUSED, NOTIMPL, SERVFAIL, etc) is taken as a healthy upstream. The
-health check uses the same protocol as specific in the **TO**. On startup each upstream is marked
-unhealthy until it passes a health check. A 0 duration will disable any health checks.
+ a protocol, `tls://9.9.9.9` or `dns://` (or no protocol) for plain DNS. The number of upstreams is
+ limited to 15.
-Multiple upstreams are randomized (default policy) on first use. When a healthy proxy returns an
-error during the exchange the next upstream in the list is tried.
+Multiple upstreams are randomized (see `policy`) on first use. When a healthy proxy returns an error
+during the exchange the next upstream in the list is tried.
Extra knobs are available with an expanded syntax:
@@ -39,12 +41,12 @@ Extra knobs are available with an expanded syntax:
forward FROM TO... {
except IGNORED_NAMES...
force_tcp
- health_check DURATION
expire DURATION
max_fails INTEGER
tls CERT KEY CA
tls_servername NAME
policy random|round_robin
+ health_checks DURATION
}
~~~
@@ -52,21 +54,16 @@ forward FROM TO... {
* **IGNORED_NAMES** in `except` is a space-separated list of domains to exclude from forwarding.
Requests that match none of these names will be passed through.
* `force_tcp`, use TCP even when the request comes in over UDP.
-* `health_checks`, use a different **DURATION** for health checking, the default duration is 0.5s.
- A value of 0 disables the health checks completely.
* `max_fails` is the number of subsequent failed health checks that are needed before considering
- a backend to be down. If 0, the backend will never be marked as down. Default is 2.
+ an upstream to be down. If 0, the upstream will never be marked as down (nor health checked).
+ Default is 2.
* `expire` **DURATION**, expire (cached) connections after this time, the default is 10s.
* `tls` **CERT** **KEY** **CA** define the TLS properties for TLS; if you leave this out the
system's configuration will be used.
* `tls_servername` **NAME** allows you to set a server name in the TLS configuration; for instance 9.9.9.9
needs this to be set to `dns.quad9.net`.
* `policy` specifies the policy to use for selecting upstream servers. The default is `random`.
-
-The upstream selection is done via random (default policy) selection. If the socket for this client
-isn't known *forward* will randomly choose one. If this turns out to be unhealthy, the next one is
-tried. If *all* hosts are down, we assume health checking is broken and select a *random* upstream to
-try.
+* `health_checks`, use a different **DURATION** for health checking, the default duration is 0.5s.
Also note the TLS config is "global" for the whole forwarding proxy; if you need a different
`tls-name` for different upstreams you're out of luck.
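
For illustration, a hypothetical Corefile using the expanded syntax above (addresses and values are made up, matching the option names in this README):

~~~ corefile
example.org {
    forward . 8.8.8.8 9.9.9.9 {
        except www.example.org
        max_fails 3
        policy round_robin
    }
}
~~~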
@@ -80,7 +77,7 @@ If monitoring is enabled (via the *prometheus* directive) then the following met
* `coredns_forward_response_rcode_total{to, rcode}` - count of RCODEs per upstream.
* `coredns_forward_healthcheck_failure_count_total{to}` - number of failed health checks per upstream.
* `coredns_forward_healthcheck_broken_count_total{}` - counter of when all upstreams are unhealthy,
- and we are randomly spraying to a target.
+ and we are randomly (this always uses the `random` policy) spraying to an upstream.
* `coredns_forward_socket_count_total{to}` - number of cached sockets per upstream.
Where `to` is one of the upstream servers (**TO** from the config), `proto` is the protocol used by
@@ -125,16 +122,10 @@ Proxy everything except `example.org` using the host's `resolv.conf`'s nameserve
}
~~~
-Forward to a IPv6 host:
-
-~~~ corefile
-. {
- forward . [::1]:1053
-}
-~~~
-
Proxy all requests to 9.9.9.9 using the DNS-over-TLS protocol, and cache every answer for up to 30
-seconds.
+seconds. Note the `tls_servername` is mandatory if you want a working setup, as 9.9.9.9 can't be
+used in the TLS negotiation. Also set the health check duration to 5s to not completely swamp the
+service with health checks.
~~~ corefile
. {
@@ -148,7 +139,7 @@ seconds.
## Bugs
-The TLS config is global for the whole forwarding proxy if you need a different `tls-name` for
+The TLS config is global for the whole forwarding proxy; if you need a different `tls_servername` for
different upstreams you're out of luck.
## Also See
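
The Corefile block for the DNS-over-TLS example is cut off by the hunk boundary above; based on the surrounding prose it would look roughly like this (a sketch, not the verbatim file):

~~~ corefile
. {
    forward . tls://9.9.9.9 {
        tls_servername dns.quad9.net
        health_checks 5s
    }
    cache 30
}
~~~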
diff --git a/plugin/forward/connect.go b/plugin/forward/connect.go
index cdad29ed1..5967c396c 100644
--- a/plugin/forward/connect.go
+++ b/plugin/forward/connect.go
@@ -21,9 +21,6 @@ func (p *Proxy) connect(ctx context.Context, state request.Request, forceTCP, me
if forceTCP {
proto = "tcp"
}
- if p.host.tlsConfig != nil {
- proto = "tcp-tls"
- }
conn, err := p.Dial(proto)
if err != nil {
@@ -57,9 +54,9 @@ func (p *Proxy) connect(ctx context.Context, state request.Request, forceTCP, me
rc = strconv.Itoa(ret.Rcode)
}
- RequestCount.WithLabelValues(p.host.addr).Add(1)
- RcodeCount.WithLabelValues(rc, p.host.addr).Add(1)
- RequestDuration.WithLabelValues(p.host.addr).Observe(time.Since(start).Seconds())
+ RequestCount.WithLabelValues(p.addr).Add(1)
+ RcodeCount.WithLabelValues(rc, p.addr).Add(1)
+ RequestDuration.WithLabelValues(p.addr).Observe(time.Since(start).Seconds())
}
return ret, nil
diff --git a/plugin/forward/forward.go b/plugin/forward/forward.go
index 35885008e..8a3e7188c 100644
--- a/plugin/forward/forward.go
+++ b/plugin/forward/forward.go
@@ -20,8 +20,9 @@ import (
// Forward represents a plugin instance that can proxy requests to another (DNS) server. It has a list
// of proxies each representing one upstream proxy.
type Forward struct {
- proxies []*Proxy
- p Policy
+ proxies []*Proxy
+ p Policy
+ hcInterval time.Duration
from string
ignored []string
@@ -31,22 +32,21 @@ type Forward struct {
maxfails uint32
expire time.Duration
- forceTCP bool // also here for testing
- hcInterval time.Duration // also here for testing
+ forceTCP bool // also here for testing
Next plugin.Handler
}
// New returns a new Forward.
func New() *Forward {
- f := &Forward{maxfails: 2, tlsConfig: new(tls.Config), expire: defaultExpire, hcInterval: hcDuration, p: new(random)}
+ f := &Forward{maxfails: 2, tlsConfig: new(tls.Config), expire: defaultExpire, p: new(random), from: ".", hcInterval: hcDuration}
return f
}
// SetProxy appends p to the proxy list and starts healthchecking.
func (f *Forward) SetProxy(p *Proxy) {
f.proxies = append(f.proxies, p)
- go p.healthCheck()
+ p.start(f.hcInterval)
}
// Len returns the number of configured proxies.
@@ -92,7 +92,27 @@ func (f *Forward) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg
child.Finish()
}
+ // If you query, for instance, ANY isc.org, you get a truncated reply back which miekg/dns fails to unpack
+ // because the RRs are not finished. The returned message can be useful or useless. Return the original
+ // query with some header bits set that they should retry with TCP.
+ if err == dns.ErrTruncated {
+ // We may or may not have something sensible... if not reassemble something to send to the client.
+ if ret == nil {
+ ret = new(dns.Msg)
+ ret.SetReply(r)
+ ret.Truncated = true
+ ret.Authoritative = true
+ ret.Rcode = dns.RcodeSuccess
+ }
+ err = nil // and reset err to pass this back to the client.
+ }
+
if err != nil {
+ // Kick off health check to see if *our* upstream is broken.
+ if f.maxfails != 0 {
+ proxy.Healthcheck()
+ }
+
if fails < len(f.proxies) {
continue
}
@@ -140,8 +160,8 @@ func (f *Forward) isAllowedDomain(name string) bool {
func (f *Forward) list() []*Proxy { return f.p.List(f.proxies) }
var (
- errInvalidDomain = errors.New("invalid domain for proxy")
- errNoHealthy = errors.New("no healthy proxies")
+ errInvalidDomain = errors.New("invalid domain for forward")
+ errNoHealthy = errors.New("no healthy proxies or upstream error")
errNoForward = errors.New("no forwarder defined")
)
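
The `dns.ErrTruncated` handling above hands the client a reply with the TC bit set so it can retry over TCP. A minimal client-side counterpart with miekg/dns might look like this (a sketch; `m` and `addr` are placeholders):

~~~ go
c := new(dns.Client) // empty Net defaults to UDP
in, _, err := c.Exchange(m, addr)
if err == nil && in.Truncated {
	// The server set the TC bit: repeat the query over TCP.
	c.Net = "tcp"
	in, _, err = c.Exchange(m, addr)
}
~~~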
diff --git a/plugin/forward/forward_test.go b/plugin/forward/forward_test.go
index d467a0efa..26167c25d 100644
--- a/plugin/forward/forward_test.go
+++ b/plugin/forward/forward_test.go
@@ -6,6 +6,7 @@ import (
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/test"
"github.com/coredns/coredns/request"
+
"github.com/miekg/dns"
)
@@ -18,7 +19,7 @@ func TestForward(t *testing.T) {
})
defer s.Close()
- p := NewProxy(s.Addr)
+ p := NewProxy(s.Addr, nil /* not TLS */)
f := New()
f.SetProxy(p)
defer f.Close()
diff --git a/plugin/forward/health.go b/plugin/forward/health.go
index e277f30a6..cd4b96e27 100644
--- a/plugin/forward/health.go
+++ b/plugin/forward/health.go
@@ -1,7 +1,6 @@
package forward
import (
- "log"
"sync/atomic"
"github.com/miekg/dns"
@@ -10,41 +9,25 @@ import (
// For HC we send a . IN NS +norec message to the upstream. Dial timeouts and empty
// replies are considered fails, basically anything else constitutes a healthy upstream.
-func (h *host) Check() {
- h.Lock()
-
- if h.checking {
- h.Unlock()
- return
- }
-
- h.checking = true
- h.Unlock()
-
- err := h.send()
+// Check is used as the up.Func in the up.Probe.
+func (p *Proxy) Check() error {
+ err := p.send()
if err != nil {
- log.Printf("[INFO] healtheck of %s failed with %s", h.addr, err)
-
- HealthcheckFailureCount.WithLabelValues(h.addr).Add(1)
-
- atomic.AddUint32(&h.fails, 1)
- } else {
- atomic.StoreUint32(&h.fails, 0)
+ HealthcheckFailureCount.WithLabelValues(p.addr).Add(1)
+ atomic.AddUint32(&p.fails, 1)
+ return err
}
- h.Lock()
- h.checking = false
- h.Unlock()
-
- return
+ atomic.StoreUint32(&p.fails, 0)
+ return nil
}
-func (h *host) send() error {
+func (p *Proxy) send() error {
hcping := new(dns.Msg)
hcping.SetQuestion(".", dns.TypeNS)
hcping.RecursionDesired = false
- m, _, err := h.client.Exchange(hcping, h.addr)
+ m, _, err := p.client.Exchange(hcping, p.addr)
// If we got a header, we're alright, basically only care about I/O errors 'n stuff
if err != nil && m != nil {
// Silly check, something sane came back
@@ -55,13 +38,3 @@ func (h *host) send() error {
return err
}
-
-// down returns true is this host has more than maxfails fails.
-func (h *host) down(maxfails uint32) bool {
- if maxfails == 0 {
- return false
- }
-
- fails := atomic.LoadUint32(&h.fails)
- return fails > maxfails
-}
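
Standalone, the health probe in `send()` boils down to the following self-contained sketch (the address is an example; the "sane reply" test is simplified from the code above):

~~~ go
package main

import (
	"fmt"
	"time"

	"github.com/miekg/dns"
)

// probe mirrors the health check: a non-recursive `. IN NS` query.
// Only I/O errors count as failures; any reply with a header is "healthy".
func probe(addr string) error {
	ping := new(dns.Msg)
	ping.SetQuestion(".", dns.TypeNS)
	ping.RecursionDesired = false

	c := new(dns.Client)
	c.ReadTimeout = 1 * time.Second
	c.WriteTimeout = 1 * time.Second

	m, _, err := c.Exchange(ping, addr)
	if err != nil && m != nil {
		return nil // something sane came back despite the error
	}
	return err
}

func main() { fmt.Println(probe("9.9.9.9:53")) }
~~~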
diff --git a/plugin/forward/health_test.go b/plugin/forward/health_test.go
new file mode 100644
index 000000000..2698d13e2
--- /dev/null
+++ b/plugin/forward/health_test.go
@@ -0,0 +1,136 @@
+package forward
+
+import (
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/coredns/coredns/plugin/pkg/dnstest"
+ "github.com/coredns/coredns/plugin/test"
+
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+)
+
+func TestHealth(t *testing.T) {
+ const expected = 0
+ i := uint32(0)
+ s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) {
+ if r.Question[0].Name == "." {
+ atomic.AddUint32(&i, 1)
+ }
+ ret := new(dns.Msg)
+ ret.SetReply(r)
+ w.WriteMsg(ret)
+ })
+ defer s.Close()
+
+ p := NewProxy(s.Addr, nil /* no TLS */)
+ f := New()
+ f.SetProxy(p)
+ defer f.Close()
+
+ req := new(dns.Msg)
+ req.SetQuestion("example.org.", dns.TypeA)
+
+ f.ServeDNS(context.TODO(), &test.ResponseWriter{}, req)
+
+ time.Sleep(1 * time.Second)
+ i1 := atomic.LoadUint32(&i)
+ if i1 != expected {
+ t.Errorf("Expected number of health checks to be %d, got %d", expected, i1)
+ }
+}
+
+func TestHealthTimeout(t *testing.T) {
+ const expected = 1
+ i := uint32(0)
+ s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) {
+ if r.Question[0].Name == "." {
+ // health check, answer
+ atomic.AddUint32(&i, 1)
+ ret := new(dns.Msg)
+ ret.SetReply(r)
+ w.WriteMsg(ret)
+ }
+ // not a health check, do a timeout
+ })
+ defer s.Close()
+
+ p := NewProxy(s.Addr, nil /* no TLS */)
+ f := New()
+ f.SetProxy(p)
+ defer f.Close()
+
+ req := new(dns.Msg)
+ req.SetQuestion("example.org.", dns.TypeA)
+
+ f.ServeDNS(context.TODO(), &test.ResponseWriter{}, req)
+
+ time.Sleep(1 * time.Second)
+ i1 := atomic.LoadUint32(&i)
+ if i1 != expected {
+ t.Errorf("Expected number of health checks to be %d, got %d", expected, i1)
+ }
+}
+
+func TestHealthFailTwice(t *testing.T) {
+ const expected = 2
+ i := uint32(0)
+ s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) {
+ if r.Question[0].Name == "." {
+ atomic.AddUint32(&i, 1)
+ i1 := atomic.LoadUint32(&i)
+ // Timeout health until we get the second one
+ if i1 < 2 {
+ return
+ }
+ ret := new(dns.Msg)
+ ret.SetReply(r)
+ w.WriteMsg(ret)
+ }
+ })
+ defer s.Close()
+
+ p := NewProxy(s.Addr, nil /* no TLS */)
+ f := New()
+ f.SetProxy(p)
+ defer f.Close()
+
+ req := new(dns.Msg)
+ req.SetQuestion("example.org.", dns.TypeA)
+
+ f.ServeDNS(context.TODO(), &test.ResponseWriter{}, req)
+
+ time.Sleep(3 * time.Second)
+ i1 := atomic.LoadUint32(&i)
+ if i1 != expected {
+ t.Errorf("Expected number of health checks to be %d, got %d", expected, i1)
+ }
+}
+
+func TestHealthMaxFails(t *testing.T) {
+ const expected = 0
+ i := uint32(0)
+ s := dnstest.NewServer(func(w dns.ResponseWriter, r *dns.Msg) {
+ // timeout
+ })
+ defer s.Close()
+
+ p := NewProxy(s.Addr, nil /* no TLS */)
+ f := New()
+ f.maxfails = 0
+ f.SetProxy(p)
+ defer f.Close()
+
+ req := new(dns.Msg)
+ req.SetQuestion("example.org.", dns.TypeA)
+
+ f.ServeDNS(context.TODO(), &test.ResponseWriter{}, req)
+
+ time.Sleep(1 * time.Second)
+ i1 := atomic.LoadUint32(&i)
+ if i1 != expected {
+ t.Errorf("Expected number of health checks to be %d, got %d", expected, i1)
+ }
+}
diff --git a/plugin/forward/host.go b/plugin/forward/host.go
deleted file mode 100644
index 48d6c7d6e..000000000
--- a/plugin/forward/host.go
+++ /dev/null
@@ -1,44 +0,0 @@
-package forward
-
-import (
- "crypto/tls"
- "sync"
- "time"
-
- "github.com/miekg/dns"
-)
-
-type host struct {
- addr string
- client *dns.Client
-
- tlsConfig *tls.Config
- expire time.Duration
-
- fails uint32
- sync.RWMutex
- checking bool
-}
-
-// newHost returns a new host, the fails are set to 1, i.e.
-// the first healthcheck must succeed before we use this host.
-func newHost(addr string) *host {
- return &host{addr: addr, fails: 1, expire: defaultExpire}
-}
-
-// setClient sets and configures the dns.Client in host.
-func (h *host) SetClient() {
- c := new(dns.Client)
- c.Net = "udp"
- c.ReadTimeout = 2 * time.Second
- c.WriteTimeout = 2 * time.Second
-
- if h.tlsConfig != nil {
- c.Net = "tcp-tls"
- c.TLSConfig = h.tlsConfig
- }
-
- h.client = c
-}
-
-const defaultExpire = 10 * time.Second
diff --git a/plugin/forward/lookup.go b/plugin/forward/lookup.go
index 47c4319cf..d63dc29a3 100644
--- a/plugin/forward/lookup.go
+++ b/plugin/forward/lookup.go
@@ -5,10 +5,6 @@
package forward
import (
- "crypto/tls"
- "log"
- "time"
-
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
@@ -32,12 +28,10 @@ func (f *Forward) Forward(state request.Request) (*dns.Msg, error) {
// All upstream proxies are dead, assume healthchecking is completely broken and randomly
// select an upstream to connect to.
proxy = f.list()[0]
- log.Printf("[WARNING] All upstreams down, picking random one to connect to %s", proxy.host.addr)
}
ret, err := proxy.connect(context.Background(), state, f.forceTCP, true)
if err != nil {
- log.Printf("[WARNING] Failed to connect to %s: %s", proxy.host.addr, err)
if fails < len(f.proxies) {
continue
}
@@ -68,10 +62,11 @@ func (f *Forward) Lookup(state request.Request, name string, typ uint16) (*dns.M
}
// NewLookup returns a Forward that can be used for plugins that need an upstream to resolve external names.
+// Note that the caller must run Close on the forward to stop the health checking goroutines.
func NewLookup(addr []string) *Forward {
- f := &Forward{maxfails: 2, tlsConfig: new(tls.Config), expire: defaultExpire, hcInterval: 2 * time.Second}
+ f := New()
for i := range addr {
- p := NewProxy(addr[i])
+ p := NewProxy(addr[i], nil)
f.SetProxy(p)
}
return f
diff --git a/plugin/forward/lookup_test.go b/plugin/forward/lookup_test.go
index 69c7a1949..e37a0c5d7 100644
--- a/plugin/forward/lookup_test.go
+++ b/plugin/forward/lookup_test.go
@@ -6,6 +6,7 @@ import (
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/test"
"github.com/coredns/coredns/request"
+
"github.com/miekg/dns"
)
@@ -18,7 +19,7 @@ func TestLookup(t *testing.T) {
})
defer s.Close()
- p := NewProxy(s.Addr)
+ p := NewProxy(s.Addr, nil /* no TLS */)
f := New()
f.SetProxy(p)
defer f.Close()
diff --git a/plugin/forward/persistent.go b/plugin/forward/persistent.go
index 6a7c4464e..7bf083b49 100644
--- a/plugin/forward/persistent.go
+++ b/plugin/forward/persistent.go
@@ -1,6 +1,7 @@
package forward
import (
+ "crypto/tls"
"net"
"time"
@@ -21,8 +22,10 @@ type connErr struct {
// transport hold the persistent cache.
type transport struct {
- conns map[string][]*persistConn // Buckets for udp, tcp and tcp-tls.
- host *host
+ conns map[string][]*persistConn // Buckets for udp, tcp and tcp-tls.
+ expire time.Duration // After this duration a connection is expired.
+ addr string
+ tlsConfig *tls.Config
dial chan string
yield chan connErr
@@ -35,10 +38,11 @@ type transport struct {
stop chan bool
}
-func newTransport(h *host) *transport {
+func newTransport(addr string, tlsConfig *tls.Config) *transport {
t := &transport{
conns: make(map[string][]*persistConn),
- host: h,
+ expire: defaultExpire,
+ addr: addr,
dial: make(chan string),
yield: make(chan connErr),
ret: make(chan connErr),
@@ -51,7 +55,7 @@ func newTransport(h *host) *transport {
}
// len returns the number of connections, used for metrics. Can only be safely
-// used inside connManager() because of races.
+// used inside connManager() because of data races.
func (t *transport) len() int {
l := 0
for _, conns := range t.conns {
@@ -79,7 +83,7 @@ Wait:
i := 0
for i = 0; i < len(t.conns[proto]); i++ {
pc := t.conns[proto][i]
- if time.Since(pc.used) < t.host.expire {
+ if time.Since(pc.used) < t.expire {
// Found one, remove from pool and return this conn.
t.conns[proto] = t.conns[proto][i+1:]
t.ret <- connErr{pc.c, nil}
@@ -91,22 +95,22 @@ Wait:
// No conns were found. Connect to the upstream to create one.
t.conns[proto] = t.conns[proto][i:]
- SocketGauge.WithLabelValues(t.host.addr).Set(float64(t.len()))
+ SocketGauge.WithLabelValues(t.addr).Set(float64(t.len()))
go func() {
if proto != "tcp-tls" {
- c, err := dns.DialTimeout(proto, t.host.addr, dialTimeout)
+ c, err := dns.DialTimeout(proto, t.addr, dialTimeout)
t.ret <- connErr{c, err}
return
}
- c, err := dns.DialTimeoutWithTLS("tcp", t.host.addr, t.host.tlsConfig, dialTimeout)
+ c, err := dns.DialTimeoutWithTLS("tcp", t.addr, t.tlsConfig, dialTimeout)
t.ret <- connErr{c, err}
}()
case conn := <-t.yield:
- SocketGauge.WithLabelValues(t.host.addr).Set(float64(t.len() + 1))
+ SocketGauge.WithLabelValues(t.addr).Set(float64(t.len() + 1))
// no proto here, infer from config and conn
if _, ok := conn.c.Conn.(*net.UDPConn); ok {
@@ -114,7 +118,7 @@ Wait:
continue Wait
}
- if t.host.tlsConfig == nil {
+ if t.tlsConfig == nil {
t.conns["tcp"] = append(t.conns["tcp"], &persistConn{conn.c, time.Now()})
continue Wait
}
@@ -134,15 +138,30 @@ Wait:
}
}
+// Dial dials the address configured in transport, potentially reusing a connection or creating a new one.
func (t *transport) Dial(proto string) (*dns.Conn, error) {
+ // If tls has been configured; use it.
+ if t.tlsConfig != nil {
+ proto = "tcp-tls"
+ }
+
t.dial <- proto
c := <-t.ret
return c.c, c.err
}
+// Yield returns the connection to transport for reuse.
func (t *transport) Yield(c *dns.Conn) {
t.yield <- connErr{c, nil}
}
-// Stop stops the transports.
+// Stop stops the transport's connection manager.
func (t *transport) Stop() { t.stop <- true }
+
+// SetExpire sets the connection expire time in transport.
+func (t *transport) SetExpire(expire time.Duration) { t.expire = expire }
+
+// SetTLSConfig sets the TLS config in transport.
+func (t *transport) SetTLSConfig(cfg *tls.Config) { t.tlsConfig = cfg }
+
+const defaultExpire = 10 * time.Second
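
Inside the forward package, the reworked transport is used roughly like this (a sketch based on the API in this diff; the address is an example):

~~~ go
tr := newTransport("9.9.9.9:53", nil /* no TLS */)
defer tr.Stop()

c, err := tr.Dial("udp") // reuses a cached connection younger than expire, else dials
if err != nil {
	// handle the dial error
}
// ... exchange on c ...
tr.Yield(c) // hand the connection back for reuse
~~~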
diff --git a/plugin/forward/persistent_test.go b/plugin/forward/persistent_test.go
index 5674658e6..5fa491a01 100644
--- a/plugin/forward/persistent_test.go
+++ b/plugin/forward/persistent_test.go
@@ -16,8 +16,7 @@ func TestPersistent(t *testing.T) {
})
defer s.Close()
- h := newHost(s.Addr)
- tr := newTransport(h)
+ tr := newTransport(s.Addr, nil /* no TLS */)
defer tr.Stop()
c1, _ := tr.Dial("udp")
diff --git a/plugin/forward/proxy.go b/plugin/forward/proxy.go
index c89490374..30bab52d1 100644
--- a/plugin/forward/proxy.go
+++ b/plugin/forward/proxy.go
@@ -2,47 +2,60 @@ package forward
import (
"crypto/tls"
- "sync"
+ "sync/atomic"
"time"
+ "github.com/coredns/coredns/plugin/pkg/up"
+
"github.com/miekg/dns"
)
// Proxy defines an upstream host.
type Proxy struct {
- host *host
+ addr string
+ client *dns.Client
+ // Connection caching
+ expire time.Duration
transport *transport
- // copied from Forward.
- hcInterval time.Duration
- forceTCP bool
-
- stop chan bool
-
- sync.RWMutex
+ // health checking
+ probe *up.Probe
+ fails uint32
}
// NewProxy returns a new proxy.
-func NewProxy(addr string) *Proxy {
- host := newHost(addr)
-
+func NewProxy(addr string, tlsConfig *tls.Config) *Proxy {
p := &Proxy{
- host: host,
- hcInterval: hcDuration,
- stop: make(chan bool),
- transport: newTransport(host),
+ addr: addr,
+ fails: 0,
+ probe: up.New(),
+ transport: newTransport(addr, tlsConfig),
}
+ p.client = dnsClient(tlsConfig)
return p
}
-// SetTLSConfig sets the TLS config in the lower p.host.
-func (p *Proxy) SetTLSConfig(cfg *tls.Config) { p.host.tlsConfig = cfg }
+// dnsClient returns a client used for health checking.
+func dnsClient(tlsConfig *tls.Config) *dns.Client {
+ c := new(dns.Client)
+ c.Net = "udp"
+ // TODO(miek): this should be half of hcDuration?
+ c.ReadTimeout = 1 * time.Second
+ c.WriteTimeout = 1 * time.Second
+
+ if tlsConfig != nil {
+ c.Net = "tcp-tls"
+ c.TLSConfig = tlsConfig
+ }
+ return c
+}
-// SetExpire sets the expire duration in the lower p.host.
-func (p *Proxy) SetExpire(expire time.Duration) { p.host.expire = expire }
+// SetTLSConfig sets the TLS config in the lower p.transport.
+func (p *Proxy) SetTLSConfig(cfg *tls.Config) { p.transport.SetTLSConfig(cfg) }
-func (p *Proxy) close() { p.stop <- true }
+// SetExpire sets the expire duration in the lower p.transport.
+func (p *Proxy) SetExpire(expire time.Duration) { p.transport.SetExpire(expire) }
// Dial connects to the host in p with the configured transport.
func (p *Proxy) Dial(proto string) (*dns.Conn, error) { return p.transport.Dial(proto) }
@@ -50,26 +63,28 @@ func (p *Proxy) Dial(proto string) (*dns.Conn, error) { return p.transport.Dial(
// Yield returns the connection to the pool.
func (p *Proxy) Yield(c *dns.Conn) { p.transport.Yield(c) }
-// Down returns if this proxy is up or down.
-func (p *Proxy) Down(maxfails uint32) bool { return p.host.down(maxfails) }
+// Healthcheck kicks off a round of health checks for this proxy.
+func (p *Proxy) Healthcheck() { p.probe.Do(p.Check) }
-func (p *Proxy) healthCheck() {
+// Down returns true if this proxy is down, i.e. has *more* fails than maxfails.
+func (p *Proxy) Down(maxfails uint32) bool {
+ if maxfails == 0 {
+ return false
+ }
- // stop channel
- p.host.SetClient()
+ fails := atomic.LoadUint32(&p.fails)
+ return fails > maxfails
+}
- p.host.Check()
- tick := time.NewTicker(p.hcInterval)
- for {
- select {
- case <-tick.C:
- p.host.Check()
- case <-p.stop:
- return
- }
- }
+// close stops the health checking goroutine.
+func (p *Proxy) close() {
+ p.probe.Stop()
+ p.transport.Stop()
}
+// start starts the proxy's healthchecking.
+func (p *Proxy) start(duration time.Duration) { p.probe.Start(duration) }
+
const (
dialTimeout = 4 * time.Second
timeout = 2 * time.Second
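
Taken together, the proxy lifecycle in this commit is roughly the following (names from this diff, used inside the forward package; error handling elided):

~~~ go
p := NewProxy("9.9.9.9:53", nil /* plain DNS, no TLS */)
p.start(500 * time.Millisecond) // probe manager idles until Healthcheck is called

// After a failed exchange the caller triggers a round of checks...
p.Healthcheck()

// ...and consults the atomic fail counter when selecting upstreams.
if p.Down(2 /* maxfails */) {
	// skip this proxy
}

p.close() // stop the probe and the transport's connection manager
~~~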
diff --git a/plugin/forward/setup.go b/plugin/forward/setup.go
index bed20f0c7..5231bcc22 100644
--- a/plugin/forward/setup.go
+++ b/plugin/forward/setup.go
@@ -62,25 +62,14 @@ func setup(c *caddy.Controller) error {
// OnStartup starts goroutines for all proxies.
func (f *Forward) OnStartup() (err error) {
- if f.hcInterval == 0 {
- for _, p := range f.proxies {
- p.host.fails = 0
- }
- return nil
- }
-
for _, p := range f.proxies {
- go p.healthCheck()
+ p.start(f.hcInterval)
}
return nil
}
// OnShutdown stops all configured proxies.
func (f *Forward) OnShutdown() error {
- if f.hcInterval == 0 {
- return nil
- }
-
for _, p := range f.proxies {
p.close()
}
@@ -88,9 +77,7 @@ func (f *Forward) OnShutdown() error {
}
// Close is a synonym for OnShutdown().
-func (f *Forward) Close() {
- f.OnShutdown()
-}
+func (f *Forward) Close() { f.OnShutdown() }
func parseForward(c *caddy.Controller) (*Forward, error) {
f := New()
@@ -140,8 +127,8 @@ func parseForward(c *caddy.Controller) (*Forward, error) {
}
// We can't set tlsConfig here, because we haven't parsed it yet.
- // We set it below at the end of parseBlock.
- p := NewProxy(h)
+ // We set it below at the end of parseBlock, use nil now.
+ p := NewProxy(h, nil /* no TLS */)
f.proxies = append(f.proxies, p)
}
@@ -200,17 +187,11 @@ func parseBlock(c *caddy.Controller, f *Forward) error {
return fmt.Errorf("health_check can't be negative: %d", dur)
}
f.hcInterval = dur
- for i := range f.proxies {
- f.proxies[i].hcInterval = dur
- }
case "force_tcp":
if c.NextArg() {
return c.ArgErr()
}
f.forceTCP = true
- for i := range f.proxies {
- f.proxies[i].forceTCP = true
- }
case "tls":
args := c.RemainingArgs()
if len(args) != 3 {
diff --git a/plugin/pkg/up/up.go b/plugin/pkg/up/up.go
index af8de5ed5..06971eef7 100644
--- a/plugin/pkg/up/up.go
+++ b/plugin/pkg/up/up.go
@@ -17,8 +17,8 @@ type Probe struct {
inprogress bool
}
-// Func is used to determine if a target is alive. If so this function must return true.
-type Func func(target string) bool
+// Func is used to determine if a target is alive. If so this function must return nil.
+type Func func() error
// New returns a pointer to an initialized Probe.
func New() *Probe {
@@ -32,9 +32,9 @@ func (p *Probe) Do(f Func) { p.do <- f }
func (p *Probe) Stop() { p.stop <- true }
// Start will start the probe manager, after which probes can be initialized with Do.
-func (p *Probe) Start(target string, interval time.Duration) { go p.start(target, interval) }
+func (p *Probe) Start(interval time.Duration) { go p.start(interval) }
-func (p *Probe) start(target string, interval time.Duration) {
+func (p *Probe) start(interval time.Duration) {
for {
select {
case <-p.stop:
@@ -52,9 +52,10 @@ func (p *Probe) start(target string, interval time.Duration) {
// we return from the goroutine and we can accept another Func to run.
go func() {
for {
- if ok := f(target); ok {
+ if err := f(); err == nil {
break
}
+ // TODO(miek): little bit of exponential backoff here?
time.Sleep(interval)
}
p.Lock()
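
After this change, a caller of pkg/up looks like the following sketch (`checkUpstream` is a hypothetical Func standing in for Proxy.Check):

~~~ go
p := up.New()
p.Start(500 * time.Millisecond)
defer p.Stop()

// Do is non-blocking; the probe retries the Func every interval
// until it returns nil, then accepts a new Func.
p.Do(func() error {
	return checkUpstream() // hypothetical health test
})
~~~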
diff --git a/plugin/pkg/up/up_test.go b/plugin/pkg/up/up_test.go
index cb56658d1..eeaecea3b 100644
--- a/plugin/pkg/up/up_test.go
+++ b/plugin/pkg/up/up_test.go
@@ -12,20 +12,20 @@ func TestUp(t *testing.T) {
wg := sync.WaitGroup{}
hits := int32(0)
- upfunc := func(s string) bool {
+ upfunc := func() error {
atomic.AddInt32(&hits, 1)
// Sleep tiny amount so that our other pr.Do() calls hit the lock.
time.Sleep(3 * time.Millisecond)
wg.Done()
- return true
+ return nil
}
- pr.Start("nonexistent", 5*time.Millisecond)
+ pr.Start(5 * time.Millisecond)
defer pr.Stop()
// These functions AddInt32 to the same hits variable, but we only want to wait when
// upfunc finishes, as that only calls Done() on the waitgroup.
- upfuncNoWg := func(s string) bool { atomic.AddInt32(&hits, 1); return true }
+ upfuncNoWg := func() error { atomic.AddInt32(&hits, 1); return nil }
wg.Add(1)
pr.Do(upfunc)
pr.Do(upfuncNoWg)