From a1b175ef78783df180dffe50d756a85c66cfb1b7 Mon Sep 17 00:00:00 2001 From: Yong Tang Date: Wed, 9 Aug 2017 09:21:33 -0700 Subject: Move Healthcheck to middleware/pkg/healthcheck (#854) * Move healthcheck out Signed-off-by: Yong Tang * Move healthcheck to middleware/pkg/healthcheck Signed-off-by: Yong Tang --- middleware/pkg/healthcheck/healthcheck.go | 236 ++++++++++++++++++++++++++++++ middleware/pkg/healthcheck/policy.go | 119 +++++++++++++++ middleware/pkg/healthcheck/policy_test.go | 141 ++++++++++++++++++ middleware/proxy/google.go | 26 ++-- middleware/proxy/grpc_test.go | 20 +-- middleware/proxy/lookup.go | 29 ++-- middleware/proxy/policy.go | 113 -------------- middleware/proxy/policy_test.go | 88 ----------- middleware/proxy/proxy.go | 45 +----- middleware/proxy/upstream.go | 226 +++------------------------- middleware/proxy/upstream_test.go | 58 -------- 11 files changed, 556 insertions(+), 545 deletions(-) create mode 100644 middleware/pkg/healthcheck/healthcheck.go create mode 100644 middleware/pkg/healthcheck/policy.go create mode 100644 middleware/pkg/healthcheck/policy_test.go delete mode 100644 middleware/proxy/policy.go delete mode 100644 middleware/proxy/policy_test.go diff --git a/middleware/pkg/healthcheck/healthcheck.go b/middleware/pkg/healthcheck/healthcheck.go new file mode 100644 index 000000000..e0152a47b --- /dev/null +++ b/middleware/pkg/healthcheck/healthcheck.go @@ -0,0 +1,236 @@ +package healthcheck + +import ( + "io" + "io/ioutil" + "log" + "net" + "net/http" + "net/url" + "sync" + "sync/atomic" + "time" +) + +// UpstreamHostDownFunc can be used to customize how Down behaves. +type UpstreamHostDownFunc func(*UpstreamHost) bool + +// UpstreamHost represents a single proxy upstream +type UpstreamHost struct { + Conns int64 // must be first field to be 64-bit aligned on 32-bit systems + Name string // IP address (and port) of this upstream host + Fails int32 + FailTimeout time.Duration + OkUntil time.Time + CheckDown UpstreamHostDownFunc + CheckURL string + WithoutPathPrefix string + Checking bool + CheckMu sync.Mutex +} + +// Down checks whether the upstream host is down or not. +// Down will try to use uh.CheckDown first, and will fall +// back to some default criteria if necessary. +func (uh *UpstreamHost) Down() bool { + if uh.CheckDown == nil { + // Default settings + fails := atomic.LoadInt32(&uh.Fails) + after := false + + uh.CheckMu.Lock() + until := uh.OkUntil + uh.CheckMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + after = true + } + + return after || fails > 0 + } + return uh.CheckDown(uh) +} + +// HostPool is a collection of UpstreamHosts. +type HostPool []*UpstreamHost + +type HealthCheck struct { + wg sync.WaitGroup // Used to wait for running goroutines to stop. + stop chan struct{} // Signals running goroutines to stop. + Hosts HostPool + Policy Policy + Spray Policy + FailTimeout time.Duration + MaxFails int32 + Future time.Duration + Path string + Port string + Interval time.Duration +} + +func (u *HealthCheck) Start() { + u.stop = make(chan struct{}) + if u.Path != "" { + u.wg.Add(1) + go func() { + defer u.wg.Done() + u.HealthCheckWorker(u.stop) + }() + } +} + +// Stop sends a signal to all goroutines started by this staticUpstream to exit +// and waits for them to finish before returning. +func (u *HealthCheck) Stop() error { + close(u.stop) + u.wg.Wait() + return nil +} + +// This was moved into a thread so that each host could throw a health +// check at the same time. 
The reason for this is that if we are checking +// 3 hosts, and the first one is gone, and we spend minutes timing out to +// fail it, we would not have been doing any other health checks in that +// time. So we now have a per-host lock and a threaded health check. +// +// We use the Checking bool to avoid concurrent checks against the same +// host; if one is taking a long time, the next one will find a check in +// progress and simply return before trying. +// +// We are carefully avoiding having the mutex locked while we check, +// otherwise checks will back up, potentially a lot of them if a host is +// absent for a long time. This arrangement makes checks quickly see if +// they are the only one running and abort otherwise. +func healthCheckURL(nextTs time.Time, host *UpstreamHost) { + + // lock for our bool check. We don't just defer the unlock because + // we don't want the lock held while http.Get runs + host.CheckMu.Lock() + + // are we mid check? Don't run another one + if host.Checking { + host.CheckMu.Unlock() + return + } + + host.Checking = true + host.CheckMu.Unlock() + + //log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local()) + + // fetch that url. This has been moved into a go func because + // when the remote host is not merely not serving, but actually + // absent, then tcp syn timeouts can be very long, and so one + // fetch could last several check intervals + if r, err := http.Get(host.CheckURL); err == nil { + io.Copy(ioutil.Discard, r.Body) + r.Body.Close() + + if r.StatusCode < 200 || r.StatusCode >= 400 { + log.Printf("[WARNING] Host %s health check returned HTTP code %d\n", + host.Name, r.StatusCode) + nextTs = time.Unix(0, 0) + } + } else { + log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err) + nextTs = time.Unix(0, 0) + } + + host.CheckMu.Lock() + host.Checking = false + host.OkUntil = nextTs + host.CheckMu.Unlock() +} + +func (u *HealthCheck) healthCheck() { + for _, host := range u.Hosts { + + if host.CheckURL == "" { + var hostName, checkPort string + + // The DNS server might be an HTTP server. If so, extract its name. + ret, err := url.Parse(host.Name) + if err == nil && len(ret.Host) > 0 { + hostName = ret.Host + } else { + hostName = host.Name + } + + // Extract the port number from the parsed server name. 
+ checkHostName, checkPort, err := net.SplitHostPort(hostName) + if err != nil { + checkHostName = hostName + } + + if u.Port != "" { + checkPort = u.Port + } + + host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path + } + + // calculate this before the get + nextTs := time.Now().Add(u.Future) + + // locks/bools should prevent requests backing up + go healthCheckURL(nextTs, host) + } +} + +func (u *HealthCheck) HealthCheckWorker(stop chan struct{}) { + ticker := time.NewTicker(u.Interval) + u.healthCheck() + for { + select { + case <-ticker.C: + u.healthCheck() + case <-stop: + ticker.Stop() + return + } + } +} + +func (u *HealthCheck) Select() *UpstreamHost { + pool := u.Hosts + if len(pool) == 1 { + if pool[0].Down() && u.Spray == nil { + return nil + } + return pool[0] + } + allDown := true + for _, host := range pool { + if !host.Down() { + allDown = false + break + } + } + if allDown { + if u.Spray == nil { + return nil + } + return u.Spray.Select(pool) + } + + if u.Policy == nil { + h := (&Random{}).Select(pool) + if h != nil { + return h + } + if h == nil && u.Spray == nil { + return nil + } + return u.Spray.Select(pool) + } + + h := u.Policy.Select(pool) + if h != nil { + return h + } + + if u.Spray == nil { + return nil + } + return u.Spray.Select(pool) +} diff --git a/middleware/pkg/healthcheck/policy.go b/middleware/pkg/healthcheck/policy.go new file mode 100644 index 000000000..0cef8d79a --- /dev/null +++ b/middleware/pkg/healthcheck/policy.go @@ -0,0 +1,119 @@ +package healthcheck + +import ( + "log" + "math/rand" + "sync/atomic" +) + +var ( + SupportedPolicies = make(map[string]func() Policy) +) + +// RegisterPolicy adds a custom policy to the proxy. +func RegisterPolicy(name string, policy func() Policy) { + SupportedPolicies[name] = policy +} + +// Policy decides how a host will be selected from a pool. When all hosts are unhealthy, it is assumed the +// healthchecking failed. In this case each policy will *randomly* return a host from the pool to prevent +// no traffic to go through at all. +type Policy interface { + Select(pool HostPool) *UpstreamHost +} + +func init() { + RegisterPolicy("random", func() Policy { return &Random{} }) + RegisterPolicy("least_conn", func() Policy { return &LeastConn{} }) + RegisterPolicy("round_robin", func() Policy { return &RoundRobin{} }) +} + +// Random is a policy that selects up hosts from a pool at random. +type Random struct{} + +// Select selects an up host at random from the specified pool. +func (r *Random) Select(pool HostPool) *UpstreamHost { + // instead of just generating a random index + // this is done to prevent selecting a down host + var randHost *UpstreamHost + count := 0 + for _, host := range pool { + if host.Down() { + continue + } + count++ + if count == 1 { + randHost = host + } else { + r := rand.Int() % count + if r == (count - 1) { + randHost = host + } + } + } + return randHost +} + +// Spray is a policy that selects a host from a pool at random. This should be used as a last ditch +// attempt to get a host when all hosts are reporting unhealthy. +type Spray struct{} + +// Select selects an up host at random from the specified pool. +func (r *Spray) Select(pool HostPool) *UpstreamHost { + rnd := rand.Int() % len(pool) + randHost := pool[rnd] + log.Printf("[WARNING] All hosts reported as down, spraying to target: %s", randHost.Name) + return randHost +} + +// LeastConn is a policy that selects the host with the least connections. 
+type LeastConn struct{} + +// Select selects the up host with the least number of connections in the +// pool. If more than one host has the same least number of connections, +// one of the hosts is chosen at random. +func (r *LeastConn) Select(pool HostPool) *UpstreamHost { + var bestHost *UpstreamHost + count := 0 + leastConn := int64(1<<63 - 1) + for _, host := range pool { + if host.Down() { + continue + } + hostConns := host.Conns + if hostConns < leastConn { + bestHost = host + leastConn = hostConns + count = 1 + } else if hostConns == leastConn { + // randomly select host among hosts with least connections + count++ + if count == 1 { + bestHost = host + } else { + r := rand.Int() % count + if r == (count - 1) { + bestHost = host + } + } + } + } + return bestHost +} + +// RoundRobin is a policy that selects hosts based on round robin ordering. +type RoundRobin struct { + Robin uint32 +} + +// Select selects an up host from the pool using a round robin ordering scheme. +func (r *RoundRobin) Select(pool HostPool) *UpstreamHost { + poolLen := uint32(len(pool)) + selection := atomic.AddUint32(&r.Robin, 1) % poolLen + host := pool[selection] + // if the currently selected host is down, just ffwd to up host + for i := uint32(1); host.Down() && i < poolLen; i++ { + host = pool[(selection+i)%poolLen] + } + return host +} diff --git a/middleware/pkg/healthcheck/policy_test.go b/middleware/pkg/healthcheck/policy_test.go new file mode 100644 index 000000000..16cae7266 --- /dev/null +++ b/middleware/pkg/healthcheck/policy_test.go @@ -0,0 +1,141 @@ +package healthcheck + +import ( + "io/ioutil" + "log" + "net/http" + "net/http/httptest" + "os" + "testing" + "time" +) + +var workableServer *httptest.Server + +func TestMain(m *testing.M) { + workableServer = httptest.NewServer(http.HandlerFunc( + func(w http.ResponseWriter, r *http.Request) { + // do nothing + })) + r := m.Run() + workableServer.Close() + os.Exit(r) +} + +type customPolicy struct{} + +func (r *customPolicy) Select(pool HostPool) *UpstreamHost { + return pool[0] +} + +func testPool() HostPool { + pool := []*UpstreamHost{ + { + Name: workableServer.URL, // this should resolve (healthcheck test) + }, + { + Name: "http://shouldnot.resolve", // this shouldn't + }, + { + Name: "http://C", + }, + } + return HostPool(pool) +} + +func TestRegisterPolicy(t *testing.T) { + name := "custom" + customPolicy := &customPolicy{} + RegisterPolicy(name, func() Policy { return customPolicy }) + if _, ok := SupportedPolicies[name]; !ok { + t.Error("Expected supportedPolicies to have a custom policy.") + } + +} + +func TestHealthCheck(t *testing.T) { + log.SetOutput(ioutil.Discard) + + u := &HealthCheck{ + Hosts: testPool(), + FailTimeout: 10 * time.Second, + Future: 60 * time.Second, + MaxFails: 1, + } + + u.healthCheck() + // sleep a bit, it's async now + time.Sleep(time.Duration(2 * time.Second)) + + if u.Hosts[0].Down() { + t.Error("Expected first host in testpool to not fail healthcheck.") + } + if !u.Hosts[1].Down() { + t.Error("Expected second host in testpool to fail healthcheck.") + } +} + +func TestSelect(t *testing.T) { + u := &HealthCheck{ + Hosts: testPool()[:3], + FailTimeout: 10 * time.Second, + Future: 60 * time.Second, + MaxFails: 1, + } + u.Hosts[0].OkUntil = time.Unix(0, 0) + u.Hosts[1].OkUntil = time.Unix(0, 0) + u.Hosts[2].OkUntil = time.Unix(0, 0) + if h := u.Select(); h != nil { + t.Error("Expected select to return nil as all host are down") + } + u.Hosts[2].OkUntil = time.Time{} + if h := u.Select(); h == nil { + 
t.Error("Expected select to not return nil") + } +} + +func TestRoundRobinPolicy(t *testing.T) { + pool := testPool() + rrPolicy := &RoundRobin{} + h := rrPolicy.Select(pool) + // First selected host is 1, because counter starts at 0 + // and increments before host is selected + if h != pool[1] { + t.Error("Expected first round robin host to be second host in the pool.") + } + h = rrPolicy.Select(pool) + if h != pool[2] { + t.Error("Expected second round robin host to be third host in the pool.") + } + // mark host as down + pool[0].OkUntil = time.Unix(0, 0) + h = rrPolicy.Select(pool) + if h != pool[1] { + t.Error("Expected third round robin host to be first host in the pool.") + } +} + +func TestLeastConnPolicy(t *testing.T) { + pool := testPool() + lcPolicy := &LeastConn{} + pool[0].Conns = 10 + pool[1].Conns = 10 + h := lcPolicy.Select(pool) + if h != pool[2] { + t.Error("Expected least connection host to be third host.") + } + pool[2].Conns = 100 + h = lcPolicy.Select(pool) + if h != pool[0] && h != pool[1] { + t.Error("Expected least connection host to be first or second host.") + } +} + +func TestCustomPolicy(t *testing.T) { + pool := testPool() + customPolicy := &customPolicy{} + h := customPolicy.Select(pool) + if h != pool[0] { + t.Error("Expected custom policy host to be the first host.") + } +} diff --git a/middleware/proxy/google.go b/middleware/proxy/google.go index f021bb2b3..b71d0fb1b 100644 --- a/middleware/proxy/google.go +++ b/middleware/proxy/google.go @@ -14,6 +14,7 @@ import ( "time" "github.com/coredns/coredns/middleware/pkg/debug" + "github.com/coredns/coredns/middleware/pkg/healthcheck" "github.com/coredns/coredns/request" "github.com/miekg/dns" @@ -200,34 +201,33 @@ func extractAnswer(m *dns.Msg) ([]string, error) { // newUpstream returns an upstream initialized with hosts. 
func newUpstream(hosts []string, old *staticUpstream) Upstream { upstream := &staticUpstream{ - from: old.from, - Hosts: nil, - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - MaxFails: 3, - Future: 60 * time.Second, + from: old.from, + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 3, + Future: 60 * time.Second, + }, ex: old.ex, WithoutPathPrefix: old.WithoutPathPrefix, IgnoredSubDomains: old.IgnoredSubDomains, } - upstream.Hosts = make([]*UpstreamHost, len(hosts)) + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts)) for i, h := range hosts { - uh := &UpstreamHost{ + uh := &healthcheck.UpstreamHost{ Name: h, Conns: 0, Fails: 0, FailTimeout: upstream.FailTimeout, - CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { - return func(uh *UpstreamHost) bool { + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { down := false - uh.checkMu.Lock() + uh.CheckMu.Lock() until := uh.OkUntil - uh.checkMu.Unlock() + uh.CheckMu.Unlock() if !until.IsZero() && time.Now().After(until) { down = true diff --git a/middleware/proxy/grpc_test.go b/middleware/proxy/grpc_test.go index e303e1594..dcde7cc0e 100644 --- a/middleware/proxy/grpc_test.go +++ b/middleware/proxy/grpc_test.go @@ -4,11 +4,13 @@ import ( "testing" "time" + "github.com/coredns/coredns/middleware/pkg/healthcheck" + "google.golang.org/grpc/grpclog" ) -func pool() []*UpstreamHost { - return []*UpstreamHost{ +func pool() []*healthcheck.UpstreamHost { + return []*healthcheck.UpstreamHost{ { Name: "localhost:10053", }, @@ -22,13 +24,13 @@ func TestStartupShutdown(t *testing.T) { grpclog.SetLogger(discard{}) upstream := &staticUpstream{ - from: ".", - Hosts: pool(), - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - Future: 60 * time.Second, - MaxFails: 1, + from: ".", + HealthCheck: healthcheck.HealthCheck{ + Hosts: pool(), + FailTimeout: 10 * time.Second, + Future: 60 * time.Second, + MaxFails: 1, + }, } g := newGrpcClient(nil, upstream) upstream.ex = g diff --git a/middleware/proxy/lookup.go b/middleware/proxy/lookup.go index a6c714f39..eda0d0a0c 100644 --- a/middleware/proxy/lookup.go +++ b/middleware/proxy/lookup.go @@ -7,6 +7,7 @@ import ( "sync/atomic" "time" + "github.com/coredns/coredns/middleware/pkg/healthcheck" "github.com/coredns/coredns/request" "github.com/miekg/dns" @@ -24,31 +25,31 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy { // TODO(miek): this needs to be unified with upstream.go's NewStaticUpstreams, caddy uses NewHost // we should copy/make something similar. upstream := &staticUpstream{ - from: ".", - Hosts: make([]*UpstreamHost, len(hosts)), - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - MaxFails: 3, // TODO(miek): disable error checking for simple lookups? - Future: 60 * time.Second, - ex: newDNSExWithOption(opts), + from: ".", + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 3, // TODO(miek): disable error checking for simple lookups? 
+ Future: 60 * time.Second, + }, + ex: newDNSExWithOption(opts), } + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts)) for i, host := range hosts { - uh := &UpstreamHost{ + uh := &healthcheck.UpstreamHost{ Name: host, Conns: 0, Fails: 0, FailTimeout: upstream.FailTimeout, - CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { - return func(uh *UpstreamHost) bool { + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { down := false - uh.checkMu.Lock() + uh.CheckMu.Lock() until := uh.OkUntil - uh.checkMu.Unlock() + uh.CheckMu.Unlock() if !until.IsZero() && time.Now().After(until) { down = true @@ -120,7 +121,7 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) { timeout = 10 * time.Second } atomic.AddInt32(&host.Fails, 1) - go func(host *UpstreamHost, timeout time.Duration) { + go func(host *healthcheck.UpstreamHost, timeout time.Duration) { time.Sleep(timeout) atomic.AddInt32(&host.Fails, -1) }(host, timeout) diff --git a/middleware/proxy/policy.go b/middleware/proxy/policy.go deleted file mode 100644 index e0c9d7e2b..000000000 --- a/middleware/proxy/policy.go +++ /dev/null @@ -1,113 +0,0 @@ -package proxy - -import ( - "log" - "math/rand" - "sync/atomic" -) - -// HostPool is a collection of UpstreamHosts. -type HostPool []*UpstreamHost - -// Policy decides how a host will be selected from a pool. When all hosts are unhealthy, it is assumed the -// healthchecking failed. In this case each policy will *randomly* return a host from the pool to prevent -// no traffic to go through at all. -type Policy interface { - Select(pool HostPool) *UpstreamHost -} - -func init() { - RegisterPolicy("random", func() Policy { return &Random{} }) - RegisterPolicy("least_conn", func() Policy { return &LeastConn{} }) - RegisterPolicy("round_robin", func() Policy { return &RoundRobin{} }) -} - -// Random is a policy that selects up hosts from a pool at random. -type Random struct{} - -// Select selects an up host at random from the specified pool. -func (r *Random) Select(pool HostPool) *UpstreamHost { - // instead of just generating a random index - // this is done to prevent selecting a down host - var randHost *UpstreamHost - count := 0 - for _, host := range pool { - if host.Down() { - continue - } - count++ - if count == 1 { - randHost = host - } else { - r := rand.Int() % count - if r == (count - 1) { - randHost = host - } - } - } - return randHost -} - -// Spray is a policy that selects a host from a pool at random. This should be used as a last ditch -// attempt to get a host when all hosts are reporting unhealthy. -type Spray struct{} - -// Select selects an up host at random from the specified pool. -func (r *Spray) Select(pool HostPool) *UpstreamHost { - rnd := rand.Int() % len(pool) - randHost := pool[rnd] - log.Printf("[WARNING] All hosts reported as down, spraying to target: %s", randHost.Name) - return randHost -} - -// LeastConn is a policy that selects the host with the least connections. -type LeastConn struct{} - -// Select selects the up host with the least number of connections in the -// pool. If more than one host has the same least number of connections, -// one of the hosts is chosen at random. 
-func (r *LeastConn) Select(pool HostPool) *UpstreamHost { - var bestHost *UpstreamHost - count := 0 - leastConn := int64(1<<63 - 1) - for _, host := range pool { - if host.Down() { - continue - } - hostConns := host.Conns - if hostConns < leastConn { - bestHost = host - leastConn = hostConns - count = 1 - } else if hostConns == leastConn { - // randomly select host among hosts with least connections - count++ - if count == 1 { - bestHost = host - } else { - r := rand.Int() % count - if r == (count - 1) { - bestHost = host - } - } - } - } - return bestHost -} - -// RoundRobin is a policy that selects hosts based on round robin ordering. -type RoundRobin struct { - Robin uint32 -} - -// Select selects an up host from the pool using a round robin ordering scheme. -func (r *RoundRobin) Select(pool HostPool) *UpstreamHost { - poolLen := uint32(len(pool)) - selection := atomic.AddUint32(&r.Robin, 1) % poolLen - host := pool[selection] - // if the currently selected host is down, just ffwd to up host - for i := uint32(1); host.Down() && i < poolLen; i++ { - host = pool[(selection+i)%poolLen] - } - return host -} diff --git a/middleware/proxy/policy_test.go b/middleware/proxy/policy_test.go deleted file mode 100644 index 24fd3efdc..000000000 --- a/middleware/proxy/policy_test.go +++ /dev/null @@ -1,88 +0,0 @@ -package proxy - -import ( - "net/http" - "net/http/httptest" - "os" - "testing" - "time" -) - -var workableServer *httptest.Server - -func TestMain(m *testing.M) { - workableServer = httptest.NewServer(http.HandlerFunc( - func(w http.ResponseWriter, r *http.Request) { - // do nothing - })) - r := m.Run() - workableServer.Close() - os.Exit(r) -} - -type customPolicy struct{} - -func (r *customPolicy) Select(pool HostPool) *UpstreamHost { - return pool[0] -} - -func testPool() HostPool { - pool := []*UpstreamHost{ - { - Name: workableServer.URL, // this should resolve (healthcheck test) - }, - { - Name: "http://shouldnot.resolve", // this shouldn't - }, - { - Name: "http://C", - }, - } - return HostPool(pool) -} - -func TestRoundRobinPolicy(t *testing.T) { - pool := testPool() - rrPolicy := &RoundRobin{} - h := rrPolicy.Select(pool) - // First selected host is 1, because counter starts at 0 - // and increments before host is selected - if h != pool[1] { - t.Error("Expected first round robin host to be second host in the pool.") - } - h = rrPolicy.Select(pool) - if h != pool[2] { - t.Error("Expected second round robin host to be third host in the pool.") - } - // mark host as down - pool[0].OkUntil = time.Unix(0, 0) - h = rrPolicy.Select(pool) - if h != pool[1] { - t.Error("Expected third round robin host to be first host in the pool.") - } -} - -func TestLeastConnPolicy(t *testing.T) { - pool := testPool() - lcPolicy := &LeastConn{} - pool[0].Conns = 10 - pool[1].Conns = 10 - h := lcPolicy.Select(pool) - if h != pool[2] { - t.Error("Expected least connection host to be third host.") - } - pool[2].Conns = 100 - h = lcPolicy.Select(pool) - if h != pool[0] && h != pool[1] { - t.Error("Expected least connection host to be first or second host.") - } -} - -func TestCustomPolicy(t *testing.T) { - pool := testPool() - customPolicy := &customPolicy{} - h := customPolicy.Select(pool) - if h != pool[0] { - t.Error("Expected custom policy host to be the first host.") - } -} diff --git a/middleware/proxy/proxy.go b/middleware/proxy/proxy.go index 8780330ed..7e662c42e 100644 --- a/middleware/proxy/proxy.go +++ b/middleware/proxy/proxy.go @@ -3,11 +3,11 @@ package proxy import ( "errors" - "sync" 
"sync/atomic" "time" "github.com/coredns/coredns/middleware" + "github.com/coredns/coredns/middleware/pkg/healthcheck" "github.com/coredns/coredns/request" "github.com/miekg/dns" @@ -41,7 +41,7 @@ type Upstream interface { // The domain name this upstream host should be routed on. From() string // Selects an upstream host to be routed to. - Select() *UpstreamHost + Select() *healthcheck.UpstreamHost // Checks if subpdomain is not an ignored. IsAllowedDomain(string) bool // Exchanger returns the exchanger to be used for this upstream. @@ -50,45 +50,6 @@ type Upstream interface { Stop() error } -// UpstreamHostDownFunc can be used to customize how Down behaves. -type UpstreamHostDownFunc func(*UpstreamHost) bool - -// UpstreamHost represents a single proxy upstream -type UpstreamHost struct { - Conns int64 // must be first field to be 64-bit aligned on 32-bit systems - Name string // IP address (and port) of this upstream host - Fails int32 - FailTimeout time.Duration - OkUntil time.Time - CheckDown UpstreamHostDownFunc - CheckURL string - WithoutPathPrefix string - Checking bool - checkMu sync.Mutex -} - -// Down checks whether the upstream host is down or not. -// Down will try to use uh.CheckDown first, and will fall -// back to some default criteria if necessary. -func (uh *UpstreamHost) Down() bool { - if uh.CheckDown == nil { - // Default settings - fails := atomic.LoadInt32(&uh.Fails) - after := false - - uh.checkMu.Lock() - until := uh.OkUntil - uh.checkMu.Unlock() - - if !until.IsZero() && time.Now().After(until) { - after = true - } - - return after || fails > 0 - } - return uh.CheckDown(uh) -} - // tryDuration is how long to try upstream hosts; failures result in // immediate retries until this duration ends or we get a nil host. var tryDuration = 60 * time.Second @@ -145,7 +106,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) ( timeout = 10 * time.Second } atomic.AddInt32(&host.Fails, 1) - go func(host *UpstreamHost, timeout time.Duration) { + go func(host *healthcheck.UpstreamHost, timeout time.Duration) { time.Sleep(timeout) atomic.AddInt32(&host.Fails, -1) }(host, timeout) diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 380b585be..93ef0e32d 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -2,46 +2,25 @@ package proxy import ( "fmt" - "io" - "io/ioutil" - "log" "net" - "net/http" - "net/url" "strconv" "strings" - "sync" "sync/atomic" "time" "github.com/coredns/coredns/middleware" "github.com/coredns/coredns/middleware/pkg/dnsutil" + "github.com/coredns/coredns/middleware/pkg/healthcheck" "github.com/coredns/coredns/middleware/pkg/tls" "github.com/mholt/caddy/caddyfile" "github.com/miekg/dns" ) -var ( - supportedPolicies = make(map[string]func() Policy) -) - type staticUpstream struct { from string - stop chan struct{} // Signals running goroutines to stop. - wg sync.WaitGroup // Used to wait for running goroutines to stop. 
- Hosts HostPool - Policy Policy - Spray Policy + healthcheck.HealthCheck - FailTimeout time.Duration - MaxFails int32 - Future time.Duration - HealthCheck struct { - Path string - Port string - Interval time.Duration - } WithoutPathPrefix string IgnoredSubDomains []string ex Exchanger @@ -53,15 +32,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { var upstreams []Upstream for c.Next() { upstream := &staticUpstream{ - from: ".", - stop: make(chan struct{}), - Hosts: nil, - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - MaxFails: 1, - Future: 60 * time.Second, - ex: newDNSEx(), + from: ".", + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 1, + Future: 60 * time.Second, + }, + ex: newDNSEx(), } if !c.Args(&upstream.from) { @@ -84,22 +61,22 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { } } - upstream.Hosts = make([]*UpstreamHost, len(toHosts)) + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts)) for i, host := range toHosts { - uh := &UpstreamHost{ + uh := &healthcheck.UpstreamHost{ Name: host, Conns: 0, Fails: 0, FailTimeout: upstream.FailTimeout, - CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { - return func(uh *UpstreamHost) bool { + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { down := false - uh.checkMu.Lock() + uh.CheckMu.Lock() until := uh.OkUntil - uh.checkMu.Unlock() + uh.CheckMu.Unlock() if !until.IsZero() && time.Now().After(until) { down = true @@ -117,32 +94,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { upstream.Hosts[i] = uh } + upstream.Start() - if upstream.HealthCheck.Path != "" { - upstream.wg.Add(1) - go func() { - defer upstream.wg.Done() - upstream.HealthCheckWorker(upstream.stop) - }() - } upstreams = append(upstreams, upstream) } return upstreams, nil } -// Stop sends a signal to all goroutines started by this staticUpstream to exit -// and waits for them to finish before returning. -func (u *staticUpstream) Stop() error { - close(u.stop) - u.wg.Wait() - return nil -} - -// RegisterPolicy adds a custom policy to the proxy. -func RegisterPolicy(name string, policy func() Policy) { - supportedPolicies[name] = policy -} - func (u *staticUpstream) From() string { return u.from } @@ -153,7 +111,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { if !c.NextArg() { return c.ArgErr() } - policyCreateFunc, ok := supportedPolicies[c.Val()] + policyCreateFunc, ok := healthcheck.SupportedPolicies[c.Val()] if !ok { return c.ArgErr() } @@ -214,7 +172,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { } u.IgnoredSubDomains = ignoredDomains case "spray": - u.Spray = &Spray{} + u.Spray = &healthcheck.Spray{} case "protocol": encArgs := c.RemainingArgs() if len(encArgs) == 0 { @@ -259,154 +217,6 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { return nil } -// This was moved into a thread so that each host could throw a health -// check at the same time. The reason for this is that if we are checking -// 3 hosts, and the first one is gone, and we spend minutes timing out to -// fail it, we would not have been doing any other health checks in that -// time. So we now have a per-host lock and a threaded health check. 
-// -// We use the Checking bool to avoid concurrent checks against the same -// host; if one is taking a long time, the next one will find a check in -// progress and simply return before trying. -// -// We are carefully avoiding having the mutex locked while we check, -// otherwise checks will back up, potentially a lot of them if a host is -// absent for a long time. This arrangement makes checks quickly see if -// they are the only one running and abort otherwise. -func healthCheckURL(nextTs time.Time, host *UpstreamHost) { - - // lock for our bool check. We don't just defer the unlock because - // we don't want the lock held while http.Get runs - host.checkMu.Lock() - - // are we mid check? Don't run another one - if host.Checking { - host.checkMu.Unlock() - return - } - - host.Checking = true - host.checkMu.Unlock() - - //log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local()) - - // fetch that url. This has been moved into a go func because - // when the remote host is not merely not serving, but actually - // absent, then tcp syn timeouts can be very long, and so one - // fetch could last several check intervals - if r, err := http.Get(host.CheckURL); err == nil { - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - - if r.StatusCode < 200 || r.StatusCode >= 400 { - log.Printf("[WARNING] Host %s health check returned HTTP code %d\n", - host.Name, r.StatusCode) - nextTs = time.Unix(0, 0) - } - } else { - log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err) - nextTs = time.Unix(0, 0) - } - - host.checkMu.Lock() - host.Checking = false - host.OkUntil = nextTs - host.checkMu.Unlock() -} - -func (u *staticUpstream) healthCheck() { - for _, host := range u.Hosts { - - if host.CheckURL == "" { - var hostName, checkPort string - - // The DNS server might be an HTTP server. If so, extract its name. - ret, err := url.Parse(host.Name) - if err == nil && len(ret.Host) > 0 { - hostName = ret.Host - } else { - hostName = host.Name - } - - // Extract the port number from the parsed server name. 
- checkHostName, checkPort, err := net.SplitHostPort(hostName) - if err != nil { - checkHostName = hostName - } - - if u.HealthCheck.Port != "" { - checkPort = u.HealthCheck.Port - } - - host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path - } - - // calculate this before the get - nextTs := time.Now().Add(u.Future) - - // locks/bools should prevent requests backing up - go healthCheckURL(nextTs, host) - } -} - -func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) { - ticker := time.NewTicker(u.HealthCheck.Interval) - u.healthCheck() - for { - select { - case <-ticker.C: - u.healthCheck() - case <-stop: - ticker.Stop() - return - } - } -} - -func (u *staticUpstream) Select() *UpstreamHost { - pool := u.Hosts - if len(pool) == 1 { - if pool[0].Down() && u.Spray == nil { - return nil - } - return pool[0] - } - allDown := true - for _, host := range pool { - if !host.Down() { - allDown = false - break - } - } - if allDown { - if u.Spray == nil { - return nil - } - return u.Spray.Select(pool) - } - - if u.Policy == nil { - h := (&Random{}).Select(pool) - if h != nil { - return h - } - if h == nil && u.Spray == nil { - return nil - } - return u.Spray.Select(pool) - } - - h := u.Policy.Select(pool) - if h != nil { - return h - } - - if u.Spray == nil { - return nil - } - return u.Spray.Select(pool) -} - func (u *staticUpstream) IsAllowedDomain(name string) bool { if dns.Name(name) == dns.Name(u.From()) { return true diff --git a/middleware/proxy/upstream_test.go b/middleware/proxy/upstream_test.go index 06c229f39..3aa4104e8 100644 --- a/middleware/proxy/upstream_test.go +++ b/middleware/proxy/upstream_test.go @@ -2,74 +2,16 @@ package proxy import ( "io/ioutil" - "log" "os" "path/filepath" "strings" "testing" - "time" "github.com/coredns/coredns/middleware/test" "github.com/mholt/caddy" ) -func TestHealthCheck(t *testing.T) { - log.SetOutput(ioutil.Discard) - - upstream := &staticUpstream{ - from: ".", - Hosts: testPool(), - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - Future: 60 * time.Second, - MaxFails: 1, - } - - upstream.healthCheck() - // sleep a bit, it's async now - time.Sleep(time.Duration(2 * time.Second)) - - if upstream.Hosts[0].Down() { - t.Error("Expected first host in testpool to not fail healthcheck.") - } - if !upstream.Hosts[1].Down() { - t.Error("Expected second host in testpool to fail healthcheck.") - } -} - -func TestSelect(t *testing.T) { - upstream := &staticUpstream{ - from: ".", - Hosts: testPool()[:3], - Policy: &Random{}, - FailTimeout: 10 * time.Second, - Future: 60 * time.Second, - MaxFails: 1, - } - upstream.Hosts[0].OkUntil = time.Unix(0, 0) - upstream.Hosts[1].OkUntil = time.Unix(0, 0) - upstream.Hosts[2].OkUntil = time.Unix(0, 0) - if h := upstream.Select(); h != nil { - t.Error("Expected select to return nil as all host are down") - } - upstream.Hosts[2].OkUntil = time.Time{} - if h := upstream.Select(); h == nil { - t.Error("Expected select to not return nil") - } -} - -func TestRegisterPolicy(t *testing.T) { - name := "custom" - customPolicy := &customPolicy{} - RegisterPolicy(name, func() Policy { return customPolicy }) - if _, ok := supportedPolicies[name]; !ok { - t.Error("Expected supportedPolicies to have a custom policy.") - } - -} - func TestAllowedDomain(t *testing.T) { upstream := &staticUpstream{ from: "miek.nl.", -- cgit v1.2.3
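
The standalone sketch below (not part of the patch) shows how a consumer such as the proxy middleware can drive the relocated package after this change: register a policy, build a healthcheck.HealthCheck around a HostPool, start the background worker, pick a host, and stop the worker on shutdown. The types, fields, and functions used (HealthCheck, UpstreamHost, HostPool, RegisterPolicy, Start, Select, Stop, the round_robin policy) are those added in middleware/pkg/healthcheck above; the host addresses, the /health path and port, the check interval, and the "first" policy are illustrative assumptions, not values taken from the PR.

package main

import (
	"fmt"
	"time"

	"github.com/coredns/coredns/middleware/pkg/healthcheck"
)

// firstPolicy is an illustrative custom policy that always picks the first
// host in the pool; it mirrors the customPolicy used in policy_test.go.
type firstPolicy struct{}

func (p *firstPolicy) Select(pool healthcheck.HostPool) *healthcheck.UpstreamHost {
	return pool[0]
}

func main() {
	// Custom policies are registered by name, the same way the package's
	// init() registers "random", "least_conn" and "round_robin".
	healthcheck.RegisterPolicy("first", func() healthcheck.Policy { return &firstPolicy{} })

	hc := healthcheck.HealthCheck{
		Hosts: healthcheck.HostPool{
			{Name: "127.0.0.1:8053"}, // hypothetical upstream addresses
			{Name: "127.0.0.1:8054"},
		},
		Policy:      &healthcheck.RoundRobin{},
		FailTimeout: 10 * time.Second,
		MaxFails:    1,
		Future:      60 * time.Second,
		Path:        "/health", // a non-empty Path makes Start spawn the HTTP check worker
		Port:        "8080",    // overrides the port derived from each host name
		Interval:    5 * time.Second,
	}

	hc.Start()      // runs HealthCheckWorker in a goroutine because Path is set
	defer hc.Stop() // closes the stop channel and waits for the worker to exit

	// Select respects the configured Policy and skips hosts reporting Down().
	if host := hc.Select(); host != nil {
		fmt.Println("routing query to", host.Name)
	}
}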