Diffstat (limited to 'plugin/pkg/healthcheck')
-rw-r--r-- | plugin/pkg/healthcheck/healthcheck.go | 243
-rw-r--r-- | plugin/pkg/healthcheck/policy.go      | 120
-rw-r--r-- | plugin/pkg/healthcheck/policy_test.go | 143
3 files changed, 506 insertions, 0 deletions
diff --git a/plugin/pkg/healthcheck/healthcheck.go b/plugin/pkg/healthcheck/healthcheck.go
new file mode 100644
index 000000000..18f09087c
--- /dev/null
+++ b/plugin/pkg/healthcheck/healthcheck.go
@@ -0,0 +1,243 @@
+package healthcheck
+
+import (
+	"io"
+	"io/ioutil"
+	"log"
+	"net"
+	"net/http"
+	"net/url"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+// UpstreamHostDownFunc can be used to customize how Down behaves.
+type UpstreamHostDownFunc func(*UpstreamHost) bool
+
+// UpstreamHost represents a single proxy upstream
+type UpstreamHost struct {
+	Conns             int64  // must be first field to be 64-bit aligned on 32-bit systems
+	Name              string // IP address (and port) of this upstream host
+	Network           string // Network (tcp, unix, etc) of the host, default "" is "tcp"
+	Fails             int32
+	FailTimeout       time.Duration
+	OkUntil           time.Time
+	CheckDown         UpstreamHostDownFunc
+	CheckURL          string
+	WithoutPathPrefix string
+	Checking          bool
+	CheckMu           sync.Mutex
+}
+
+// Down checks whether the upstream host is down or not.
+// Down will try to use uh.CheckDown first, and will fall
+// back to some default criteria if necessary.
+func (uh *UpstreamHost) Down() bool {
+	if uh.CheckDown == nil {
+		// Default settings
+		fails := atomic.LoadInt32(&uh.Fails)
+		after := false
+
+		uh.CheckMu.Lock()
+		until := uh.OkUntil
+		uh.CheckMu.Unlock()
+
+		if !until.IsZero() && time.Now().After(until) {
+			after = true
+		}
+
+		return after || fails > 0
+	}
+	return uh.CheckDown(uh)
+}
+
+// HostPool is a collection of UpstreamHosts.
+type HostPool []*UpstreamHost
+
+// HealthCheck is used for performing healthcheck
+// on a collection of upstream hosts and select
+// one based on the policy.
+type HealthCheck struct {
+	wg          sync.WaitGroup // Used to wait for running goroutines to stop.
+	stop        chan struct{}  // Signals running goroutines to stop.
+	Hosts       HostPool
+	Policy      Policy
+	Spray       Policy
+	FailTimeout time.Duration
+	MaxFails    int32
+	Future      time.Duration
+	Path        string
+	Port        string
+	Interval    time.Duration
+}
+
+// Start starts the healthcheck
+func (u *HealthCheck) Start() {
+	u.stop = make(chan struct{})
+	if u.Path != "" {
+		u.wg.Add(1)
+		go func() {
+			defer u.wg.Done()
+			u.healthCheckWorker(u.stop)
+		}()
+	}
+}
+
+// Stop sends a signal to all goroutines started by this staticUpstream to exit
+// and waits for them to finish before returning.
+func (u *HealthCheck) Stop() error {
+	close(u.stop)
+	u.wg.Wait()
+	return nil
+}
+
+// This was moved into a thread so that each host could throw a health
+// check at the same time. The reason for this is that if we are checking
+// 3 hosts, and the first one is gone, and we spend minutes timing out to
+// fail it, we would not have been doing any other health checks in that
+// time. So we now have a per-host lock and a threaded health check.
+//
+// We use the Checking bool to avoid concurrent checks against the same
+// host; if one is taking a long time, the next one will find a check in
+// progress and simply return before trying.
+//
+// We are carefully avoiding having the mutex locked while we check,
+// otherwise checks will back up, potentially a lot of them if a host is
+// absent for a long time. This arrangement makes checks quickly see if
+// they are the only one running and abort otherwise.
+func healthCheckURL(nextTs time.Time, host *UpstreamHost) {
+
+	// lock for our bool check. We don't just defer the unlock because
+	// we don't want the lock held while http.Get runs
+	host.CheckMu.Lock()
+
+	// are we mid check? Don't run another one
+	if host.Checking {
+		host.CheckMu.Unlock()
+		return
+	}
+
+	host.Checking = true
+	host.CheckMu.Unlock()
+
+	//log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local())
+
+	// fetch that url. This has been moved into a go func because
+	// when the remote host is not merely not serving, but actually
+	// absent, then tcp syn timeouts can be very long, and so one
+	// fetch could last several check intervals
+	if r, err := http.Get(host.CheckURL); err == nil {
+		io.Copy(ioutil.Discard, r.Body)
+		r.Body.Close()
+
+		if r.StatusCode < 200 || r.StatusCode >= 400 {
+			log.Printf("[WARNING] Host %s health check returned HTTP code %d\n",
+				host.Name, r.StatusCode)
+			nextTs = time.Unix(0, 0)
+		}
+	} else {
+		log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err)
+		nextTs = time.Unix(0, 0)
+	}
+
+	host.CheckMu.Lock()
+	host.Checking = false
+	host.OkUntil = nextTs
+	host.CheckMu.Unlock()
+}
+
+func (u *HealthCheck) healthCheck() {
+	for _, host := range u.Hosts {
+
+		if host.CheckURL == "" {
+			var hostName, checkPort string
+
+			// The DNS server might be an HTTP server. If so, extract its name.
+			ret, err := url.Parse(host.Name)
+			if err == nil && len(ret.Host) > 0 {
+				hostName = ret.Host
+			} else {
+				hostName = host.Name
+			}
+
+			// Extract the port number from the parsed server name.
+			checkHostName, checkPort, err := net.SplitHostPort(hostName)
+			if err != nil {
+				checkHostName = hostName
+			}
+
+			if u.Port != "" {
+				checkPort = u.Port
+			}
+
+			host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path
+		}
+
+		// calculate this before the get
+		nextTs := time.Now().Add(u.Future)
+
+		// locks/bools should prevent requests backing up
+		go healthCheckURL(nextTs, host)
+	}
+}
+
+func (u *HealthCheck) healthCheckWorker(stop chan struct{}) {
+	ticker := time.NewTicker(u.Interval)
+	u.healthCheck()
+	for {
+		select {
+		case <-ticker.C:
+			u.healthCheck()
+		case <-stop:
+			ticker.Stop()
+			return
+		}
+	}
+}
+
+// Select selects an upstream host based on the policy
+// and the healthcheck result.
+func (u *HealthCheck) Select() *UpstreamHost {
+	pool := u.Hosts
+	if len(pool) == 1 {
+		if pool[0].Down() && u.Spray == nil {
+			return nil
+		}
+		return pool[0]
+	}
+	allDown := true
+	for _, host := range pool {
+		if !host.Down() {
+			allDown = false
+			break
+		}
+	}
+	if allDown {
+		if u.Spray == nil {
+			return nil
+		}
+		return u.Spray.Select(pool)
+	}
+
+	if u.Policy == nil {
+		h := (&Random{}).Select(pool)
+		if h != nil {
+			return h
+		}
+		if h == nil && u.Spray == nil {
+			return nil
+		}
+		return u.Spray.Select(pool)
+	}
+
+	h := u.Policy.Select(pool)
+	if h != nil {
+		return h
+	}
+
+	if u.Spray == nil {
+		return nil
+	}
+	return u.Spray.Select(pool)
+}
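
Note: healthcheck.go above declares the CheckDown hook (UpstreamHostDownFunc), but nothing in this change installs one, so Down() falls back to its default Fails/OkUntil logic. The snippet below is only an illustrative sketch of how a caller could supply its own criteria; the import path is inferred from the repository layout and the maxFails threshold is an assumed caller-side value, neither is defined by this diff.

package main

import (
	"sync/atomic"
	"time"

	// Import path assumed from the repository layout shown in this diff.
	"github.com/coredns/coredns/plugin/pkg/healthcheck"
)

// exampleCheckDown builds a caller-defined down check. maxFails is an assumed
// caller-side threshold, not something this change defines.
func exampleCheckDown(maxFails int32) healthcheck.UpstreamHostDownFunc {
	return func(uh *healthcheck.UpstreamHost) bool {
		uh.CheckMu.Lock()
		until := uh.OkUntil
		uh.CheckMu.Unlock()

		// Down if the last successful health check has expired...
		if !until.IsZero() && time.Now().After(until) {
			return true
		}
		// ...or if the failure counter has reached the threshold.
		return atomic.LoadInt32(&uh.Fails) >= maxFails
	}
}

func main() {
	host := &healthcheck.UpstreamHost{Name: "http://10.0.0.1:8080"} // placeholder upstream
	host.CheckDown = exampleCheckDown(3)
	_ = host.Down() // now answered by the custom function instead of the default logic
}
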
diff --git a/plugin/pkg/healthcheck/policy.go b/plugin/pkg/healthcheck/policy.go
new file mode 100644
index 000000000..6a828fc4d
--- /dev/null
+++ b/plugin/pkg/healthcheck/policy.go
@@ -0,0 +1,120 @@
+package healthcheck
+
+import (
+	"log"
+	"math/rand"
+	"sync/atomic"
+)
+
+var (
+	// SupportedPolicies is the collection of policies registered
+	SupportedPolicies = make(map[string]func() Policy)
+)
+
+// RegisterPolicy adds a custom policy to the proxy.
+func RegisterPolicy(name string, policy func() Policy) {
+	SupportedPolicies[name] = policy
+}
+
+// Policy decides how a host will be selected from a pool. When all hosts are unhealthy, it is assumed the
+// healthchecking failed. In this case each policy will *randomly* return a host from the pool to prevent
+// no traffic to go through at all.
+type Policy interface {
+	Select(pool HostPool) *UpstreamHost
+}
+
+func init() {
+	RegisterPolicy("random", func() Policy { return &Random{} })
+	RegisterPolicy("least_conn", func() Policy { return &LeastConn{} })
+	RegisterPolicy("round_robin", func() Policy { return &RoundRobin{} })
+}
+
+// Random is a policy that selects up hosts from a pool at random.
+type Random struct{}
+
+// Select selects an up host at random from the specified pool.
+func (r *Random) Select(pool HostPool) *UpstreamHost {
+	// instead of just generating a random index
+	// this is done to prevent selecting a down host
+	var randHost *UpstreamHost
+	count := 0
+	for _, host := range pool {
+		if host.Down() {
+			continue
+		}
+		count++
+		if count == 1 {
+			randHost = host
+		} else {
+			r := rand.Int() % count
+			if r == (count - 1) {
+				randHost = host
+			}
+		}
+	}
+	return randHost
+}
+
+// Spray is a policy that selects a host from a pool at random. This should be used as a last ditch
+// attempt to get a host when all hosts are reporting unhealthy.
+type Spray struct{}
+
+// Select selects an up host at random from the specified pool.
+func (r *Spray) Select(pool HostPool) *UpstreamHost {
+	rnd := rand.Int() % len(pool)
+	randHost := pool[rnd]
+	log.Printf("[WARNING] All hosts reported as down, spraying to target: %s", randHost.Name)
+	return randHost
+}
+
+// LeastConn is a policy that selects the host with the least connections.
+type LeastConn struct{}
+
+// Select selects the up host with the least number of connections in the
+// pool. If more than one host has the same least number of connections,
+// one of the hosts is chosen at random.
+func (r *LeastConn) Select(pool HostPool) *UpstreamHost {
+	var bestHost *UpstreamHost
+	count := 0
+	leastConn := int64(1<<63 - 1)
+	for _, host := range pool {
+		if host.Down() {
+			continue
+		}
+		hostConns := host.Conns
+		if hostConns < leastConn {
+			bestHost = host
+			leastConn = hostConns
+			count = 1
+		} else if hostConns == leastConn {
+			// randomly select host among hosts with least connections
+			count++
+			if count == 1 {
+				bestHost = host
+			} else {
+				r := rand.Int() % count
+				if r == (count - 1) {
+					bestHost = host
+				}
+			}
+		}
+	}
+	return bestHost
+}
+
+// RoundRobin is a policy that selects hosts based on round robin ordering.
+type RoundRobin struct {
+	Robin uint32
+}
+
+// Select selects an up host from the pool using a round robin ordering scheme.
+func (r *RoundRobin) Select(pool HostPool) *UpstreamHost {
+	poolLen := uint32(len(pool))
+	selection := atomic.AddUint32(&r.Robin, 1) % poolLen
+	host := pool[selection]
+	// if the currently selected host is down, just ffwd to up host
+	for i := uint32(1); host.Down() && i < poolLen; i++ {
+		host = pool[(selection+i)%poolLen]
+	}
+	return host
+}
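
Note: policy.go registers the built-in policies ("random", "least_conn", "round_robin") through RegisterPolicy in init(), and the tests that follow register a customPolicy the same way. As a minimal external sketch, a consumer could register its own policy as shown below; the firstUp type and the import path are assumptions for illustration, not part of this change.

package main

import (
	"fmt"

	// Import path assumed from the repository layout shown in this diff.
	"github.com/coredns/coredns/plugin/pkg/healthcheck"
)

// firstUp is a hypothetical policy: return the first host that is not down.
type firstUp struct{}

func (firstUp) Select(pool healthcheck.HostPool) *healthcheck.UpstreamHost {
	for _, host := range pool {
		if !host.Down() {
			return host
		}
	}
	return nil
}

func main() {
	// Make the policy available under a name, mirroring the built-in
	// "random", "least_conn" and "round_robin" registrations.
	healthcheck.RegisterPolicy("first_up", func() healthcheck.Policy { return firstUp{} })

	p := healthcheck.SupportedPolicies["first_up"]()
	fmt.Printf("registered policy: %T\n", p)
}
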
diff --git a/plugin/pkg/healthcheck/policy_test.go b/plugin/pkg/healthcheck/policy_test.go
new file mode 100644
index 000000000..4c667952c
--- /dev/null
+++ b/plugin/pkg/healthcheck/policy_test.go
@@ -0,0 +1,143 @@
+package healthcheck
+
+import (
+	"io/ioutil"
+	"log"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+	"time"
+)
+
+var workableServer *httptest.Server
+
+func TestMain(m *testing.M) {
+	workableServer = httptest.NewServer(http.HandlerFunc(
+		func(w http.ResponseWriter, r *http.Request) {
+			// do nothing
+		}))
+	r := m.Run()
+	workableServer.Close()
+	os.Exit(r)
+}
+
+type customPolicy struct{}
+
+func (r *customPolicy) Select(pool HostPool) *UpstreamHost {
+	return pool[0]
+}
+
+func testPool() HostPool {
+	pool := []*UpstreamHost{
+		{
+			Name: workableServer.URL, // this should resolve (healthcheck test)
+		},
+		{
+			Name: "http://shouldnot.resolve", // this shouldn't
+		},
+		{
+			Name: "http://C",
+		},
+	}
+	return HostPool(pool)
+}
+
+func TestRegisterPolicy(t *testing.T) {
+	name := "custom"
+	customPolicy := &customPolicy{}
+	RegisterPolicy(name, func() Policy { return customPolicy })
+	if _, ok := SupportedPolicies[name]; !ok {
+		t.Error("Expected supportedPolicies to have a custom policy.")
+	}
+
+}
+
+// TODO(miek): Disabled for now, we should get out of the habit of using
+// realtime in these tests.
+func testHealthCheck(t *testing.T) {
+	log.SetOutput(ioutil.Discard)
+
+	u := &HealthCheck{
+		Hosts:       testPool(),
+		FailTimeout: 10 * time.Second,
+		Future:      60 * time.Second,
+		MaxFails:    1,
+	}
+
+	u.healthCheck()
+	// sleep a bit, it's async now
+	time.Sleep(time.Duration(2 * time.Second))
+
+	if u.Hosts[0].Down() {
+		t.Error("Expected first host in testpool to not fail healthcheck.")
+	}
+	if !u.Hosts[1].Down() {
+		t.Error("Expected second host in testpool to fail healthcheck.")
+	}
+}
+
+func TestSelect(t *testing.T) {
+	u := &HealthCheck{
+		Hosts:       testPool()[:3],
+		FailTimeout: 10 * time.Second,
+		Future:      60 * time.Second,
+		MaxFails:    1,
+	}
+	u.Hosts[0].OkUntil = time.Unix(0, 0)
+	u.Hosts[1].OkUntil = time.Unix(0, 0)
+	u.Hosts[2].OkUntil = time.Unix(0, 0)
+	if h := u.Select(); h != nil {
+		t.Error("Expected select to return nil as all host are down")
+	}
+	u.Hosts[2].OkUntil = time.Time{}
+	if h := u.Select(); h == nil {
+		t.Error("Expected select to not return nil")
+	}
+}
+
+func TestRoundRobinPolicy(t *testing.T) {
+	pool := testPool()
+	rrPolicy := &RoundRobin{}
+	h := rrPolicy.Select(pool)
+	// First selected host is 1, because counter starts at 0
+	// and increments before host is selected
+	if h != pool[1] {
+		t.Error("Expected first round robin host to be second host in the pool.")
+	}
+	h = rrPolicy.Select(pool)
+	if h != pool[2] {
+		t.Error("Expected second round robin host to be third host in the pool.")
+	}
+	// mark host as down
+	pool[0].OkUntil = time.Unix(0, 0)
+	h = rrPolicy.Select(pool)
+	if h != pool[1] {
+		t.Error("Expected third round robin host to be first host in the pool.")
+	}
+}
+
+func TestLeastConnPolicy(t *testing.T) {
+	pool := testPool()
+	lcPolicy := &LeastConn{}
+	pool[0].Conns = 10
+	pool[1].Conns = 10
+	h := lcPolicy.Select(pool)
+	if h != pool[2] {
+		t.Error("Expected least connection host to be third host.")
+	}
+	pool[2].Conns = 100
+	h = lcPolicy.Select(pool)
+	if h != pool[0] && h != pool[1] {
+		t.Error("Expected least connection host to be first or second host.")
+	}
+}
+
+func TestCustomPolicy(t *testing.T) {
+	pool := testPool()
+	customPolicy := &customPolicy{}
+	h := customPolicy.Select(pool)
+	if h != pool[0] {
+		t.Error("Expected custom policy host to be the first host.")
+	}
+}
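
Note: putting the pieces together, a consumer of this package builds a HostPool, starts the background checker, and asks Select for a host. The following usage sketch assumes the same import path as above; the upstream addresses, health path and durations are placeholder values rather than anything defined by this diff.

package main

import (
	"fmt"
	"time"

	// Import path assumed from the repository layout shown in this diff.
	"github.com/coredns/coredns/plugin/pkg/healthcheck"
)

func main() {
	hc := &healthcheck.HealthCheck{
		Hosts: healthcheck.HostPool{
			{Name: "http://10.0.0.1:8080"}, // placeholder upstreams
			{Name: "http://10.0.0.2:8080"},
		},
		Policy:      &healthcheck.RoundRobin{},
		Spray:       &healthcheck.Spray{}, // last-resort fallback when every host looks down
		Path:        "/health",            // non-empty Path enables the background checker
		Interval:    5 * time.Second,      // how often healthCheck() probes the hosts
		Future:      12 * time.Second,     // how long a passing probe keeps a host "OK"
		FailTimeout: 10 * time.Second,
		MaxFails:    1,
	}

	hc.Start()
	defer hc.Stop()

	if host := hc.Select(); host != nil {
		fmt.Println("selected upstream:", host.Name)
	} else {
		fmt.Println("no healthy upstream available")
	}
}
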