-rw-r--r--   plugin/pkg/healthcheck/healthcheck.go   241
-rw-r--r--   plugin/pkg/healthcheck/log_test.go        5
-rw-r--r--   plugin/pkg/healthcheck/policy.go        141
-rw-r--r--   plugin/pkg/healthcheck/policy_test.go   155
4 files changed, 0 insertions(+), 542 deletions(-)
diff --git a/plugin/pkg/healthcheck/healthcheck.go b/plugin/pkg/healthcheck/healthcheck.go
deleted file mode 100644
index 5a2e229cf..000000000
--- a/plugin/pkg/healthcheck/healthcheck.go
+++ /dev/null
@@ -1,241 +0,0 @@
-package healthcheck
-
-import (
-	"io"
-	"io/ioutil"
-	"net"
-	"net/http"
-	"net/url"
-	"sync"
-	"sync/atomic"
-	"time"
-
-	"github.com/coredns/coredns/plugin/pkg/log"
-)
-
-// UpstreamHostDownFunc can be used to customize how Down behaves.
-type UpstreamHostDownFunc func(*UpstreamHost) bool
-
-// UpstreamHost represents a single proxy upstream.
-type UpstreamHost struct {
-	Conns       int64  // must be first field to be 64-bit aligned on 32-bit systems
-	Name        string // IP address (and port) of this upstream host
-	Fails       int32
-	FailTimeout time.Duration
-	CheckDown   UpstreamHostDownFunc
-	CheckURL    string
-	Checking    bool
-	sync.Mutex
-}
-
-// Down checks whether the upstream host is down or not.
-// Down will try to use uh.CheckDown first, and will fall
-// back to some default criteria if necessary.
-func (uh *UpstreamHost) Down() bool {
-	if uh.CheckDown == nil {
-		fails := atomic.LoadInt32(&uh.Fails)
-		return fails > 0
-	}
-	return uh.CheckDown(uh)
-}
-
-// HostPool is a collection of UpstreamHosts.
-type HostPool []*UpstreamHost
-
-// HealthCheck is used for performing healthchecks on a collection of
-// upstream hosts and selecting one based on the policy.
-type HealthCheck struct {
-	wg          sync.WaitGroup // Used to wait for running goroutines to stop.
-	stop        chan struct{}  // Signals running goroutines to stop.
-	Hosts       HostPool
-	Policy      Policy
-	Spray       Policy
-	FailTimeout time.Duration
-	MaxFails    int32
-	Path        string
-	Port        string
-	Interval    time.Duration
-}
-
-// Start starts the healthcheck.
-func (u *HealthCheck) Start() {
-	for i, h := range u.Hosts {
-		u.Hosts[i].CheckURL = u.normalizeCheckURL(h.Name)
-	}
-
-	u.stop = make(chan struct{})
-	if u.Path != "" {
-		u.wg.Add(1)
-		go func() {
-			defer u.wg.Done()
-			u.healthCheckWorker(u.stop)
-		}()
-	}
-}
-
-// Stop sends a signal to all goroutines started by this staticUpstream to exit
-// and waits for them to finish before returning.
-func (u *HealthCheck) Stop() error {
-	close(u.stop)
-	u.wg.Wait()
-	return nil
-}
-
-// This was moved into a goroutine so that every host can be health
-// checked at the same time. The reason for this is that if we are
-// checking 3 hosts, and the first one is gone, and we spend minutes
-// timing out to fail it, we would not have been doing any other health
-// checks in that time. So we now have a per-host lock and a threaded
-// health check.
-//
-// We use the Checking bool to avoid concurrent checks against the same
-// host; if one is taking a long time, the next one will find a check in
-// progress and simply return before trying.
-//
-// We are carefully avoiding having the mutex locked while we check,
-// otherwise checks will back up, potentially a lot of them if a host is
-// absent for a long time. This arrangement makes checks quickly see if
-// they are the only one running and abort otherwise.
-
-// HealthCheckURL performs the http.Get that implements the healthcheck.
-func (uh *UpstreamHost) HealthCheckURL() {
-	// Lock for our bool check. We don't just defer the unlock because
-	// we don't want the lock held while http.Get runs.
-	uh.Lock()
-
-	// We call HealthCheckURL from proxy.go and lookup.go; bail out when
-	// nothing is configured to healthcheck. Or are we mid-check? Don't
-	// run another one.
-	if uh.CheckURL == "" || uh.Checking { // nothing configured, or already checking
-		uh.Unlock()
-		return
-	}
-
-	uh.Checking = true
-	uh.Unlock()
-
-	// default timeout (5s)
-	r, err := healthClient.Get(uh.CheckURL)
-
-	defer func() {
-		uh.Lock()
-		uh.Checking = false
-		uh.Unlock()
-	}()
-
-	if err != nil {
-		log.Warningf("Host %s health check probe failed: %v", uh.Name, err)
-		atomic.AddInt32(&uh.Fails, 1)
-		return
-	}
-
-	if err == nil {
-		io.Copy(ioutil.Discard, r.Body)
-		r.Body.Close()
-
-		if r.StatusCode < 200 || r.StatusCode >= 400 {
-			log.Warningf("Host %s health check returned HTTP code %d", uh.Name, r.StatusCode)
-			atomic.AddInt32(&uh.Fails, 1)
-			return
-		}
-
-		// We are healthy again, reset fails.
-		atomic.StoreInt32(&uh.Fails, 0)
-		return
-	}
-}
-
-func (u *HealthCheck) healthCheck() {
-	for _, host := range u.Hosts {
-		// locks/bools should prevent requests backing up
-		go host.HealthCheckURL()
-	}
-}
-
-func (u *HealthCheck) healthCheckWorker(stop chan struct{}) {
-	ticker := time.NewTicker(u.Interval)
-	u.healthCheck()
-	for {
-		select {
-		case <-ticker.C:
-			u.healthCheck()
-		case <-stop:
-			ticker.Stop()
-			return
-		}
-	}
-}
-
-// Select selects an upstream host based on the policy
-// and the healthcheck result.
-func (u *HealthCheck) Select() *UpstreamHost {
-	pool := u.Hosts
-	if len(pool) == 1 {
-		if pool[0].Down() && u.Spray == nil {
-			return nil
-		}
-		return pool[0]
-	}
-	allDown := true
-	for _, host := range pool {
-		if !host.Down() {
-			allDown = false
-			break
-		}
-	}
-	if allDown {
-		if u.Spray == nil {
-			return nil
-		}
-		return u.Spray.Select(pool)
-	}
-
-	if u.Policy == nil {
-		h := (&Random{}).Select(pool)
-		if h != nil {
-			return h
-		}
-		if h == nil && u.Spray == nil {
-			return nil
-		}
-		return u.Spray.Select(pool)
-	}
-
-	h := u.Policy.Select(pool)
-	if h != nil {
-		return h
-	}
-
-	if u.Spray == nil {
-		return nil
-	}
-	return u.Spray.Select(pool)
-}
-
-// normalizeCheckURL creates a proper URL for the health check.
-func (u *HealthCheck) normalizeCheckURL(name string) string {
-	if u.Path == "" {
-		return ""
-	}
-
-	// The DNS server might be an HTTP server. If so, extract its name.
-	hostName := name
-	ret, err := url.Parse(name)
-	if err == nil && len(ret.Host) > 0 {
-		hostName = ret.Host
-	}
-
-	// Extract the port number from the parsed server name.
-	checkHostName, checkPort, err := net.SplitHostPort(hostName)
-	if err != nil {
-		checkHostName = hostName
-	}
-
-	if u.Port != "" {
-		checkPort = u.Port
-	}
-
-	checkURL := "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path
-	return checkURL
-}
-
-var healthClient = func() *http.Client { return &http.Client{Timeout: 5 * time.Second} }()
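For orientation, here is a minimal sketch of how a consumer would have driven the HealthCheck type deleted above. The import path is the pre-removal one; the host addresses, port, interval, and /health path are illustrative assumptions, not values taken from this diff.

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/coredns/coredns/plugin/pkg/healthcheck"
    )

    func main() {
    	// Probe each upstream's /health endpoint on port 8080 every 10s.
    	hc := &healthcheck.HealthCheck{
    		Hosts: healthcheck.HostPool{
    			{Name: "10.0.0.1:53"},
    			{Name: "10.0.0.2:53"},
    		},
    		Path:     "/health",
    		Port:     "8080",
    		Interval: 10 * time.Second,
    	}

    	hc.Start() // normalizes the CheckURLs and spawns the ticker-driven worker
    	defer hc.Stop()

    	// Select returns nil when every host is down and no Spray policy is set.
    	if host := hc.Select(); host != nil {
    		fmt.Println("selected upstream:", host.Name)
    	}
    }

Note that Start only launches the background worker when Path is non-empty; with no Path, Select still works but no probes ever run, which is exactly what TestHealthCheckDisabled below relies on.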
- if uh.CheckURL == "" || uh.Checking { // nothing configured - uh.Unlock() - return - } - - uh.Checking = true - uh.Unlock() - - // default timeout (5s) - r, err := healthClient.Get(uh.CheckURL) - - defer func() { - uh.Lock() - uh.Checking = false - uh.Unlock() - }() - - if err != nil { - log.Warningf("Host %s health check probe failed: %v", uh.Name, err) - atomic.AddInt32(&uh.Fails, 1) - return - } - - if err == nil { - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - - if r.StatusCode < 200 || r.StatusCode >= 400 { - log.Warningf("Host %s health check returned HTTP code %d", uh.Name, r.StatusCode) - atomic.AddInt32(&uh.Fails, 1) - return - } - - // We are healthy again, reset fails. - atomic.StoreInt32(&uh.Fails, 0) - return - } -} - -func (u *HealthCheck) healthCheck() { - for _, host := range u.Hosts { - // locks/bools should prevent requests backing up - go host.HealthCheckURL() - } -} - -func (u *HealthCheck) healthCheckWorker(stop chan struct{}) { - ticker := time.NewTicker(u.Interval) - u.healthCheck() - for { - select { - case <-ticker.C: - u.healthCheck() - case <-stop: - ticker.Stop() - return - } - } -} - -// Select selects an upstream host based on the policy -// and the healthcheck result. -func (u *HealthCheck) Select() *UpstreamHost { - pool := u.Hosts - if len(pool) == 1 { - if pool[0].Down() && u.Spray == nil { - return nil - } - return pool[0] - } - allDown := true - for _, host := range pool { - if !host.Down() { - allDown = false - break - } - } - if allDown { - if u.Spray == nil { - return nil - } - return u.Spray.Select(pool) - } - - if u.Policy == nil { - h := (&Random{}).Select(pool) - if h != nil { - return h - } - if h == nil && u.Spray == nil { - return nil - } - return u.Spray.Select(pool) - } - - h := u.Policy.Select(pool) - if h != nil { - return h - } - - if u.Spray == nil { - return nil - } - return u.Spray.Select(pool) -} - -// normalizeCheckURL creates a proper URL for the health check. -func (u *HealthCheck) normalizeCheckURL(name string) string { - if u.Path == "" { - return "" - } - - // The DNS server might be an HTTP server. If so, extract its name. - hostName := name - ret, err := url.Parse(name) - if err == nil && len(ret.Host) > 0 { - hostName = ret.Host - } - - // Extract the port number from the parsed server name. - checkHostName, checkPort, err := net.SplitHostPort(hostName) - if err != nil { - checkHostName = hostName - } - - if u.Port != "" { - checkPort = u.Port - } - - checkURL := "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path - return checkURL -} - -var healthClient = func() *http.Client { return &http.Client{Timeout: 5 * time.Second} }() diff --git a/plugin/pkg/healthcheck/log_test.go b/plugin/pkg/healthcheck/log_test.go deleted file mode 100644 index 95808771c..000000000 --- a/plugin/pkg/healthcheck/log_test.go +++ /dev/null @@ -1,5 +0,0 @@ -package healthcheck - -import clog "github.com/coredns/coredns/plugin/pkg/log" - -func init() { clog.Discard() } diff --git a/plugin/pkg/healthcheck/policy.go b/plugin/pkg/healthcheck/policy.go deleted file mode 100644 index beb95e7d5..000000000 --- a/plugin/pkg/healthcheck/policy.go +++ /dev/null @@ -1,141 +0,0 @@ -package healthcheck - -import ( - "math/rand" - "sync/atomic" - - "github.com/coredns/coredns/plugin/pkg/log" -) - -var ( - // SupportedPolicies is the collection of policies registered - SupportedPolicies = make(map[string]func() Policy) -) - -// RegisterPolicy adds a custom policy to the proxy. 
diff --git a/plugin/pkg/healthcheck/policy.go b/plugin/pkg/healthcheck/policy.go
deleted file mode 100644
index beb95e7d5..000000000
--- a/plugin/pkg/healthcheck/policy.go
+++ /dev/null
@@ -1,141 +0,0 @@
-package healthcheck
-
-import (
-	"math/rand"
-	"sync/atomic"
-
-	"github.com/coredns/coredns/plugin/pkg/log"
-)
-
-var (
-	// SupportedPolicies is the collection of registered policies.
-	SupportedPolicies = make(map[string]func() Policy)
-)
-
-// RegisterPolicy adds a custom policy to the proxy.
-func RegisterPolicy(name string, policy func() Policy) {
-	SupportedPolicies[name] = policy
-}
-
-// Policy decides how a host will be selected from a pool. When all hosts are unhealthy, it is
-// assumed the healthchecking failed. In this case each policy will *randomly* return a host
-// from the pool so that traffic is not dropped entirely.
-type Policy interface {
-	Select(pool HostPool) *UpstreamHost
-}
-
-func init() {
-	RegisterPolicy("random", func() Policy { return &Random{} })
-	RegisterPolicy("least_conn", func() Policy { return &LeastConn{} })
-	RegisterPolicy("round_robin", func() Policy { return &RoundRobin{} })
-	RegisterPolicy("first", func() Policy { return &First{} })
-	// 'sequential' is an alias to 'first' to maintain consistency with the forward plugin;
-	// 'first' should probably be removed in a future release.
-	RegisterPolicy("sequential", func() Policy { return &First{} })
-}
-
-// Random is a policy that selects up hosts from a pool at random.
-type Random struct{}
-
-// Select selects an up host at random from the specified pool.
-func (r *Random) Select(pool HostPool) *UpstreamHost {
-	// Instead of just generating a random index, iterate over the pool;
-	// this avoids selecting a down host.
-	var randHost *UpstreamHost
-	count := 0
-	for _, host := range pool {
-		if host.Down() {
-			continue
-		}
-		count++
-		if count == 1 {
-			randHost = host
-		} else {
-			r := rand.Int() % count
-			if r == (count - 1) {
-				randHost = host
-			}
-		}
-	}
-	return randHost
-}
-
-// Spray is a policy that selects a host from a pool at random. This should be used as a last ditch
-// attempt to get a host when all hosts are reporting unhealthy.
-type Spray struct{}
-
-// Select selects a host at random from the specified pool, regardless of health.
-func (r *Spray) Select(pool HostPool) *UpstreamHost {
-	rnd := rand.Int() % len(pool)
-	randHost := pool[rnd]
-	log.Warningf("All hosts reported as down, spraying to target: %s", randHost.Name)
-	return randHost
-}
-
-// LeastConn is a policy that selects the host with the least connections.
-type LeastConn struct{}
-
-// Select selects the up host with the least number of connections in the
-// pool. If more than one host has the same least number of connections,
-// one of those hosts is chosen at random.
-func (r *LeastConn) Select(pool HostPool) *UpstreamHost {
-	var bestHost *UpstreamHost
-	count := 0
-	leastConn := int64(1<<63 - 1)
-	for _, host := range pool {
-		if host.Down() {
-			continue
-		}
-		hostConns := host.Conns
-		if hostConns < leastConn {
-			bestHost = host
-			leastConn = hostConns
-			count = 1
-		} else if hostConns == leastConn {
-			// Randomly select a host among those with the least connections.
-			count++
-			if count == 1 {
-				bestHost = host
-			} else {
-				r := rand.Int() % count
-				if r == (count - 1) {
-					bestHost = host
-				}
-			}
-		}
-	}
-	return bestHost
-}
-
-// RoundRobin is a policy that selects hosts based on round robin ordering.
-type RoundRobin struct {
-	Robin uint32
-}
-
-// Select selects an up host from the pool using a round robin ordering scheme.
-func (r *RoundRobin) Select(pool HostPool) *UpstreamHost {
-	poolLen := uint32(len(pool))
-	selection := atomic.AddUint32(&r.Robin, 1) % poolLen
-	host := pool[selection]
-	// If the currently selected host is down, fast-forward to the next up host.
-	for i := uint32(1); host.Down() && i < poolLen; i++ {
-		host = pool[(selection+i)%poolLen]
-	}
-	return host
-}
-
-// First is a policy that always selects the first healthy host in list order.
type First struct{}
-
-// Select returns the first host in the pool that is not Down.
-func (r *First) Select(pool HostPool) *UpstreamHost {
-	for i := 0; i < len(pool); i++ {
-		host := pool[i]
-		if host.Down() {
-			continue
-		}
-		return host
-	}
-	// No healthy host was found.
-	return nil
-}
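The RegisterPolicy hook above let callers plug in strategies beyond the built-ins. A minimal sketch of registering a custom policy against the pre-removal import path; lastPolicy is a hypothetical example, not part of the package.

    package main

    import (
    	"fmt"

    	"github.com/coredns/coredns/plugin/pkg/healthcheck"
    )

    // lastPolicy picks the last healthy host in the pool. It exists purely
    // to demonstrate the registration hook, not as a useful strategy.
    type lastPolicy struct{}

    func (p *lastPolicy) Select(pool healthcheck.HostPool) *healthcheck.UpstreamHost {
    	for i := len(pool) - 1; i >= 0; i-- {
    		if !pool[i].Down() {
    			return pool[i]
    		}
    	}
    	return nil
    }

    func main() {
    	// "last" sits alongside the built-ins registered in init():
    	// random, least_conn, round_robin, first and sequential.
    	healthcheck.RegisterPolicy("last", func() healthcheck.Policy { return &lastPolicy{} })
    	fmt.Println(len(healthcheck.SupportedPolicies), "policies registered")
    }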
diff --git a/plugin/pkg/healthcheck/policy_test.go b/plugin/pkg/healthcheck/policy_test.go
deleted file mode 100644
index a9b2dc51b..000000000
--- a/plugin/pkg/healthcheck/policy_test.go
+++ /dev/null
@@ -1,155 +0,0 @@
-package healthcheck
-
-import (
-	"net/http"
-	"net/http/httptest"
-	"os"
-	"testing"
-	"time"
-)
-
-var workableServer *httptest.Server
-
-func TestMain(m *testing.M) {
-	workableServer = httptest.NewServer(http.HandlerFunc(
-		func(w http.ResponseWriter, r *http.Request) {
-			// do nothing
-		}))
-	r := m.Run()
-	workableServer.Close()
-	os.Exit(r)
-}
-
-type customPolicy struct{}
-
-func (r *customPolicy) Select(pool HostPool) *UpstreamHost {
-	return pool[0]
-}
-
-func testPool() HostPool {
-	pool := []*UpstreamHost{
-		{Name: workableServer.URL},            // this should resolve (healthcheck test)
-		{Name: "http://shouldnot.resolve:85"}, // this shouldn't, especially on a port other than 80
-		{Name: "http://C"},
-	}
-	return HostPool(pool)
-}
-
-func TestRegisterPolicy(t *testing.T) {
-	name := "custom"
-	customPolicy := &customPolicy{}
-	RegisterPolicy(name, func() Policy { return customPolicy })
-	if _, ok := SupportedPolicies[name]; !ok {
-		t.Error("Expected SupportedPolicies to have a custom policy.")
-	}
-}
-
-func TestHealthCheck(t *testing.T) {
-	u := &HealthCheck{
-		Hosts:       testPool(),
-		Path:        "/",
-		FailTimeout: 10 * time.Second,
-		MaxFails:    1,
-	}
-
-	for i, h := range u.Hosts {
-		u.Hosts[i].CheckURL = u.normalizeCheckURL(h.Name)
-	}
-
-	u.healthCheck()
-	time.Sleep(1 * time.Second) // sleep a bit, the checks run asynchronously
-
-	if u.Hosts[0].Down() {
-		t.Error("Expected first host in testpool to not fail healthcheck.")
-	}
-	if !u.Hosts[1].Down() {
-		t.Error("Expected second host in testpool to fail healthcheck.")
-	}
-}
-
-func TestHealthCheckDisabled(t *testing.T) {
-	u := &HealthCheck{
-		Hosts:       testPool(),
-		FailTimeout: 10 * time.Second,
-		MaxFails:    1,
-	}
-
-	for i, h := range u.Hosts {
-		u.Hosts[i].CheckURL = u.normalizeCheckURL(h.Name)
-	}
-
-	u.healthCheck()
-	time.Sleep(1 * time.Second) // sleep a bit, the checks run asynchronously
-
-	for i, h := range u.Hosts {
-		if h.Down() {
-			t.Errorf("Expected host %d in testpool to not be down with healthchecks disabled.", i+1)
-		}
-	}
-}
-
-func TestRoundRobinPolicy(t *testing.T) {
-	pool := testPool()
-	rrPolicy := &RoundRobin{}
-	h := rrPolicy.Select(pool)
-	// The first selected host is the second in the pool, because the
-	// counter starts at 0 and increments before a host is selected.
-	if h != pool[1] {
-		t.Error("Expected first round robin host to be second host in the pool.")
-	}
-	h = rrPolicy.Select(pool)
-	if h != pool[2] {
-		t.Error("Expected second round robin host to be third host in the pool.")
-	}
-	h = rrPolicy.Select(pool)
-	if h != pool[0] {
-		t.Error("Expected third round robin host to be first host in the pool.")
-	}
-}
-
-func TestLeastConnPolicy(t *testing.T) {
-	pool := testPool()
-	lcPolicy := &LeastConn{}
-	pool[0].Conns = 10
-	pool[1].Conns = 10
-	h := lcPolicy.Select(pool)
-	if h != pool[2] {
-		t.Error("Expected least connection host to be third host.")
-	}
-	pool[2].Conns = 100
-	h = lcPolicy.Select(pool)
-	if h != pool[0] && h != pool[1] {
-		t.Error("Expected least connection host to be first or second host.")
-	}
-}
-
-func TestCustomPolicy(t *testing.T) {
-	pool := testPool()
-	customPolicy := &customPolicy{}
&customPolicy{} - h := customPolicy.Select(pool) - if h != pool[0] { - t.Error("Expected custom policy host to be the first host.") - } -} - -func TestFirstPolicy(t *testing.T) { - pool := testPool() - rrPolicy := &First{} - h := rrPolicy.Select(pool) - // First selected host is 1, because counter starts at 0 - // and increments before host is selected - if h != pool[0] { - t.Error("Expected always first to be first host in the pool.") - } - h = rrPolicy.Select(pool) - if h != pool[0] { - t.Error("Expected always first to be first host in the pool, even in second call") - } - // set this first in pool as failed - pool[0].Fails = 1 - h = rrPolicy.Select(pool) - if h != pool[1] { - t.Error("Expected first to be he second in pool if the first one is down.") - } -} |