aboutsummaryrefslogtreecommitdiff
path: root/middleware/kubernetes/kubernetes.go
diff options
context:
space:
mode:
Diffstat (limited to 'middleware/kubernetes/kubernetes.go')
-rw-r--r--middleware/kubernetes/kubernetes.go63
1 files changed, 60 insertions, 3 deletions
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go
index 209e09eb4..87c9fd4a7 100644
--- a/middleware/kubernetes/kubernetes.go
+++ b/middleware/kubernetes/kubernetes.go
@@ -7,11 +7,13 @@ import (
"log"
"net"
"strings"
+ "sync/atomic"
"time"
"github.com/coredns/coredns/middleware"
"github.com/coredns/coredns/middleware/etcd/msg"
"github.com/coredns/coredns/middleware/pkg/dnsutil"
+ "github.com/coredns/coredns/middleware/pkg/healthcheck"
dnsstrings "github.com/coredns/coredns/middleware/pkg/strings"
"github.com/coredns/coredns/middleware/proxy"
"github.com/coredns/coredns/request"
@@ -32,7 +34,8 @@ type Kubernetes struct {
Zones []string
primaryZone int
Proxy proxy.Proxy // Proxy for looking up names during the resolution process
- APIEndpoint string
+ APIServerList []string
+ APIProxy *apiProxy
APICertAuth string
APIClientCert string
APIClientKey string
@@ -173,8 +176,62 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
overrides := &clientcmd.ConfigOverrides{}
clusterinfo := clientcmdapi.Cluster{}
authinfo := clientcmdapi.AuthInfo{}
- if len(k.APIEndpoint) > 0 {
- clusterinfo.Server = k.APIEndpoint
+ if len(k.APIServerList) > 0 {
+ endpoint := k.APIServerList[0]
+ if len(k.APIServerList) > 1 {
+ // Use a random port for api proxy, will get the value later through listener.Addr()
+ listener, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ return nil, fmt.Errorf("failed to create kubernetes api proxy: %v", err)
+ }
+ k.APIProxy = &apiProxy{
+ listener: listener,
+ handler: proxyHandler{
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 3 * time.Second,
+ MaxFails: 1,
+ Future: 10 * time.Second,
+ Path: "/",
+ Interval: 5 * time.Second,
+ },
+ },
+ }
+ k.APIProxy.handler.Hosts = make([]*healthcheck.UpstreamHost, len(k.APIServerList))
+ for i, entry := range k.APIServerList {
+
+ uh := &healthcheck.UpstreamHost{
+ Name: strings.TrimPrefix(entry, "http://"),
+
+ CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
+
+ down := false
+
+ uh.CheckMu.Lock()
+ until := uh.OkUntil
+ uh.CheckMu.Unlock()
+
+ if !until.IsZero() && time.Now().After(until) {
+ down = true
+ }
+
+ fails := atomic.LoadInt32(&uh.Fails)
+ if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
+ down = true
+ }
+ return down
+ }
+ }(&k.APIProxy.handler),
+ }
+
+ k.APIProxy.handler.Hosts[i] = uh
+ }
+ k.APIProxy.Handler = &k.APIProxy.handler
+
+ // Find the random port used for api proxy
+ endpoint = fmt.Sprintf("http://%s", listener.Addr())
+ }
+ clusterinfo.Server = endpoint
} else {
cc, err := rest.InClusterConfig()
if err != nil {