diff options
Diffstat (limited to 'middleware/proxy/upstream.go')
-rw-r--r-- | middleware/proxy/upstream.go | 226 |
1 files changed, 18 insertions, 208 deletions
diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go index 380b585be..93ef0e32d 100644 --- a/middleware/proxy/upstream.go +++ b/middleware/proxy/upstream.go @@ -2,46 +2,25 @@ package proxy import ( "fmt" - "io" - "io/ioutil" - "log" "net" - "net/http" - "net/url" "strconv" "strings" - "sync" "sync/atomic" "time" "github.com/coredns/coredns/middleware" "github.com/coredns/coredns/middleware/pkg/dnsutil" + "github.com/coredns/coredns/middleware/pkg/healthcheck" "github.com/coredns/coredns/middleware/pkg/tls" "github.com/mholt/caddy/caddyfile" "github.com/miekg/dns" ) -var ( - supportedPolicies = make(map[string]func() Policy) -) - type staticUpstream struct { from string - stop chan struct{} // Signals running goroutines to stop. - wg sync.WaitGroup // Used to wait for running goroutines to stop. - Hosts HostPool - Policy Policy - Spray Policy + healthcheck.HealthCheck - FailTimeout time.Duration - MaxFails int32 - Future time.Duration - HealthCheck struct { - Path string - Port string - Interval time.Duration - } WithoutPathPrefix string IgnoredSubDomains []string ex Exchanger @@ -53,15 +32,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { var upstreams []Upstream for c.Next() { upstream := &staticUpstream{ - from: ".", - stop: make(chan struct{}), - Hosts: nil, - Policy: &Random{}, - Spray: nil, - FailTimeout: 10 * time.Second, - MaxFails: 1, - Future: 60 * time.Second, - ex: newDNSEx(), + from: ".", + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 1, + Future: 60 * time.Second, + }, + ex: newDNSEx(), } if !c.Args(&upstream.from) { @@ -84,22 +61,22 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { } } - upstream.Hosts = make([]*UpstreamHost, len(toHosts)) + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts)) for i, host := range toHosts { - uh := &UpstreamHost{ + uh := &healthcheck.UpstreamHost{ Name: host, Conns: 0, Fails: 0, FailTimeout: upstream.FailTimeout, - CheckDown: func(upstream *staticUpstream) UpstreamHostDownFunc { - return func(uh *UpstreamHost) bool { + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { down := false - uh.checkMu.Lock() + uh.CheckMu.Lock() until := uh.OkUntil - uh.checkMu.Unlock() + uh.CheckMu.Unlock() if !until.IsZero() && time.Now().After(until) { down = true @@ -117,32 +94,13 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { upstream.Hosts[i] = uh } + upstream.Start() - if upstream.HealthCheck.Path != "" { - upstream.wg.Add(1) - go func() { - defer upstream.wg.Done() - upstream.HealthCheckWorker(upstream.stop) - }() - } upstreams = append(upstreams, upstream) } return upstreams, nil } -// Stop sends a signal to all goroutines started by this staticUpstream to exit -// and waits for them to finish before returning. -func (u *staticUpstream) Stop() error { - close(u.stop) - u.wg.Wait() - return nil -} - -// RegisterPolicy adds a custom policy to the proxy. -func RegisterPolicy(name string, policy func() Policy) { - supportedPolicies[name] = policy -} - func (u *staticUpstream) From() string { return u.from } @@ -153,7 +111,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { if !c.NextArg() { return c.ArgErr() } - policyCreateFunc, ok := supportedPolicies[c.Val()] + policyCreateFunc, ok := healthcheck.SupportedPolicies[c.Val()] if !ok { return c.ArgErr() } @@ -214,7 +172,7 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { } u.IgnoredSubDomains = ignoredDomains case "spray": - u.Spray = &Spray{} + u.Spray = &healthcheck.Spray{} case "protocol": encArgs := c.RemainingArgs() if len(encArgs) == 0 { @@ -259,154 +217,6 @@ func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { return nil } -// This was moved into a thread so that each host could throw a health -// check at the same time. The reason for this is that if we are checking -// 3 hosts, and the first one is gone, and we spend minutes timing out to -// fail it, we would not have been doing any other health checks in that -// time. So we now have a per-host lock and a threaded health check. -// -// We use the Checking bool to avoid concurrent checks against the same -// host; if one is taking a long time, the next one will find a check in -// progress and simply return before trying. -// -// We are carefully avoiding having the mutex locked while we check, -// otherwise checks will back up, potentially a lot of them if a host is -// absent for a long time. This arrangement makes checks quickly see if -// they are the only one running and abort otherwise. -func healthCheckURL(nextTs time.Time, host *UpstreamHost) { - - // lock for our bool check. We don't just defer the unlock because - // we don't want the lock held while http.Get runs - host.checkMu.Lock() - - // are we mid check? Don't run another one - if host.Checking { - host.checkMu.Unlock() - return - } - - host.Checking = true - host.checkMu.Unlock() - - //log.Printf("[DEBUG] Healthchecking %s, nextTs is %s\n", url, nextTs.Local()) - - // fetch that url. This has been moved into a go func because - // when the remote host is not merely not serving, but actually - // absent, then tcp syn timeouts can be very long, and so one - // fetch could last several check intervals - if r, err := http.Get(host.CheckURL); err == nil { - io.Copy(ioutil.Discard, r.Body) - r.Body.Close() - - if r.StatusCode < 200 || r.StatusCode >= 400 { - log.Printf("[WARNING] Host %s health check returned HTTP code %d\n", - host.Name, r.StatusCode) - nextTs = time.Unix(0, 0) - } - } else { - log.Printf("[WARNING] Host %s health check probe failed: %v\n", host.Name, err) - nextTs = time.Unix(0, 0) - } - - host.checkMu.Lock() - host.Checking = false - host.OkUntil = nextTs - host.checkMu.Unlock() -} - -func (u *staticUpstream) healthCheck() { - for _, host := range u.Hosts { - - if host.CheckURL == "" { - var hostName, checkPort string - - // The DNS server might be an HTTP server. If so, extract its name. - ret, err := url.Parse(host.Name) - if err == nil && len(ret.Host) > 0 { - hostName = ret.Host - } else { - hostName = host.Name - } - - // Extract the port number from the parsed server name. - checkHostName, checkPort, err := net.SplitHostPort(hostName) - if err != nil { - checkHostName = hostName - } - - if u.HealthCheck.Port != "" { - checkPort = u.HealthCheck.Port - } - - host.CheckURL = "http://" + net.JoinHostPort(checkHostName, checkPort) + u.HealthCheck.Path - } - - // calculate this before the get - nextTs := time.Now().Add(u.Future) - - // locks/bools should prevent requests backing up - go healthCheckURL(nextTs, host) - } -} - -func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) { - ticker := time.NewTicker(u.HealthCheck.Interval) - u.healthCheck() - for { - select { - case <-ticker.C: - u.healthCheck() - case <-stop: - ticker.Stop() - return - } - } -} - -func (u *staticUpstream) Select() *UpstreamHost { - pool := u.Hosts - if len(pool) == 1 { - if pool[0].Down() && u.Spray == nil { - return nil - } - return pool[0] - } - allDown := true - for _, host := range pool { - if !host.Down() { - allDown = false - break - } - } - if allDown { - if u.Spray == nil { - return nil - } - return u.Spray.Select(pool) - } - - if u.Policy == nil { - h := (&Random{}).Select(pool) - if h != nil { - return h - } - if h == nil && u.Spray == nil { - return nil - } - return u.Spray.Select(pool) - } - - h := u.Policy.Select(pool) - if h != nil { - return h - } - - if u.Spray == nil { - return nil - } - return u.Spray.Select(pool) -} - func (u *staticUpstream) IsAllowedDomain(name string) bool { if dns.Name(name) == dns.Name(u.From()) { return true |