aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Miek Gieben <miek@miek.nl> 2017-04-26 10:58:14 +0100
committerGravatar GitHub <noreply@github.com> 2017-04-26 10:58:14 +0100
commit3b5b6a233fc669dfb47a6dc4eb8899c3e992cc32 (patch)
tree5db5d0834e2d3c6143387c3c96fe904782c1f5d9
parent003b1bf678f6fc1d551fed5184adccde2137e86f (diff)
downloadcoredns-3b5b6a233fc669dfb47a6dc4eb8899c3e992cc32.tar.gz
coredns-3b5b6a233fc669dfb47a6dc4eb8899c3e992cc32.tar.zst
coredns-3b5b6a233fc669dfb47a6dc4eb8899c3e992cc32.zip
middleware/proxy: Kill goroutines on stop (#646)
* middleware/proxy: Kill goroutines on stop Ports caddy's https://github.com/mholt/caddy/commit/59bf71c2932c3b814a6a1211c492a1aa9f71d4a1 Excludes the proxy_test.go test part though. Fixes #644 * Add tests
-rw-r--r--middleware/proxy/lookup.go2
-rw-r--r--middleware/proxy/proxy.go2
-rw-r--r--middleware/proxy/proxy_test.go85
-rw-r--r--middleware/proxy/setup.go2
-rw-r--r--middleware/proxy/upstream.go26
5 files changed, 112 insertions, 5 deletions
diff --git a/middleware/proxy/lookup.go b/middleware/proxy/lookup.go
index e97741fb5..370307cc1 100644
--- a/middleware/proxy/lookup.go
+++ b/middleware/proxy/lookup.go
@@ -21,6 +21,8 @@ func NewLookup(hosts []string) Proxy {
func NewLookupWithOption(hosts []string, opts Options) Proxy {
p := Proxy{Next: nil}
+ // TODO(miek): this needs to be unified with upstream.go's NewStaticUpstreams, caddy uses NewHost
+ // we should copy/make something similar.
upstream := &staticUpstream{
from: ".",
Hosts: make([]*UpstreamHost, len(hosts)),
diff --git a/middleware/proxy/proxy.go b/middleware/proxy/proxy.go
index ce8b99d83..5205bd06f 100644
--- a/middleware/proxy/proxy.go
+++ b/middleware/proxy/proxy.go
@@ -46,6 +46,8 @@ type Upstream interface {
IsAllowedDomain(string) bool
// Exchanger returns the exchanger to be used for this upstream.
Exchanger() Exchanger
+ // Stops the upstream from proxying requests to shutdown goroutines cleanly.
+ Stop() error
}
// UpstreamHostDownFunc can be used to customize how Down behaves.
diff --git a/middleware/proxy/proxy_test.go b/middleware/proxy/proxy_test.go
new file mode 100644
index 000000000..0a44d1b82
--- /dev/null
+++ b/middleware/proxy/proxy_test.go
@@ -0,0 +1,85 @@
+package proxy
+
+import (
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/mholt/caddy/caddyfile"
+)
+
+func TestStop(t *testing.T) {
+ config := "proxy . %s {\n health_check /healthcheck:%s %dms \n}"
+ tests := []struct {
+ name string
+ intervalInMilliseconds int
+ numHealthcheckIntervals int
+ }{
+ {
+ "No Healthchecks After Stop - 5ms, 1 intervals",
+ 5,
+ 1,
+ },
+ {
+ "No Healthchecks After Stop - 5ms, 2 intervals",
+ 5,
+ 2,
+ },
+ {
+ "No Healthchecks After Stop - 5ms, 3 intervals",
+ 5,
+ 3,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+
+ // Set up proxy.
+ var counter int64
+ backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ r.Body.Close()
+ atomic.AddInt64(&counter, 1)
+ }))
+
+ defer backend.Close()
+
+ port := backend.URL[17:] // Remove all crap up to the port
+ back := backend.URL[7:] // Remove http://
+ c := caddyfile.NewDispenser("Testfile", strings.NewReader(fmt.Sprintf(config, back, port, test.intervalInMilliseconds)))
+ upstreams, err := NewStaticUpstreams(&c)
+ if err != nil {
+ t.Error("Expected no error. Got:", err.Error())
+ }
+
+ // Give some time for healthchecks to hit the server.
+ time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond)
+
+ for _, upstream := range upstreams {
+ if err := upstream.Stop(); err != nil {
+ t.Error("Expected no error stopping upstream. Got: ", err.Error())
+ }
+ }
+
+ counterValueAfterShutdown := atomic.LoadInt64(&counter)
+
+ // Give some time to see if healthchecks are still hitting the server.
+ time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond)
+
+ if counterValueAfterShutdown == 0 {
+ t.Error("Expected healthchecks to hit test server. Got no healthchecks.")
+ }
+
+ counterValueAfterWaiting := atomic.LoadInt64(&counter)
+ if counterValueAfterWaiting != counterValueAfterShutdown {
+ t.Errorf("Expected no more healthchecks after shutdown. Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown)
+ }
+
+ })
+
+ }
+}
diff --git a/middleware/proxy/setup.go b/middleware/proxy/setup.go
index 36401188f..de979a5df 100644
--- a/middleware/proxy/setup.go
+++ b/middleware/proxy/setup.go
@@ -37,6 +37,8 @@ func setup(c *caddy.Controller) error {
c.OnShutdown(func() error {
return u.Exchanger().OnShutdown(P)
})
+ // Register shutdown handlers.
+ c.OnShutdown(u.Stop)
}
return nil
diff --git a/middleware/proxy/upstream.go b/middleware/proxy/upstream.go
index 278b08d0f..8bc3e5306 100644
--- a/middleware/proxy/upstream.go
+++ b/middleware/proxy/upstream.go
@@ -10,6 +10,7 @@ import (
"net/url"
"strconv"
"strings"
+ "sync"
"sync/atomic"
"time"
@@ -25,7 +26,10 @@ var (
)
type staticUpstream struct {
- from string
+ from string
+ stop chan struct{} // Signals running goroutines to stop.
+ wg sync.WaitGroup // Used to wait for running goroutines to stop.
+
Hosts HostPool
Policy Policy
Spray Policy
@@ -49,6 +53,7 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
for c.Next() {
upstream := &staticUpstream{
from: ".",
+ stop: make(chan struct{}),
Hosts: nil,
Policy: &Random{},
Spray: nil,
@@ -108,13 +113,25 @@ func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
}
if upstream.HealthCheck.Path != "" {
- go upstream.HealthCheckWorker(nil)
+ upstream.wg.Add(1)
+ go func() {
+ defer upstream.wg.Done()
+ upstream.HealthCheckWorker(upstream.stop)
+ }()
}
upstreams = append(upstreams, upstream)
}
return upstreams, nil
}
+// Stop sends a signal to all goroutines started by this staticUpstream to exit
+// and waits for them to finish before returning.
+func (u *staticUpstream) Stop() error {
+ close(u.stop)
+ u.wg.Wait()
+ return nil
+}
+
// RegisterPolicy adds a custom policy to the proxy.
func RegisterPolicy(name string, policy func() Policy) {
supportedPolicies[name] = policy
@@ -281,9 +298,8 @@ func (u *staticUpstream) HealthCheckWorker(stop chan struct{}) {
case <-ticker.C:
u.healthCheck()
case <-stop:
- // TODO: the library should provide a stop channel and global
- // waitgroup to allow goroutines started by plugins a chance
- // to clean themselves up.
+ ticker.Stop()
+ return
}
}
}