-rw-r--r--  middleware/pkg/healthcheck/healthcheck.go                                             236
-rw-r--r--  middleware/pkg/healthcheck/policy.go (renamed from middleware/proxy/policy.go)         12
-rw-r--r--  middleware/pkg/healthcheck/policy_test.go (renamed from middleware/proxy/policy_test.go) 55
-rw-r--r--  middleware/proxy/google.go                                                              26
-rw-r--r--  middleware/proxy/grpc_test.go                                                           20
-rw-r--r--  middleware/proxy/lookup.go                                                              29
-rw-r--r--  middleware/proxy/proxy.go                                                               45
-rw-r--r--  middleware/proxy/upstream.go                                                           226
-rw-r--r--  middleware/proxy/upstream_test.go                                                       58
9 files changed, 359 insertions(+), 348 deletions(-)
diff --git a/middleware/pkg/healthcheck/healthcheck.go b/middleware/pkg/healthcheck/healthcheck.go
new file mode 100644
index 000000000..e0152a47b
--- /dev/null
+++ b/middleware/pkg/healthcheck/healthcheck.go
@@ -0,0 +1,236 @@
+package healthcheck
+
+import (
+ "io"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
+ "net/url"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// UpstreamHostDownFunc can be used to customize how Down behaves.
+type UpstreamHostDownFunc func(*UpstreamHost) bool
+
+// UpstreamHost represents a single proxy upstream
+type UpstreamHost struct {
+ Conns int64 // must be first field to be 64-bit aligned on 32-bit systems
+ Name string // IP address (and port) of this upstream host
+ Fails int32
+ FailTimeout time.Duration
+ OkUntil time.Time
+ CheckDown UpstreamHostDownFunc
+ CheckURL string
+ WithoutPathPrefix string
+ Checking bool
+ CheckMu sync.Mutex
+}
+
+// Down checks whether the upstream host is down or not.
+// Down will try to use uh.CheckDown first, and will fall
+// back to some default criteria if necessary.
+func (uh *UpstreamHost) Down() bool {
+ if uh.CheckDown == nil {
+ // Default settings
+ fails := atomic.LoadInt32(&uh.Fails)
+ after := false
+
+ uh.CheckMu.Lock()
+ until := uh.OkUntil
+ uh.CheckMu.Unlock()
+
+ if !until.IsZero() && time.Now().After(until) {
+ after = true
+ }
+
+ return after || fails > 0
+ }
+ return uh.CheckDown(uh)
+}
+
+// HostPool is a collection of UpstreamHosts.
+type HostPool []*UpstreamHost
+
+type HealthCheck struct {
+ wg sync.WaitGroup // Used to wait for running goroutines to stop.
+ stop chan struct{} // Signals running goroutines to stop.
+ Hosts HostPool
+ Policy Policy
+ Spray Policy
+ FailTimeout time.Duration
+ MaxFails int32
+ Future time.Duration
+ Path string
+ Port string
+ Interval time.Duration
+}
+
+func (u *HealthCheck) Start() {
+ u.stop = make(chan struct{})
+ if u.Path != "" {
+ u.wg.Add(1)
+ go func() {
+ defer u.wg.Done()
+ u.HealthCheckWorker(u.stop)
+ }()
+ }
+}
+
+// Stop sends a signal to all goroutines started by this HealthCheck to exit
+// and waits for them to finish before returning.
+func (u *HealthCheck) Stop() error {
+ close(u.stop)
+ u.wg.Wait()
+ return nil
+}
+
+// This was moved into a thread so that each host could throw a health
+// check at the same time. The reason for this is that if we are checking
+// 3 hosts, and the first one is gone, and we spend minutes timing out to
+// fail it, we would not have been doing any other health checks in that
+// time. So we now have a per-host lock and a threaded health check.
+//
+// We use the Checking bool to avoid concurrent checks against the same
+// host; if one is taking a long time, the next one will find a check in
+// progress and simply return before trying.
+//
+// We are carefully avoiding having the mutex locked while we check,
+// otherwise checks will back up, potentially a lot of them if a host is
+// absent for a long time. This arrangement makes checks quickly see if
+// they are the only one running and abort otherwise.
+func healthCheckURL(nextTs time.Time, host *UpstreamHost) {
+
+ // lock for our bool check. We don't just defer the unlock because
+ // we don't want the lock held while http.Get runs
+ host.CheckMu.Lock()
+
+ // are we mid check? Don't run another one
+ if host.Checking {
+ host.CheckMu.Unlock()
+ return
+ }
+
+ host.Checking = true
+ host.CheckMu.Unlock()
+
+	//log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", host.CheckURL, nextTs.Local())
+
+ // fetch that url. This has been moved into a go func because
+ // when the remote host is not merely not serving, but actually
+ // absent, then tcp syn timeouts can be very long, and so one
+ // fetch could last several check intervals
+ if r, err := http.Get(host.CheckURL); err == nil {
+ io.Copy(ioutil.Discard, r.Body)
+ r.Body.Close()
+
+ if r.StatusCode < 200 || r.StatusCode >= 400 {
+ log.Printf("[WARNING] Host %s health check returned HTTP code %d\n",
+ host.Name, r.StatusCode)
+ nextTs = time.Unix(0, 0)
+ }
+ } else {
+ log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err)
+ nextTs = time.Unix(0, 0)
+ }
+
+ host.CheckMu.Lock()
+ host.Checking = false
+ host.OkUntil = nextTs
+ host.CheckMu.Unlock()
+}
+
+func (u *HealthCheck) healthCheck() {
+ for _, host := range u.Hosts {
+
+ if host.CheckURL == "" {
+ var hostName, checkPort string
+
+ // The DNS server might be an HTTP server. If so, extract its name.
+ ret, err := url.Parse(host.Name)
+ if err == nil && len(ret.Host) > 0 {
+ hostName = ret.Host
+ } else {
+ hostName = host.Name
+ }
+
+ // Extract the port number from the parsed server name.
+ checkHostName, checkPort, err := net.SplitHostPort(hostName)
+ if err != nil {
+ checkHostName = hostName
+ }
+
+ if u.Port != "" {
+ checkPort = u.Port
+ }
+
+ host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.Path
+ }
+
+ // calculate this before the get
+ nextTs := time.Now().Add(u.Future)
+
+ // locks/bools should prevent requests backing up
+ go healthCheckURL(nextTs, host)
+ }
+}
+
+func (u *HealthCheck) HealthCheckWorker(stop chan struct{}) {
+ ticker := time.NewTicker(u.Interval)
+ u.healthCheck()
+ for {
+ select {
+ case <-ticker.C:
+ u.healthCheck()
+ case <-stop:
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func (u *HealthCheck) Select() *UpstreamHost {
+ pool := u.Hosts
+ if len(pool) == 1 {
+ if pool[0].Down() && u.Spray == nil {
+ return nil
+ }
+ return pool[0]
+ }
+ allDown := true
+ for _, host := range pool {
+ if !host.Down() {
+ allDown = false
+ break
+ }
+ }
+ if allDown {
+ if u.Spray == nil {
+ return nil
+ }
+ return u.Spray.Select(pool)
+ }
+
+ if u.Policy == nil {
+ h := (&Random{}).Select(pool)
+ if h != nil {
+ return h
+ }
+ if h == nil && u.Spray == nil {
+ return nil
+ }
+ return u.Spray.Select(pool)
+ }
+
+ h := u.Policy.Select(pool)
+ if h != nil {
+ return h
+ }
+
+ if u.Spray == nil {
+ return nil
+ }
+ return u.Spray.Select(pool)
+}
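With healthcheck now a standalone package, middleware other than proxy can reuse it. Below is a minimal sketch of driving the exported API from this file; the import path, struct fields, and methods are taken from the diff above, while the addresses, check path, and port are invented for illustration:

package main

import (
	"fmt"
	"time"

	"github.com/coredns/coredns/middleware/pkg/healthcheck"
)

func main() {
	hc := &healthcheck.HealthCheck{
		Hosts: healthcheck.HostPool{
			{Name: "10.0.0.1:53"},
			{Name: "10.0.0.2:53"},
		},
		Path:        "/health",        // non-empty Path makes Start() spawn the worker
		Port:        "8080",           // overrides the port used in check URLs
		Interval:    5 * time.Second,  // how often HealthCheckWorker fires
		Future:      60 * time.Second, // how long a passing host stays OK
		FailTimeout: 10 * time.Second,
		MaxFails:    1,
	}

	hc.Start()      // runs HealthCheckWorker in a goroutine since Path != ""
	defer hc.Stop() // closes the stop channel and waits for the worker to exit

	// With Policy nil, Select falls back to Random (and then Spray, if set).
	if host := hc.Select(); host != nil {
		fmt.Println("selected upstream:", host.Name)
	}
}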
diff --git a/middleware/proxy/policy.go b/middleware/pkg/healthcheck/policy.go
index e0c9d7e2b..0cef8d79a 100644
--- a/middleware/proxy/policy.go
+++ b/middleware/pkg/healthcheck/policy.go
@@ -1,4 +1,4 @@
-package proxy
+package healthcheck
import (
"log"
@@ -6,8 +6,14 @@ import (
"sync/atomic"
)
-// HostPool is a collection of UpstreamHosts.
-type HostPool []*UpstreamHost
+var (
+ SupportedPolicies = make(map[string]func() Policy)
+)
+
+// RegisterPolicy adds a custom policy to the proxy.
+func RegisterPolicy(name string, policy func() Policy) {
+ SupportedPolicies[name] = policy
+}
// Policy decides how a host will be selected from a pool. When all hosts are unhealthy, it is assumed the
// healthchecking failed. In this case each policy will *randomly* return a host from the pool to prevent
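RegisterPolicy and the exported SupportedPolicies map let code outside the package plug in selection policies. A sketch of registering a custom one, assuming a package that imports healthcheck; the "first healthy" logic and the name "first" are invented here, and Policy's single Select method is inferred from its use in HealthCheck.Select above:

// First is a hypothetical policy that always picks the first healthy host.
type First struct{}

// Select walks the pool in order and returns the first host not reported down.
func (f *First) Select(pool healthcheck.HostPool) *healthcheck.UpstreamHost {
	for _, host := range pool {
		if !host.Down() {
			return host
		}
	}
	return nil
}

func init() {
	// Store the factory under "first"; parseBlock's policy handling
	// (see upstream.go below) looks names up in SupportedPolicies.
	healthcheck.RegisterPolicy("first", func() healthcheck.Policy { return &First{} })
}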
diff --git a/middleware/proxy/policy_test.go b/middleware/pkg/healthcheck/policy_test.go
index 24fd3efdc..16cae7266 100644
--- a/middleware/proxy/policy_test.go
+++ b/middleware/pkg/healthcheck/policy_test.go
@@ -1,6 +1,8 @@
-package proxy
+package healthcheck
import (
+ "io/ioutil"
+ "log"
"net/http"
"net/http/httptest"
"os"
@@ -41,6 +43,57 @@ func testPool() HostPool {
return HostPool(pool)
}
+func TestRegisterPolicy(t *testing.T) {
+ name := "custom"
+ customPolicy := &customPolicy{}
+ RegisterPolicy(name, func() Policy { return customPolicy })
+ if _, ok := SupportedPolicies[name]; !ok {
+		t.Error("Expected SupportedPolicies to have a custom policy.")
+ }
+
+}
+
+func TestHealthCheck(t *testing.T) {
+ log.SetOutput(ioutil.Discard)
+
+ u := &HealthCheck{
+ Hosts: testPool(),
+ FailTimeout: 10 * time.Second,
+ Future: 60 * time.Second,
+ MaxFails: 1,
+ }
+
+ u.healthCheck()
+ // sleep a bit, it's async now
+	time.Sleep(2 * time.Second)
+
+ if u.Hosts[0].Down() {
+ t.Error("Expected first host in testpool to not fail healthcheck.")
+ }
+ if !u.Hosts[1].Down() {
+ t.Error("Expected second host in testpool to fail healthcheck.")
+ }
+}
+
+func TestSelect(t *testing.T) {
+ u := &HealthCheck{
+ Hosts: testPool()[:3],
+ FailTimeout: 10 * time.Second,
+ Future: 60 * time.Second,
+ MaxFails: 1,
+ }
+ u.Hosts[0].OkUntil = time.Unix(0, 0)
+ u.Hosts[1].OkUntil = time.Unix(0, 0)
+ u.Hosts[2].OkUntil = time.Unix(0, 0)
+ if h := u.Select(); h != nil {
+		t.Error("Expected select to return nil as all hosts are down")
+ }
+ u.Hosts[2].OkUntil = time.Time{}
+ if h := u.Select(); h == nil {
+ t.Error("Expected select to not return nil")
+ }
+}
+
func TestRoundRobinPolicy(t *testing.T) {
pool := testPool()
rrPolicy := &RoundRobin{}
diff --git a/middleware/proxy/google.go b/middleware/proxy/google.go
index f021bb2b3..b71d0fb1b 100644
--- a/middleware/proxy/google.go
+++ b/middleware/proxy/google.go
@@ -14,6 +14,7 @@ import (
"time"
"github.com/coredns/coredns/middleware/pkg/debug"
+ "github.com/coredns/coredns/middleware/pkg/healthcheck"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
@@ -200,34 +201,33 @@ func extractAnswer(m *dns.Msg) ([]string, error) {
// newUpstream returns an upstream initialized with hosts.
func newUpstream(hosts []string, old *staticUpstream) Upstream {
upstream := &staticUpstream{
- from: old.from,
- Hosts: nil,
- Policy: &Random{},
- Spray: nil,
- FailTimeout: 10 * time.Second,
- MaxFails: 3,
- Future: 60 * time.Second,
+ from: old.from,
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 10 * time.Second,
+ MaxFails: 3,
+ Future: 60 * time.Second,
+ },
ex: old.ex,
WithoutPathPrefix: old.WithoutPathPrefix,
IgnoredSubDomains: old.IgnoredSubDomains,
}
- upstream.Hosts = make([]*UpstreamHost, len(hosts))
+ upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
for i, h := range hosts {
- uh := &UpstreamHost{
+ uh := &healthcheck.UpstreamHost{
Name: h,
Conns: 0,
Fails: 0,
FailTimeout: upstream.FailTimeout,
- CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
- return func(uh *UpstreamHost) bool {
+ CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
down := false
- uh.checkMu.Lock()
+ uh.CheckMu.Lock()
until := uh.OkUntil
- uh.checkMu.Unlock()
+ uh.CheckMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
down = true
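The CheckDown field here (and the identical ones in lookup.go and upstream.go below) is built by invoking a function literal with the enclosing upstream, so each host's Down() can consult upstream-wide settings such as MaxFails alongside per-host state. The hunks cut off mid-closure; a sketch of the complete shape of this fragment of the UpstreamHost literal, where the Fails/MaxFails comparison is an assumption based on how MaxFails is used, not text from this diff:

CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
	return func(uh *healthcheck.UpstreamHost) bool {
		down := false

		// OkUntil is written by the health-check goroutine, so read it
		// under the same mutex healthCheckURL uses.
		uh.CheckMu.Lock()
		until := uh.OkUntil
		uh.CheckMu.Unlock()

		if !until.IsZero() && time.Now().After(until) {
			down = true // the last successful check has expired
		}

		// Assumed remainder: too many outstanding failures also mark it down.
		fails := atomic.LoadInt32(&uh.Fails)
		if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
			down = true
		}
		return down
	}
}(upstream),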
diff --git a/middleware/proxy/grpc_test.go b/middleware/proxy/grpc_test.go
index e303e1594..dcde7cc0e 100644
--- a/middleware/proxy/grpc_test.go
+++ b/middleware/proxy/grpc_test.go
@@ -4,11 +4,13 @@ import (
"testing"
"time"
+ "github.com/coredns/coredns/middleware/pkg/healthcheck"
+
"google.golang.org/grpc/grpclog"
)
-func pool() []*UpstreamHost {
- return []*UpstreamHost{
+func pool() []*healthcheck.UpstreamHost {
+ return []*healthcheck.UpstreamHost{
{
Name: "localhost:10053",
},
@@ -22,13 +24,13 @@ func TestStartupShutdown(t *testing.T) {
grpclog.SetLogger(discard{})
upstream := &staticUpstream{
- from: ".",
- Hosts: pool(),
- Policy: &Random{},
- Spray: nil,
- FailTimeout: 10 * time.Second,
- Future: 60 * time.Second,
- MaxFails: 1,
+ from: ".",
+ HealthCheck: healthcheck.HealthCheck{
+ Hosts: pool(),
+ FailTimeout: 10 * time.Second,
+ Future: 60 * time.Second,
+ MaxFails: 1,
+ },
}
g := newGrpcClient(nil, upstream)
upstream.ex = g
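Beyond the mechanical rename, this hunk shows the one syntactic cost of the embedding: in a composite literal an embedded field is keyed by its bare type name, so the health-check settings now nest under a HealthCheck key. Field reads still benefit from promotion, as in this sketch (values copied from the test above):

upstream := &staticUpstream{
	from: ".",
	// healthcheck.HealthCheck is embedded in staticUpstream, so the
	// literal keys it by type name; reads can still use promotion,
	// e.g. upstream.MaxFails instead of upstream.HealthCheck.MaxFails.
	HealthCheck: healthcheck.HealthCheck{
		Hosts:       pool(),
		FailTimeout: 10 * time.Second,
		Future:      60 * time.Second,
		MaxFails:    1,
	},
}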
diff --git a/middleware/proxy/lookup.go b/middleware/proxy/lookup.go
index a6c714f39..eda0d0a0c 100644
--- a/middleware/proxy/lookup.go
+++ b/middleware/proxy/lookup.go
@@ -7,6 +7,7 @@ import (
"sync/atomic"
"time"
+ "github.com/coredns/coredns/middleware/pkg/healthcheck"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
@@ -24,31 +25,31 @@ func NewLookupWithOption(hosts []string, opts Options) Proxy {
// TODO(miek): this needs to be unified with upstream.go's NewStaticUpstreams, caddy uses NewHost
// we should copy/make something similar.
upstream := &staticUpstream{
- from: ".",
- Hosts: make([]*UpstreamHost, len(hosts)),
- Policy: &Random{},
- Spray: nil,
- FailTimeout: 10 * time.Second,
- MaxFails: 3, // TODO(miek): disable error checking for simple lookups?
- Future: 60 * time.Second,
- ex: newDNSExWithOption(opts),
+ from: ".",
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 10 * time.Second,
+ MaxFails: 3, // TODO(miek): disable error checking for simple lookups?
+ Future: 60 * time.Second,
+ },
+ ex: newDNSExWithOption(opts),
}
+ upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
for i, host := range hosts {
- uh := &UpstreamHost{
+ uh := &healthcheck.UpstreamHost{
Name: host,
Conns: 0,
Fails: 0,
FailTimeout: upstream.FailTimeout,
- CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
- return func(uh *UpstreamHost) bool {
+ CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
down := false
- uh.checkMu.Lock()
+ uh.CheckMu.Lock()
until := uh.OkUntil
- uh.checkMu.Unlock()
+ uh.CheckMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
down = true
@@ -120,7 +121,7 @@ func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
timeout = 10 * time.Second
}
atomic.AddInt32(&host.Fails, 1)
- go func(host *UpstreamHost, timeout time.Duration) {
+ go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
time.Sleep(timeout)
atomic.AddInt32(&host.Fails, -1)
}(host, timeout)
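The Fails increment above, paired with a delayed decrement, gives each failure a fixed lifetime: the default Down() in healthcheck.go reports a host as down while Fails > 0. A sketch of the pattern in isolation; markFailed is a hypothetical helper name, not part of the diff:

// markFailed marks host as down for roughly `timeout` by holding an
// outstanding failure count; the default Down() returns true while Fails > 0.
func markFailed(host *healthcheck.UpstreamHost, timeout time.Duration) {
	atomic.AddInt32(&host.Fails, 1) // host.Down() is true from here on
	go func() {
		time.Sleep(timeout)
		atomic.AddInt32(&host.Fails, -1) // window over; host may recover
	}()
}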
diff --git a/middleware/proxy/proxy.go b/middleware/proxy/proxy.go
index 8780330ed..7e662c42e 100644
--- a/middleware/proxy/proxy.go
+++ b/middleware/proxy/proxy.go
@@ -3,11 +3,11 @@ package proxy
import (
"errors"
- "sync"
"sync/atomic"
"time"
"github.com/coredns/coredns/middleware"
+ "github.com/coredns/coredns/middleware/pkg/healthcheck"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
@@ -41,7 +41,7 @@ type Upstream interface {
// The domain name this upstream host should be routed on.
From() string
// Selects an upstream host to be routed to.
- Select() *UpstreamHost
+ Select() *healthcheck.UpstreamHost
 	// Checks if a subdomain is not ignored.
IsAllowedDomain(string) bool
// Exchanger returns the exchanger to be used for this upstream.
@@ -50,45 +50,6 @@ type Upstream interface {
Stop() error
}
-// UpstreamHostDownFunc can be used to customize how Down behaves.
-type UpstreamHostDownFunc func(*UpstreamHost) bool
-
-// UpstreamHost represents a single proxy upstream
-type UpstreamHost struct {
- Conns int64 // must be first field to be 64-bit aligned on 32-bit systems
- Name string // IP address (and port) of this upstream host
- Fails int32
- FailTimeout time.Duration
- OkUntil time.Time
- CheckDown UpstreamHostDownFunc
- CheckURL string
- WithoutPathPrefix string
- Checking bool
- checkMu sync.Mutex
-}
-
-// Down checks whether the upstream host is down or not.
-// Down will try to use uh.CheckDown first, and will fall
-// back to some default criteria if necessary.
-func (uh *UpstreamHost) Down() bool {
- if uh.CheckDown == nil {
- // Default settings
- fails := atomic.LoadInt32(&uh.Fails)
- after := false
-
- uh.checkMu.Lock()
- until := uh.OkUntil
- uh.checkMu.Unlock()
-
- if !until.IsZero() && time.Now().After(until) {
- after = true
- }
-
- return after || fails > 0
- }
- return uh.CheckDown(uh)
-}
-
// tryDuration is how long to try upstream hosts; failures result in
// immediate retries until this duration ends or we get a nil host.
var tryDuration = 60 * time.Second
@@ -145,7 +106,7 @@ func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (
timeout = 10 * time.Second
}
atomic.AddInt32(&host.Fails, 1)
- go func(host *UpstreamHost, timeout time.Duration) {
+ go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
time.Sleep(timeout)
atomic.AddInt32(&host.Fails, -1)
}(host, timeout)
diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go
index 380b585be..93ef0e32d 100644
--- a/middleware/proxy/upstream.go
+++ b/middleware/proxy/upstream.go
@@ -2,46 +2,25 @@ package proxy
import (
"fmt"
- "io"
- "io/ioutil"
- "log"
"net"
- "net/http"
- "net/url"
"strconv"
"strings"
- "sync"
"sync/atomic"
"time"
"github.com/coredns/coredns/middleware"
"github.com/coredns/coredns/middleware/pkg/dnsutil"
+ "github.com/coredns/coredns/middleware/pkg/healthcheck"
"github.com/coredns/coredns/middleware/pkg/tls"
"github.com/mholt/caddy/caddyfile"
"github.com/miekg/dns"
)
-var (
- supportedPolicies = make(map[string]func() Policy)
-)
-
type staticUpstream struct {
from string
- stop chan struct{} // Signals running goroutines to stop.
- wg sync.WaitGroup // Used to wait for running goroutines to stop.
- Hosts HostPool
- Policy Policy
- Spray Policy
+ healthcheck.HealthCheck
- FailTimeout time.Duration
- MaxFails int32
- Future time.Duration
- HealthCheck struct {
- Path string
- Port string
- Interval time.Duration
- }
WithoutPathPrefix string
IgnoredSubDomains []string
ex Exchanger
@@ -53,15 +32,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
var upstreams []Upstream
for c.Next() {
upstream := &staticUpstream{
- from: ".",
- stop: make(chan struct{}),
- Hosts: nil,
- Policy: &Random{},
- Spray: nil,
- FailTimeout: 10 * time.Second,
- MaxFails: 1,
- Future: 60 * time.Second,
- ex: newDNSEx(),
+ from: ".",
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 10 * time.Second,
+ MaxFails: 1,
+ Future: 60 * time.Second,
+ },
+ ex: newDNSEx(),
}
if !c.Args(&upstream.from) {
@@ -84,22 +61,22 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
}
}
- upstream.Hosts = make([]*UpstreamHost, len(toHosts))
+ upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts))
for i, host := range toHosts {
- uh := &UpstreamHost{
+ uh := &healthcheck.UpstreamHost{
Name: host,
Conns: 0,
Fails: 0,
FailTimeout: upstream.FailTimeout,
- CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc {
- return func(uh *UpstreamHost) bool {
+ CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
down := false
- uh.checkMu.Lock()
+ uh.CheckMu.Lock()
until := uh.OkUntil
- uh.checkMu.Unlock()
+ uh.CheckMu.Unlock()
if !until.IsZero() && time.Now().After(until) {
down = true
@@ -117,32 +94,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
upstream.Hosts[i] = uh
}
+ upstream.Start()
- if upstream.HealthCheck.Path != "" {
- upstream.wg.Add(1)
- go func() {
- defer upstream.wg.Done()
- upstream.HealthCheckWorker(upstream.stop)
- }()
- }
upstreams = append(upstreams, upstream)
}
return upstreams, nil
}
-// Stop sends a signal to all goroutines started by this staticUpstream to exit
-// and waits for them to finish before returning.
-func (u *staticUpstream) Stop() error {
- close(u.stop)
- u.wg.Wait()
- return nil
-}
-
-// RegisterPolicy adds a custom policy to the proxy.
-func RegisterPolicy(name string, policy func() Policy) {
- supportedPolicies[name] = policy
-}
-
func (u *staticUpstream) From() string {
return u.from
}
@@ -153,7 +111,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
if !c.NextArg() {
return c.ArgErr()
}
- policyCreateFunc, ok := supportedPolicies[c.Val()]
+ policyCreateFunc, ok := healthcheck.SupportedPolicies[c.Val()]
if !ok {
return c.ArgErr()
}
@@ -214,7 +172,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
}
u.IgnoredSubDomains = ignoredDomains
case "spray":
- u.Spray = &Spray{}
+ u.Spray = &healthcheck.Spray{}
case "protocol":
encArgs := c.RemainingArgs()
if len(encArgs) == 0 {
@@ -259,154 +217,6 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
return nil
}
-// This was moved into a thread so that each host could throw a health
-// check at the same time. The reason for this is that if we are checking
-// 3 hosts, and the first one is gone, and we spend minutes timing out to
-// fail it, we would not have been doing any other health checks in that
-// time. So we now have a per-host lock and a threaded health check.
-//
-// We use the Checking bool to avoid concurrent checks against the same
-// host; if one is taking a long time, the next one will find a check in
-// progress and simply return before trying.
-//
-// We are carefully avoiding having the mutex locked while we check,
-// otherwise checks will back up, potentially a lot of them if a host is
-// absent for a long time. This arrangement makes checks quickly see if
-// they are the only one running and abort otherwise.
-func healthCheckURL(nextTs time.Time, host *UpstreamHost) {
-
- // lock for our bool check. We don't just defer the unlock because
- // we don't want the lock held while http.Get runs
- host.checkMu.Lock()
-
- // are we mid check? Don't run another one
- if host.Checking {
- host.checkMu.Unlock()
- return
- }
-
- host.Checking = true
- host.checkMu.Unlock()
-
- //log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local())
-
- // fetch that url. This has been moved into a go func because
- // when the remote host is not merely not serving, but actually
- // absent, then tcp syn timeouts can be very long, and so one
- // fetch could last several check intervals
- if r, err := http.Get(host.CheckURL); err == nil {
- io.Copy(ioutil.Discard, r.Body)
- r.Body.Close()
-
- if r.StatusCode < 200 || r.StatusCode >= 400 {
- log.Printf("[WARNING] Host %s health check returned HTTP code %d\n",
- host.Name, r.StatusCode)
- nextTs = time.Unix(0, 0)
- }
- } else {
- log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err)
- nextTs = time.Unix(0, 0)
- }
-
- host.checkMu.Lock()
- host.Checking = false
- host.OkUntil = nextTs
- host.checkMu.Unlock()
-}
-
-func (u *staticUpstream) healthCheck() {
- for _, host := range u.Hosts {
-
- if host.CheckURL == "" {
- var hostName, checkPort string
-
- // The DNS server might be an HTTP server. If so, extract its name.
- ret, err := url.Parse(host.Name)
- if err == nil && len(ret.Host) > 0 {
- hostName = ret.Host
- } else {
- hostName = host.Name
- }
-
- // Extract the port number from the parsed server name.
- checkHostName, checkPort, err := net.SplitHostPort(hostName)
- if err != nil {
- checkHostName = hostName
- }
-
- if u.HealthCheck.Port != "" {
- checkPort = u.HealthCheck.Port
- }
-
- host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path
- }
-
- // calculate this before the get
- nextTs := time.Now().Add(u.Future)
-
- // locks/bools should prevent requests backing up
- go healthCheckURL(nextTs, host)
- }
-}
-
-func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) {
- ticker := time.NewTicker(u.HealthCheck.Interval)
- u.healthCheck()
- for {
- select {
- case <-ticker.C:
- u.healthCheck()
- case <-stop:
- ticker.Stop()
- return
- }
- }
-}
-
-func (u *staticUpstream) Select() *UpstreamHost {
- pool := u.Hosts
- if len(pool) == 1 {
- if pool[0].Down() && u.Spray == nil {
- return nil
- }
- return pool[0]
- }
- allDown := true
- for _, host := range pool {
- if !host.Down() {
- allDown = false
- break
- }
- }
- if allDown {
- if u.Spray == nil {
- return nil
- }
- return u.Spray.Select(pool)
- }
-
- if u.Policy == nil {
- h := (&Random{}).Select(pool)
- if h != nil {
- return h
- }
- if h == nil && u.Spray == nil {
- return nil
- }
- return u.Spray.Select(pool)
- }
-
- h := u.Policy.Select(pool)
- if h != nil {
- return h
- }
-
- if u.Spray == nil {
- return nil
- }
- return u.Spray.Select(pool)
-}
-
func (u *staticUpstream) IsAllowedDomain(name string) bool {
if dns.Name(name) == dns.Name(u.From()) {
return true
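The wholesale deletions above work because staticUpstream now embeds healthcheck.HealthCheck: the embedded type's exported methods are promoted, so Start, Stop, Select, and HealthCheckWorker come along for free, and the promoted Stop and Select satisfy the Upstream interface directly. A short sketch of the promotion; demo is a hypothetical function, not part of the diff:

func demo(u *staticUpstream) {
	u.Start()      // promoted: calls u.HealthCheck.Start()
	defer u.Stop() // promoted Stop() satisfies Upstream.Stop()

	if host := u.Select(); host != nil { // promoted Select() satisfies Upstream.Select()
		_ = host.Name // route the query to this host
	}
}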
diff --git a/middleware/proxy/upstream_test.go b/middleware/proxy/upstream_test.go
index 06c229f39..3aa4104e8 100644
--- a/middleware/proxy/upstream_test.go
+++ b/middleware/proxy/upstream_test.go
@@ -2,74 +2,16 @@ package proxy
import (
"io/ioutil"
- "log"
"os"
"path/filepath"
"strings"
"testing"
- "time"
"github.com/coredns/coredns/middleware/test"
"github.com/mholt/caddy"
)
-func TestHealthCheck(t *testing.T) {
- log.SetOutput(ioutil.Discard)
-
- upstream := &staticUpstream{
- from: ".",
- Hosts: testPool(),
- Policy: &Random{},
- Spray: nil,
- FailTimeout: 10 * time.Second,
- Future: 60 * time.Second,
- MaxFails: 1,
- }
-
- upstream.healthCheck()
- // sleep a bit, it's async now
- time.Sleep(time.Duration(2 * time.Second))
-
- if upstream.Hosts[0].Down() {
- t.Error("Expected first host in testpool to not fail healthcheck.")
- }
- if !upstream.Hosts[1].Down() {
- t.Error("Expected second host in testpool to fail healthcheck.")
- }
-}
-
-func TestSelect(t *testing.T) {
- upstream := &staticUpstream{
- from: ".",
- Hosts: testPool()[:3],
- Policy: &Random{},
- FailTimeout: 10 * time.Second,
- Future: 60 * time.Second,
- MaxFails: 1,
- }
- upstream.Hosts[0].OkUntil = time.Unix(0, 0)
- upstream.Hosts[1].OkUntil = time.Unix(0, 0)
- upstream.Hosts[2].OkUntil = time.Unix(0, 0)
- if h := upstream.Select(); h != nil {
- t.Error("Expected select to return nil as all host are down")
- }
- upstream.Hosts[2].OkUntil = time.Time{}
- if h := upstream.Select(); h == nil {
- t.Error("Expected select to not return nil")
- }
-}
-
-func TestRegisterPolicy(t *testing.T) {
- name := "custom"
- customPolicy := &customPolicy{}
- RegisterPolicy(name, func() Policy { return customPolicy })
- if _, ok := supportedPolicies[name]; !ok {
- t.Error("Expected supportedPolicies to have a custom policy.")
- }
-
-}
-
func TestAllowedDomain(t *testing.T) {
upstream := &staticUpstream{
from: "miek.nl.",