Diffstat (limited to 'plugin/pkg/healthcheck')
-rw-r--r--  plugin/pkg/healthcheck/healthcheck.go  243
-rw-r--r--  plugin/pkg/healthcheck/policy.go       120
-rw-r--r--  plugin/pkg/healthcheck/policy_test.go  143
3 files changed, 506 insertions, 0 deletions
diff --git a/plugin/pkg/healthcheck/healthcheck.go b/plugin/pkg/healthcheck/healthcheck.go
new file mode 100644
index 000000000..18f09087c
--- /dev/null
+++ b/plugin/pkg/healthcheck/healthcheck.go
@@ -0,0 +1,243 @@
+package healthcheck
+
+import (
+ "io"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
+ "net/url"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// UpstreamHostDownFunc can be used to customize how Down behaves.
+type UpstreamHostDownFunc func(*UpstreamHost) bool
+
+// UpstreamHost represents a single proxy upstream
+type UpstreamHost struct {
+ Conns int64 // must be first field to be 64-bit aligned on 32-bit systems
+ Name string // IP address (and port) of this upstream host
+ Network string // Network (tcp, unix, etc) of the host, default "" is "tcp"
+ Fails int32
+ FailTimeout time.Duration
+ OkUntil time.Time
+ CheckDown UpstreamHostDownFunc
+ CheckURL string
+ WithoutPathPrefix string
+ Checking bool
+ CheckMu sync.Mutex
+}
+
+// Down checks whether the upstream host is down or not.
+// Down will try to use uh.CheckDown first, and will fall
+// back to some default criteria if necessary.
+func (uh *UpstreamHost) Down() bool {
+ if uh.CheckDown == nil {
+ // Default settings
+ fails := atomic.LoadInt32(&uh.Fails)
+ after := false
+
+ uh.CheckMu.Lock()
+ until := uh.OkUntil
+ uh.CheckMu.Unlock()
+
+ if !until.IsZero() && time.Now().After(until) {
+ after = true
+ }
+
+ return after || fails > 0
+ }
+ return uh.CheckDown(uh)
+}
+
+// HostPool is a collection of UpstreamHosts.
+type HostPool []*UpstreamHost
+
+// HealthCheck is used for performing health checks
+// on a collection of upstream hosts and selecting
+// one of them based on the configured policy.
+type HealthCheck struct {
+ wg sync.WaitGroup // Used to wait for running goroutines to stop.
+ stop chan struct{} // Signals running goroutines to stop.
+ Hosts HostPool
+ Policy Policy
+ Spray Policy
+ FailTimeout time.Duration
+ MaxFails int32
+ Future time.Duration
+ Path string
+ Port string
+ Interval time.Duration
+}
+
+// Start starts the healthcheck; a background worker is only started when a check Path is configured.
+func (u *HealthCheck) Start() {
+ u.stop = make(chan struct{})
+ if u.Path != "" {
+ u.wg.Add(1)
+ go func() {
+ defer u.wg.Done()
+ u.healthCheckWorker(u.stop)
+ }()
+ }
+}
+
+// Stop sends a signal to all goroutines started by this HealthCheck to exit
+// and waits for them to finish before returning.
+func (u *HealthCheck) Stop() error {
+ close(u.stop)
+ u.wg.Wait()
+ return nil
+}
+
+// Each health check runs in its own goroutine so that every host can be
+// checked at the same time. If we checked, say, 3 hosts sequentially and the
+// first one was gone, we could spend minutes timing out on it and would do
+// no other health checks in that time. So we use a per-host lock and run
+// each check in its own goroutine.
+//
+// We use the Checking bool to avoid concurrent checks against the same
+// host; if one is taking a long time, the next one will find a check in
+// progress and simply return before trying.
+//
+// We are carefully avoiding having the mutex locked while we check,
+// otherwise checks will back up, potentially a lot of them if a host is
+// absent for a long time. This arrangement makes checks quickly see if
+// they are the only one running and abort otherwise.
+func healthCheckURL(nextTs time.Time, host *UpstreamHost) {
+
+ // lock for our bool check. We don't just defer the unlock because
+ // we don't want the lock held while http.Get runs
+ host.CheckMu.Lock()
+
+ // are we mid check? Don't run another one
+ if host.Checking {
+ host.CheckMu.Unlock()
+ return
+ }
+
+ host.Checking = true
+ host.CheckMu.Unlock()
+
+ //log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", host.CheckURL, nextTs.Local())
+
+ // Fetch the URL. This runs in its own goroutine because when the remote
+ // host is not merely refusing connections but entirely absent, TCP SYN
+ // timeouts can be very long, so a single fetch could span several check
+ // intervals.
+ if r, err := http.Get(host.CheckURL); err == nil {
+ io.Copy(ioutil.Discard, r.Body)
+ r.Body.Close()
+
+ if r.StatusCode < 200 || r.StatusCode >= 400 {
+ log.Printf("[WARNING] Host %s health check returned HTTP code %d\n",
+ host.Name, r.StatusCode)
+ nextTs = time.Unix(0, 0)
+ }
+ } else {
+ log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err)
+ nextTs = time.Unix(0, 0)
+ }
+
+ host.CheckMu.Lock()
+ host.Checking = false
+ host.OkUntil = nextTs
+ host.CheckMu.Unlock()
+}
+
+func (u *HealthCheck) healthCheck() {
+ for _, host := range u.Hosts {
+
+ if host.CheckURL == "" {
+ var hostName, checkPort string
+
+ // host.Name may be given as a URL (e.g. an HTTP endpoint); if so, extract its host part.
+ ret, err := url.Parse(host.Name)
+ if err == nil && len(ret.Host) > 0 {
+ hostName = ret.Host
+ } else {
+ hostName = host.Name
+ }
+
+ // Extract the port number from the parsed server name.
+ checkHostName, checkPort, err := net.SplitHostPort(hostName)
+ if err != nil {
+ checkHostName = hostName
+ }
+
+ if u.Port != "" {
+ checkPort = u.Port
+ }
+
+ host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path
+ }
+
+ // calculate the OkUntil timestamp before the probe, so a slow probe does not extend the OK window
+ nextTs := time.Now().Add(u.Future)
+
+ // locks/bools should prevent requests backing up
+ go healthCheckURL(nextTs, host)
+ }
+}
+
+func (u *HealthCheck) healthCheckWorker(stop chan struct{}) {
+ ticker := time.NewTicker(u.Interval)
+ u.healthCheck()
+ for {
+ select {
+ case <-ticker.C:
+ u.healthCheck()
+ case <-stop:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+// Select selects an upstream host based on the policy
+// and the healthcheck result.
+func (u *HealthCheck) Select() *UpstreamHost {
+ pool := u.Hosts
+ if len(pool) == 1 {
+ if pool[0].Down() && u.Spray == nil {
+ return nil
+ }
+ return pool[0]
+ }
+ allDown := true
+ for _, host := range pool {
+ if !host.Down() {
+ allDown = false
+ break
+ }
+ }
+ if allDown {
+ if u.Spray == nil {
+ return nil
+ }
+ return u.Spray.Select(pool)
+ }
+
+ if u.Policy == nil {
+ h := (&Random{}).Select(pool)
+ if h != nil {
+ return h
+ }
+ if u.Spray == nil {
+ return nil
+ }
+ return u.Spray.Select(pool)
+ }
+
+ h := u.Policy.Select(pool)
+ if h != nil {
+ return h
+ }
+
+ if u.Spray == nil {
+ return nil
+ }
+ return u.Spray.Select(pool)
+}
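
For orientation, here is a minimal usage sketch of the API added above. It is not part of the patch: the import path follows the repository layout, and the host addresses, port, and durations are placeholders.

    package main

    import (
    	"fmt"
    	"time"

    	"github.com/coredns/coredns/plugin/pkg/healthcheck"
    )

    func main() {
    	hc := &healthcheck.HealthCheck{
    		Hosts: healthcheck.HostPool{
    			{Name: "10.0.0.1:53"}, // placeholder upstream
    			{Name: "10.0.0.2:53"}, // placeholder upstream
    		},
    		Path:     "/health",            // enables the background HTTP probe worker
    		Port:     "8080",               // probe this port instead of the host's own port
    		Interval: 4 * time.Second,      // how often to probe
    		Future:   12 * time.Second,     // a successful probe marks a host OK until now+Future
    		Spray:    &healthcheck.Spray{}, // last-ditch fallback when every host is down
    	}

    	hc.Start()      // spawns the worker because Path is set
    	defer hc.Stop() // stops the worker and waits for it to exit

    	if h := hc.Select(); h != nil {
    		fmt.Println("selected upstream:", h.Name)
    	}
    }
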
diff --git a/plugin/pkg/healthcheck/policy.go b/plugin/pkg/healthcheck/policy.go
new file mode 100644
index 000000000..6a828fc4d
--- /dev/null
+++ b/plugin/pkg/healthcheck/policy.go
@@ -0,0 +1,120 @@
+package healthcheck
+
+import (
+ "log"
+ "math/rand"
+ "sync/atomic"
+)
+
+var (
+ // SupportedPolicies is the collection of policies registered
+ SupportedPolicies = make(map[string]func() Policy)
+)
+
+// RegisterPolicy adds a custom policy to the proxy.
+func RegisterPolicy(name string, policy func() Policy) {
+ SupportedPolicies[name] = policy
+}
+
+// Policy decides how a host will be selected from a pool. When all hosts are unhealthy, the
+// health checking is assumed to have failed; in that case each policy will still *randomly* return a
+// host from the pool so that traffic does not stop entirely.
+type Policy interface {
+ Select(pool HostPool) *UpstreamHost
+}
+
+func init() {
+ RegisterPolicy("random", func() Policy { return &Random{} })
+ RegisterPolicy("least_conn", func() Policy { return &LeastConn{} })
+ RegisterPolicy("round_robin", func() Policy { return &RoundRobin{} })
+}
+
+// Random is a policy that selects an up host from the pool at random.
+type Random struct{}
+
+// Select selects an up host at random from the specified pool.
+func (r *Random) Select(pool HostPool) *UpstreamHost {
+ // Reservoir sampling over the up hosts: rather than picking a random pool index (which
+ // could land on a down host), keep each up host seen so far with probability 1/count.
+ var randHost *UpstreamHost
+ count := 0
+ for _, host := range pool {
+ if host.Down() {
+ continue
+ }
+ count++
+ if count == 1 {
+ randHost = host
+ } else {
+ r := rand.Int() % count
+ if r == (count - 1) {
+ randHost = host
+ }
+ }
+ }
+ return randHost
+}
+
+// Spray is a policy that selects a host from the pool at random. It should be used as a last-ditch
+// attempt to get a host when all hosts are reporting unhealthy.
+type Spray struct{}
+
+// Select selects a host at random from the specified pool, regardless of whether it is marked down.
+func (r *Spray) Select(pool HostPool) *UpstreamHost {
+ rnd := rand.Int() % len(pool)
+ randHost := pool[rnd]
+ log.Printf("[WARNING] All hosts reported as down, spraying to target: %s", randHost.Name)
+ return randHost
+}
+
+// LeastConn is a policy that selects the host with the least connections.
+type LeastConn struct{}
+
+// Select selects the up host with the least number of connections in the
+// pool. If more than one host has the same least number of connections,
+// one of the hosts is chosen at random.
+func (r *LeastConn) Select(pool HostPool) *UpstreamHost {
+ var bestHost *UpstreamHost
+ count := 0
+ leastConn := int64(1<<63 - 1)
+ for _, host := range pool {
+ if host.Down() {
+ continue
+ }
+ hostConns := atomic.LoadInt64(&host.Conns) // Conns is updated atomically elsewhere; load it atomically too
+ if hostConns < leastConn {
+ bestHost = host
+ leastConn = hostConns
+ count = 1
+ } else if hostConns == leastConn {
+ // randomly select host among hosts with least connections
+ count++
+ if count == 1 {
+ bestHost = host
+ } else {
+ r := rand.Int() % count
+ if r == (count - 1) {
+ bestHost = host
+ }
+ }
+ }
+ }
+ return bestHost
+}
+
+// RoundRobin is a policy that selects hosts based on round robin ordering.
+type RoundRobin struct {
+ Robin uint32
+}
+
+// Select selects an up host from the pool using a round robin ordering scheme.
+func (r *RoundRobin) Select(pool HostPool) *UpstreamHost {
+ poolLen := uint32(len(pool))
+ selection := atomic.AddUint32(&r.Robin, 1) % poolLen
+ host := pool[selection]
+ // if the currently selected host is down, fast-forward to the next up host
+ for i := uint32(1); host.Down() && i < poolLen; i++ {
+ host = pool[(selection+i)%poolLen]
+ }
+ return host
+}
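
Custom policies plug in through RegisterPolicy. The sketch below is illustrative only and written as if it lived in this package; the "first_up"/firstUp names are hypothetical and not part of the patch.

    package healthcheck

    // firstUp is a hypothetical example policy: it returns the first host in
    // the pool that is not down, or nil if every host is down.
    type firstUp struct{}

    // Select returns the first up host in pool order, or nil.
    func (f *firstUp) Select(pool HostPool) *UpstreamHost {
    	for _, host := range pool {
    		if !host.Down() {
    			return host
    		}
    	}
    	return nil
    }

    func init() {
    	RegisterPolicy("first_up", func() Policy { return &firstUp{} })
    }

A caller would then instantiate it by name, e.g. if newPolicy, ok := SupportedPolicies["first_up"]; ok { p := newPolicy() } and pass p.Select the pool.
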
diff --git a/plugin/pkg/healthcheck/policy_test.go b/plugin/pkg/healthcheck/policy_test.go
new file mode 100644
index 000000000..4c667952c
--- /dev/null
+++ b/plugin/pkg/healthcheck/policy_test.go
@@ -0,0 +1,143 @@
+package healthcheck
+
+import (
+ "io/ioutil"
+ "log"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+ "time"
+)
+
+var workableServer *httptest.Server
+
+func TestMain(m *testing.M) {
+ workableServer = httptest.NewServer(http.HandlerFunc(
+ func(w http.ResponseWriter, r *http.Request) {
+ // do nothing
+ }))
+ r := m.Run()
+ workableServer.Close()
+ os.Exit(r)
+}
+
+type customPolicy struct{}
+
+func (r *customPolicy) Select(pool HostPool) *UpstreamHost {
+ return pool[0]
+}
+
+func testPool() HostPool {
+ pool := []*UpstreamHost{
+ {
+ Name: workableServer.URL, // this should resolve (healthcheck test)
+ },
+ {
+ Name: "http://shouldnot.resolve", // this shouldn't
+ },
+ {
+ Name: "http://C",
+ },
+ }
+ return HostPool(pool)
+}
+
+func TestRegisterPolicy(t *testing.T) {
+ name := "custom"
+ customPolicy := &customPolicy{}
+ RegisterPolicy(name, func() Policy { return customPolicy })
+ if _, ok := SupportedPolicies[name]; !ok {
+ t.Error("Expected supportedPolicies to have a custom policy.")
+ }
+
+}
+
+// TODO(miek): Disabled for now, we should get out of the habit of using
+// real time in these tests.
+func testHealthCheck(t *testing.T) {
+ log.SetOutput(ioutil.Discard)
+
+ u := &HealthCheck{
+ Hosts: testPool(),
+ FailTimeout: 10 * time.Second,
+ Future: 60 * time.Second,
+ MaxFails: 1,
+ }
+
+ u.healthCheck()
+ // sleep a bit, it's async now
+ time.Sleep(2 * time.Second)
+
+ if u.Hosts[0].Down() {
+ t.Error("Expected first host in testpool to not fail healthcheck.")
+ }
+ if !u.Hosts[1].Down() {
+ t.Error("Expected second host in testpool to fail healthcheck.")
+ }
+}
+
+func TestSelect(t *testing.T) {
+ u := &HealthCheck{
+ Hosts: testPool()[:3],
+ FailTimeout: 10 * time.Second,
+ Future: 60 * time.Second,
+ MaxFails: 1,
+ }
+ u.Hosts[0].OkUntil = time.Unix(0, 0)
+ u.Hosts[1].OkUntil = time.Unix(0, 0)
+ u.Hosts[2].OkUntil = time.Unix(0, 0)
+ if h := u.Select(); h != nil {
+ t.Error("Expected select to return nil as all hosts are down")
+ }
+ u.Hosts[2].OkUntil = time.Time{}
+ if h := u.Select(); h == nil {
+ t.Error("Expected select to not return nil")
+ }
+}
+
+func TestRoundRobinPolicy(t *testing.T) {
+ pool := testPool()
+ rrPolicy := &RoundRobin{}
+ h := rrPolicy.Select(pool)
+ // First selected host is 1, because counter starts at 0
+ // and increments before host is selected
+ if h != pool[1] {
+ t.Error("Expected first round robin host to be second host in the pool.")
+ }
+ h = rrPolicy.Select(pool)
+ if h != pool[2] {
+ t.Error("Expected second round robin host to be third host in the pool.")
+ }
+ // mark the first host as down
+ pool[0].OkUntil = time.Unix(0, 0)
+ h = rrPolicy.Select(pool)
+ if h != pool[1] {
+ t.Error("Expected third round robin host to be the second host in the pool, since the first host is down.")
+ }
+}
+
+func TestLeastConnPolicy(t *testing.T) {
+ pool := testPool()
+ lcPolicy := &LeastConn{}
+ pool[0].Conns = 10
+ pool[1].Conns = 10
+ h := lcPolicy.Select(pool)
+ if h != pool[2] {
+ t.Error("Expected least connection host to be third host.")
+ }
+ pool[2].Conns = 100
+ h = lcPolicy.Select(pool)
+ if h != pool[0] && h != pool[1] {
+ t.Error("Expected least connection host to be first or second host.")
+ }
+}
+
+func TestCustomPolicy(t *testing.T) {
+ pool := testPool()
+ customPolicy := &customPolicy{}
+ h := customPolicy.Select(pool)
+ if h != pool[0] {
+ t.Error("Expected custom policy host to be the first host.")
+ }
+}
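
On the TODO above about real time in tests, one possible direction is sketched below. It is not part of the patch: the function name and durations are illustrative, it assumes the same imports as policy_test.go, and it is left unexported/disabled in the same way as testHealthCheck. It still sleeps briefly; removing real time entirely would need an injectable clock, which the package does not have yet.

    // Sketch only: exercise the background worker against an httptest server
    // with a very short Interval, then stop it.
    func testHealthCheckWorker(t *testing.T) {
    	server := httptest.NewServer(http.HandlerFunc(
    		func(w http.ResponseWriter, r *http.Request) {}))
    	defer server.Close()

    	u := &HealthCheck{
    		Hosts:    HostPool{{Name: server.URL}},
    		Path:     "/",
    		Future:   time.Second,
    		Interval: 10 * time.Millisecond,
    	}
    	u.Start()
    	defer u.Stop()

    	time.Sleep(50 * time.Millisecond) // give the worker time to probe once
    	if u.Hosts[0].Down() {
    		t.Error("Expected host backed by the httptest server to be up after probing.")
    	}
    }
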