aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--middleware/kubernetes/README.md3
-rw-r--r--middleware/kubernetes/apiproxy.go76
-rw-r--r--middleware/kubernetes/kubernetes.go63
-rw-r--r--middleware/kubernetes/setup.go10
-rw-r--r--middleware/pkg/healthcheck/healthcheck.go1
-rw-r--r--test/kubernetes_test.go23
6 files changed, 171 insertions, 5 deletions
diff --git a/middleware/kubernetes/README.md b/middleware/kubernetes/README.md
index 4872b8fee..14e63c163 100644
--- a/middleware/kubernetes/README.md
+++ b/middleware/kubernetes/README.md
@@ -27,6 +27,9 @@ kubernetes ZONE [ZONE...] [
* `resyncperiod` specifies the Kubernetes data API **DURATION** period.
* `endpoint` specifies the **URL** for a remove k8s API endpoint.
If omitted, it will connect to k8s in-cluster using the cluster service account.
+ Multiple k8s API endpoints could be specified, separated by `,`s, e.g.
+ `endpoint http://k8s-endpoint1:8080,http://k8s-endpoint2:8080`. CoreDNS
+ will automatically perform a healthcheck and proxy to the healthy k8s API endpoint.
* `tls` **CERT** **KEY** **CACERT** are the TLS cert, key and the CA cert file names for remote k8s connection.
This option is ignored if connecting in-cluster (i.e. endpoint is not specified).
* `namespaces` **NAMESPACE [NAMESPACE...]**, exposed only the k8s namespaces listed.
diff --git a/middleware/kubernetes/apiproxy.go b/middleware/kubernetes/apiproxy.go
new file mode 100644
index 000000000..966e5753b
--- /dev/null
+++ b/middleware/kubernetes/apiproxy.go
@@ -0,0 +1,76 @@
+package kubernetes
+
+import (
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "net/http"
+
+ "github.com/coredns/coredns/middleware/pkg/healthcheck"
+)
+
+type proxyHandler struct {
+ healthcheck.HealthCheck
+}
+
+type apiProxy struct {
+ http.Server
+ listener net.Listener
+ handler proxyHandler
+}
+
+func (p *proxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ upstream := p.Select()
+ network := "tcp"
+ if upstream.Network != "" {
+ network = upstream.Network
+ }
+ address := upstream.Name
+ d, err := net.Dial(network, address)
+ if err != nil {
+ log.Printf("[ERROR] Unable to establish connection to upstream %s://%s: %s", network, address, err)
+ http.Error(w, fmt.Sprintf("Unable to establish connection to upstream %s://%s: %s", network, address, err), 500)
+ return
+ }
+ hj, ok := w.(http.Hijacker)
+ if !ok {
+ log.Printf("[ERROR] Unable to establish connection: no hijacker")
+ http.Error(w, "Unable to establish connection: no hijacker", 500)
+ return
+ }
+ nc, _, err := hj.Hijack()
+ if err != nil {
+ log.Printf("[ERROR] Unable to hijack connection: %s", err)
+ http.Error(w, fmt.Sprintf("Unable to hijack connection: %s", err), 500)
+ return
+ }
+ defer nc.Close()
+ defer d.Close()
+
+ err = r.Write(d)
+ if err != nil {
+ log.Printf("[ERROR] Unable to copy connection to upstream %s://%s: %s", network, address, err)
+ http.Error(w, fmt.Sprintf("Unable to copy connection to upstream %s://%s: %s", network, address, err), 500)
+ return
+ }
+
+ errChan := make(chan error, 2)
+ cp := func(dst io.Writer, src io.Reader) {
+ _, err := io.Copy(dst, src)
+ errChan <- err
+ }
+ go cp(d, nc)
+ go cp(nc, d)
+ <-errChan
+}
+
+func (p *apiProxy) Run() {
+ p.handler.Start()
+ p.Serve(p.listener)
+}
+
+func (p *apiProxy) Stop() {
+ p.handler.Stop()
+ p.Close()
+}
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go
index 209e09eb4..87c9fd4a7 100644
--- a/middleware/kubernetes/kubernetes.go
+++ b/middleware/kubernetes/kubernetes.go
@@ -7,11 +7,13 @@ import (
"log"
"net"
"strings"
+ "sync/atomic"
"time"
"github.com/coredns/coredns/middleware"
"github.com/coredns/coredns/middleware/etcd/msg"
"github.com/coredns/coredns/middleware/pkg/dnsutil"
+ "github.com/coredns/coredns/middleware/pkg/healthcheck"
dnsstrings "github.com/coredns/coredns/middleware/pkg/strings"
"github.com/coredns/coredns/middleware/proxy"
"github.com/coredns/coredns/request"
@@ -32,7 +34,8 @@ type Kubernetes struct {
Zones []string
primaryZone int
Proxy proxy.Proxy // Proxy for looking up names during the resolution process
- APIEndpoint string
+ APIServerList []string
+ APIProxy *apiProxy
APICertAuth string
APIClientCert string
APIClientKey string
@@ -173,8 +176,62 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
overrides := &clientcmd.ConfigOverrides{}
clusterinfo := clientcmdapi.Cluster{}
authinfo := clientcmdapi.AuthInfo{}
- if len(k.APIEndpoint) > 0 {
- clusterinfo.Server = k.APIEndpoint
+ if len(k.APIServerList) > 0 {
+ endpoint := k.APIServerList[0]
+ if len(k.APIServerList) > 1 {
+ // Use a random port for api proxy, will get the value later through listener.Addr()
+ listener, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ return nil, fmt.Errorf("failed to create kubernetes api proxy: %v", err)
+ }
+ k.APIProxy = &apiProxy{
+ listener: listener,
+ handler: proxyHandler{
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 3 * time.Second,
+ MaxFails: 1,
+ Future: 10 * time.Second,
+ Path: "/",
+ Interval: 5 * time.Second,
+ },
+ },
+ }
+ k.APIProxy.handler.Hosts = make([]*healthcheck.UpstreamHost, len(k.APIServerList))
+ for i, entry := range k.APIServerList {
+
+ uh := &healthcheck.UpstreamHost{
+ Name: strings.TrimPrefix(entry, "http://"),
+
+ CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
+
+ down := false
+
+ uh.CheckMu.Lock()
+ until := uh.OkUntil
+ uh.CheckMu.Unlock()
+
+ if !until.IsZero() && time.Now().After(until) {
+ down = true
+ }
+
+ fails := atomic.LoadInt32(&uh.Fails)
+ if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
+ down = true
+ }
+ return down
+ }
+ }(&k.APIProxy.handler),
+ }
+
+ k.APIProxy.handler.Hosts[i] = uh
+ }
+ k.APIProxy.Handler = &k.APIProxy.handler
+
+ // Find the random port used for api proxy
+ endpoint = fmt.Sprintf("http://%s", listener.Addr())
+ }
+ clusterinfo.Server = endpoint
} else {
cc, err := rest.InClusterConfig()
if err != nil {
diff --git a/middleware/kubernetes/setup.go b/middleware/kubernetes/setup.go
index 6b8cbba6e..130dca083 100644
--- a/middleware/kubernetes/setup.go
+++ b/middleware/kubernetes/setup.go
@@ -39,10 +39,16 @@ func setup(c *caddy.Controller) error {
// Register KubeCache start and stop functions with Caddy
c.OnStartup(func() error {
go kubernetes.APIConn.Run()
+ if kubernetes.APIProxy != nil {
+ go kubernetes.APIProxy.Run()
+ }
return nil
})
c.OnShutdown(func() error {
+ if kubernetes.APIProxy != nil {
+ kubernetes.APIProxy.Stop()
+ }
return kubernetes.APIConn.Stop()
})
@@ -140,7 +146,9 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
case "endpoint":
args := c.RemainingArgs()
if len(args) > 0 {
- k8s.APIEndpoint = args[0]
+ for _, endpoint := range strings.Split(args[0], ",") {
+ k8s.APIServerList = append(k8s.APIServerList, strings.TrimSpace(endpoint))
+ }
continue
}
return nil, c.ArgErr()
diff --git a/middleware/pkg/healthcheck/healthcheck.go b/middleware/pkg/healthcheck/healthcheck.go
index e0152a47b..fc9f698e6 100644
--- a/middleware/pkg/healthcheck/healthcheck.go
+++ b/middleware/pkg/healthcheck/healthcheck.go
@@ -19,6 +19,7 @@ type UpstreamHostDownFunc func(*UpstreamHost) bool
type UpstreamHost struct {
Conns int64 // must be first field to be 64-bit aligned on 32-bit systems
Name string // IP address (and port) of this upstream host
+ Network string // Network (tcp, unix, etc) of the host, default "" is "tcp"
Fails int32
FailTimeout time.Duration
OkUntil time.Time
diff --git a/test/kubernetes_test.go b/test/kubernetes_test.go
index 1d00be3b8..d99ca1618 100644
--- a/test/kubernetes_test.go
+++ b/test/kubernetes_test.go
@@ -450,7 +450,7 @@ func doIntegrationTests(t *testing.T, corefile string, testCases []test.Case) {
// Work-around for timing condition that results in no-data being returned in
// test environment.
- time.Sleep(1 * time.Second)
+ time.Sleep(3 * time.Second)
for _, tc := range testCases {
@@ -513,6 +513,27 @@ func TestKubernetesIntegration(t *testing.T) {
doIntegrationTests(t, corefile, dnsTestCases)
}
+func TestKubernetesIntegrationAPIProxy(t *testing.T) {
+
+ removeUpstreamConfig, upstreamServer, udp := createUpstreamServer(t)
+ defer upstreamServer.Stop()
+ defer removeUpstreamConfig()
+
+ corefile :=
+ `.:0 {
+ kubernetes cluster.local 0.0.10.in-addr.arpa {
+ endpoint http://nonexistance:8080,http://invalidip:8080,http://localhost:8080
+ namespaces test-1
+ pods disabled
+ upstream ` + udp + `
+ }
+ erratic . {
+ drop 0
+ }
+`
+ doIntegrationTests(t, corefile, dnsTestCases)
+}
+
func TestKubernetesIntegrationPodsInsecure(t *testing.T) {
corefile :=
`.:0 {