diff options
Diffstat (limited to 'middleware/kubernetes/kubernetes.go')
-rw-r--r-- | middleware/kubernetes/kubernetes.go | 63 |
1 files changed, 60 insertions, 3 deletions
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go index 209e09eb4..87c9fd4a7 100644 --- a/middleware/kubernetes/kubernetes.go +++ b/middleware/kubernetes/kubernetes.go @@ -7,11 +7,13 @@ import ( "log" "net" "strings" + "sync/atomic" "time" "github.com/coredns/coredns/middleware" "github.com/coredns/coredns/middleware/etcd/msg" "github.com/coredns/coredns/middleware/pkg/dnsutil" + "github.com/coredns/coredns/middleware/pkg/healthcheck" dnsstrings "github.com/coredns/coredns/middleware/pkg/strings" "github.com/coredns/coredns/middleware/proxy" "github.com/coredns/coredns/request" @@ -32,7 +34,8 @@ type Kubernetes struct { Zones []string primaryZone int Proxy proxy.Proxy // Proxy for looking up names during the resolution process - APIEndpoint string + APIServerList []string + APIProxy *apiProxy APICertAuth string APIClientCert string APIClientKey string @@ -173,8 +176,62 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) { overrides := &clientcmd.ConfigOverrides{} clusterinfo := clientcmdapi.Cluster{} authinfo := clientcmdapi.AuthInfo{} - if len(k.APIEndpoint) > 0 { - clusterinfo.Server = k.APIEndpoint + if len(k.APIServerList) > 0 { + endpoint := k.APIServerList[0] + if len(k.APIServerList) > 1 { + // Use a random port for api proxy, will get the value later through listener.Addr() + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes api proxy: %v", err) + } + k.APIProxy = &apiProxy{ + listener: listener, + handler: proxyHandler{ + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 3 * time.Second, + MaxFails: 1, + Future: 10 * time.Second, + Path: "/", + Interval: 5 * time.Second, + }, + }, + } + k.APIProxy.handler.Hosts = make([]*healthcheck.UpstreamHost, len(k.APIServerList)) + for i, entry := range k.APIServerList { + + uh := &healthcheck.UpstreamHost{ + Name: strings.TrimPrefix(entry, "http://"), + + CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { + + down := false + + uh.CheckMu.Lock() + until := uh.OkUntil + uh.CheckMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true + } + + fails := atomic.LoadInt32(&uh.Fails) + if fails >= upstream.MaxFails && upstream.MaxFails != 0 { + down = true + } + return down + } + }(&k.APIProxy.handler), + } + + k.APIProxy.handler.Hosts[i] = uh + } + k.APIProxy.Handler = &k.APIProxy.handler + + // Find the random port used for api proxy + endpoint = fmt.Sprintf("http://%s", listener.Addr()) + } + clusterinfo.Server = endpoint } else { cc, err := rest.InClusterConfig() if err != nil { |