Diffstat (limited to 'plugin/proxy')
-rw-r--r--  plugin/proxy/README.md            |  2
-rw-r--r--  plugin/proxy/down.go              | 16
-rw-r--r--  plugin/proxy/google.go            |  6
-rw-r--r--  plugin/proxy/grpc_test.go         |  1
-rw-r--r--  plugin/proxy/healthcheck_test.go  | 66
-rw-r--r--  plugin/proxy/lookup.go            |  9
-rw-r--r--  plugin/proxy/proxy.go             | 16
-rw-r--r--  plugin/proxy/proxy_test.go        | 41
-rw-r--r--  plugin/proxy/upstream.go          | 16
9 files changed, 103 insertions, 70 deletions
diff --git a/plugin/proxy/README.md b/plugin/proxy/README.md
index a064a3418..f8d805f15 100644
--- a/plugin/proxy/README.md
+++ b/plugin/proxy/README.md
@@ -38,7 +38,7 @@ proxy FROM TO... {
   random, least_conn, or round_robin. Default is random.
 * `fail_timeout` specifies how long to consider a backend as down after it has failed. While it is
   down, requests will not be routed to that backend. A backend is "down" if CoreDNS fails to
-  communicate with it. The default value is 10 seconds ("10s").
+  communicate with it. The default value is 2 seconds ("2s").
 * `max_fails` is the number of failures within fail_timeout that are needed before considering a
   backend to be down. If 0, the backend will never be marked as down. Default is 1.
 * `health_check` will check **PATH** (on **PORT**) on each backend. If a backend returns a status code of
diff --git a/plugin/proxy/down.go b/plugin/proxy/down.go
index 5dc8b678d..11f839b46 100644
--- a/plugin/proxy/down.go
+++ b/plugin/proxy/down.go
@@ -2,7 +2,6 @@ package proxy
 
 import (
 	"sync/atomic"
-	"time"
 
 	"github.com/coredns/coredns/plugin/pkg/healthcheck"
 )
@@ -10,21 +9,10 @@ import (
 // Default CheckDown functions for use in the proxy plugin.
 var checkDownFunc = func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
 	return func(uh *healthcheck.UpstreamHost) bool {
-
-		down := false
-
-		uh.Lock()
-		until := uh.OkUntil
-		uh.Unlock()
-
-		if !until.IsZero() && time.Now().After(until) {
-			down = true
-		}
-
 		fails := atomic.LoadInt32(&uh.Fails)
 		if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
-			down = true
+			return true
 		}
-		return down
+		return false
 	}
 }
diff --git a/plugin/proxy/google.go b/plugin/proxy/google.go
index 91d4cba26..f635fc500 100644
--- a/plugin/proxy/google.go
+++ b/plugin/proxy/google.go
@@ -194,22 +194,20 @@ func newUpstream(hosts []string, old *staticUpstream) Upstream {
 		HealthCheck: healthcheck.HealthCheck{
 			FailTimeout: 5 * time.Second,
 			MaxFails:    3,
-			Future:      12 * time.Second,
 		},
 		ex:                old.ex,
 		IgnoredSubDomains: old.IgnoredSubDomains,
 	}
 
 	upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
-	for i, h := range hosts {
+	for i, host := range hosts {
 		uh := &healthcheck.UpstreamHost{
-			Name:        h,
+			Name:        host,
 			Conns:       0,
 			Fails:       0,
 			FailTimeout: upstream.FailTimeout,
 			CheckDown:   checkDownFunc(upstream),
 		}
-
 		upstream.Hosts[i] = uh
 	}
 	return upstream
diff --git a/plugin/proxy/grpc_test.go b/plugin/proxy/grpc_test.go
index 52c5737d6..c6e6e20cc 100644
--- a/plugin/proxy/grpc_test.go
+++ b/plugin/proxy/grpc_test.go
@@ -28,7 +28,6 @@ func TestStartupShutdown(t *testing.T) {
 		HealthCheck: healthcheck.HealthCheck{
 			Hosts:       pool(),
 			FailTimeout: 10 * time.Second,
-			Future:      60 * time.Second,
 			MaxFails:    1,
 		},
 	}
diff --git a/plugin/proxy/healthcheck_test.go b/plugin/proxy/healthcheck_test.go
new file mode 100644
index 000000000..53b9446ff
--- /dev/null
+++ b/plugin/proxy/healthcheck_test.go
@@ -0,0 +1,66 @@
+package proxy
+
+import (
+	"fmt"
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/http/httptest"
+	"strings"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/coredns/coredns/plugin/test"
+	"github.com/coredns/coredns/request"
+
+	"github.com/mholt/caddy/caddyfile"
+	"github.com/miekg/dns"
+)
+
+func init() {
+	log.SetOutput(ioutil.Discard)
+}
+
+func TestUnhealthy(t *testing.T) {
+	// High HC interval, we want to test the HC after failed queries.
+	config := "proxy . %s {\n health_check /healthcheck:%s 10s \nfail_timeout 100ms\n}"
+
+	backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		r.Body.Close()
+		w.Write([]byte("OK"))
+	}))
+	defer backend.Close()
+
+	port := backend.URL[17:] // Remove all crap up to the port
+	back := backend.URL[7:]  // Remove http://
+
+	c := caddyfile.NewDispenser("testfile", strings.NewReader(fmt.Sprintf(config, back, port)))
+	upstreams, err := NewStaticUpstreams(&c)
+	if err != nil {
+		t.Errorf("Expected no error. Got: %s", err)
+	}
+	p := &Proxy{Upstreams: &upstreams}
+	m := new(dns.Msg)
+	m.SetQuestion("example.org.", dns.TypeA)
+	state := request.Request{W: &test.ResponseWriter{}, Req: m}
+
+	// Should all fail.
+	for j := 0; j < failureCheck; j++ {
+		if _, err := p.Forward(state); err == nil {
+			t.Errorf("Expected error. Got: nil")
+		}
+	}
+
+	fails := atomic.LoadInt32(&upstreams[0].(*staticUpstream).Hosts[0].Fails)
+	if fails != 3 {
+		t.Errorf("Expected %d fails, got %d", 3, fails)
+	}
+	// HC should be kicked off, and reset the counter to 0
+	i := 0
+	for fails != 0 {
+		fails = atomic.LoadInt32(&upstreams[0].(*staticUpstream).Hosts[0].Fails)
+		time.Sleep(100 * time.Microsecond)
+		i++
+	}
+}
diff --git a/plugin/proxy/lookup.go b/plugin/proxy/lookup.go
index fc0f3e01f..51b8c4690 100644
--- a/plugin/proxy/lookup.go
+++ b/plugin/proxy/lookup.go
@@ -29,7 +29,6 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy {
 		HealthCheck: healthcheck.HealthCheck{
 			FailTimeout: 5 * time.Second,
 			MaxFails:    3,
-			Future:      12 * time.Second,
 		},
 		ex: newDNSExWithOption(opts),
 	}
@@ -38,8 +37,6 @@
 	for i, host := range hosts {
 		uh := &healthcheck.UpstreamHost{
 			Name:        host,
-			Conns:       0,
-			Fails:       0,
 			FailTimeout: upstream.FailTimeout,
 			CheckDown:   checkDownFunc(upstream),
 		}
@@ -106,14 +103,18 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
 
 		timeout := host.FailTimeout
 		if timeout == 0 {
-			timeout = 2 * time.Second
+			timeout = defaultFailTimeout
 		}
 
 		atomic.AddInt32(&host.Fails, 1)
+		fails := atomic.LoadInt32(&host.Fails)
 
 		go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
 			time.Sleep(timeout)
 			atomic.AddInt32(&host.Fails, -1)
+			if fails%failureCheck == 0 { // Kick off healthcheck on eveyry third failure.
+				host.HealthCheckURL()
+			}
 		}(host, timeout)
 	}
 	return nil, fmt.Errorf("%s: %s", errUnreachable, backendErr)
diff --git a/plugin/proxy/proxy.go b/plugin/proxy/proxy.go
index f0e6eadad..7b2abd89e 100644
--- a/plugin/proxy/proxy.go
+++ b/plugin/proxy/proxy.go
@@ -127,14 +127,19 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
 
 		timeout := host.FailTimeout
 		if timeout == 0 {
-			timeout = 2 * time.Second
+			timeout = defaultFailTimeout
 		}
 
 		atomic.AddInt32(&host.Fails, 1)
+		fails := atomic.LoadInt32(&host.Fails)
 
 		go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
 			time.Sleep(timeout)
+			// we may go negative here, should be rectified by the HC.
 			atomic.AddInt32(&host.Fails, -1)
+			if fails%failureCheck == 0 { // Kick off healthcheck on eveyry third failure.
+				host.HealthCheckURL()
+			}
 		}(host, timeout)
 	}
 
@@ -167,9 +172,6 @@ func (p Proxy) match(state request.Request) (u Upstream) {
 // Name implements the Handler interface.
 func (p Proxy) Name() string { return "proxy" }
 
-// defaultTimeout is the default networking timeout for DNS requests.
-const defaultTimeout = 5 * time.Second
-
 func toDnstap(ctx context.Context, host string, ex Exchanger, state request.Request, reply *dns.Msg, queryEpoch, respEpoch uint64) (err error) {
 	if tapper := dnstap.TapperFromContext(ctx); tapper != nil {
 		// Query
@@ -206,3 +208,9 @@ func toDnstap(ctx context.Context, host string, ex Exchanger, state request.Request, reply *dns.Msg, queryEpoch, respEpoch uint64) (err error) {
 	}
 	return
 }
+
+const (
+	defaultFailTimeout = 2 * time.Second
+	defaultTimeout     = 5 * time.Second
+	failureCheck       = 3
+)
diff --git a/plugin/proxy/proxy_test.go b/plugin/proxy/proxy_test.go
index b0cb9c3cb..0d29c2329 100644
--- a/plugin/proxy/proxy_test.go
+++ b/plugin/proxy/proxy_test.go
@@ -15,29 +15,16 @@ import (
 func TestStop(t *testing.T) {
 	config := "proxy . %s {\n health_check /healthcheck:%s %dms \n}"
 	tests := []struct {
-		name                    string
 		intervalInMilliseconds  int
 		numHealthcheckIntervals int
 	}{
-		{
-			"No Healthchecks After Stop - 5ms, 1 intervals",
-			5,
-			1,
-		},
-		{
-			"No Healthchecks After Stop - 5ms, 2 intervals",
-			5,
-			2,
-		},
-		{
-			"No Healthchecks After Stop - 5ms, 3 intervals",
-			5,
-			3,
-		},
+		{5, 1},
+		{5, 2},
+		{5, 3},
 	}
 
-	for _, test := range tests {
-		t.Run(test.name, func(t *testing.T) {
+	for i, test := range tests {
+		t.Run(fmt.Sprintf("Test %d", i), func(t *testing.T) {
 			// Set up proxy.
 			var counter int64
 
@@ -53,7 +40,7 @@ func TestStop(t *testing.T) {
 			c := caddyfile.NewDispenser("Testfile", strings.NewReader(fmt.Sprintf(config, back, port, test.intervalInMilliseconds)))
 			upstreams, err := NewStaticUpstreams(&c)
 			if err != nil {
-				t.Error("Expected no error. Got:", err.Error())
+				t.Errorf("Test %d, expected no error. Got: %s", i, err)
 			}
 
 			// Give some time for healthchecks to hit the server.
@@ -61,27 +48,25 @@ func TestStop(t *testing.T) {
 
 			for _, upstream := range upstreams {
 				if err := upstream.Stop(); err != nil {
-					t.Error("Expected no error stopping upstream. Got: ", err.Error())
+					t.Errorf("Test %d, expected no error stopping upstream, got: %s", i, err)
 				}
 			}
 
-			counterValueAfterShutdown := atomic.LoadInt64(&counter)
+			counterAfterShutdown := atomic.LoadInt64(&counter)
 
 			// Give some time to see if healthchecks are still hitting the server.
 			time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond)
 
-			if counterValueAfterShutdown == 0 {
-				t.Error("Expected healthchecks to hit test server. Got no healthchecks.")
+			if counterAfterShutdown == 0 {
+				t.Errorf("Test %d, Expected healthchecks to hit test server, got none", i)
 			}
 
 			// health checks are in a go routine now, so one may well occur after we shutdown,
 			// but we only ever expect one more
-			counterValueAfterWaiting := atomic.LoadInt64(&counter)
-			if counterValueAfterWaiting > (counterValueAfterShutdown + 1) {
-				t.Errorf("Expected no more healthchecks after shutdown. Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown)
+			counterAfterWaiting := atomic.LoadInt64(&counter)
+			if counterAfterWaiting > (counterAfterShutdown + 1) {
+				t.Errorf("Test %d, expected no more healthchecks after shutdown. got: %d healthchecks after shutdown", i, counterAfterWaiting-counterAfterShutdown)
 			}
-
 		})
-
 	}
 }
diff --git a/plugin/proxy/upstream.go b/plugin/proxy/upstream.go
index 0ab29de51..151fcad60 100644
--- a/plugin/proxy/upstream.go
+++ b/plugin/proxy/upstream.go
@@ -33,7 +33,6 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
 			HealthCheck: healthcheck.HealthCheck{
 				FailTimeout: 5 * time.Second,
 				MaxFails:    3,
-				Future:      12 * time.Second,
 			},
 			ex: newDNSEx(),
 		}
@@ -61,15 +60,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
 		}
 
 		upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts))
+
 		for i, host := range toHosts {
 			uh := &healthcheck.UpstreamHost{
 				Name:        host,
-				Conns:       0,
-				Fails:       0,
 				FailTimeout: upstream.FailTimeout,
 				CheckDown:   checkDownFunc(upstream),
 			}
-
 			upstream.Hosts[i] = uh
 		}
 		upstream.Start()
@@ -79,10 +76,6 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
 	return upstreams, nil
 }
 
-func (u *staticUpstream) From() string {
-	return u.from
-}
-
 func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
 	switch c.Val() {
 	case "policy":
@@ -128,12 +121,6 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
 				return err
 			}
 			u.HealthCheck.Interval = dur
-			u.Future = 2 * dur
-
-			// set a minimum of 3 seconds
-			if u.Future < (3 * time.Second) {
-				u.Future = 3 * time.Second
-			}
 		}
 	case "except":
 		ignoredDomains := c.RemainingArgs()
@@ -204,3 +191,4 @@ func (u *staticUpstream) IsAllowedDomain(name string) bool {
 }
 
 func (u *staticUpstream) Exchanger() Exchanger { return u.ex }
+func (u *staticUpstream) From() string { return u.from }
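
For reference, a Corefile along the following lines exercises the options this change touches; the backend addresses, the health-check port, and the chosen values are illustrative placeholders, not taken from the commit:

```
. {
    proxy . 10.0.0.10:53 10.0.0.11:53 {
        policy round_robin
        # A backend is marked down after max_fails failures within fail_timeout
        # and receives no queries while down; the default fail_timeout is now 2s (was 10s).
        fail_timeout 2s
        max_fails 3
        # Probe /healthcheck on port 8080 of each backend every 10s.
        health_check /healthcheck:8080 10s
    }
}
```

With the on-demand check added here, every third failed query (failureCheck = 3) also triggers an immediate probe via host.HealthCheckURL(), instead of waiting for the removed Future/OkUntil window to expire.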