author Miek Gieben <miek@miek.nl> 2017-09-14 09:36:06 +0100
committer GitHub <noreply@github.com> 2017-09-14 09:36:06 +0100
commit d8714e64e400ef873c2adc4d929a07d7890727b9 (patch)
tree c9fa4c157e6af12eb1517654f8d23ca5d5619513 /plugin/kubernetes
parent b984aa45595dc95253b91191afe7d3ee29e71b48 (diff)
Remove the word middleware (#1067)

* Rename middleware to plugin first pass; mostly used 'sed', few spots where I manually changed text. This still builds a coredns binary.
* fmt error
* Rename AddMiddleware to AddPlugin
* Readd AddMiddleware to remain backwards compat
Diffstat (limited to 'plugin/kubernetes')
-rw-r--r--  plugin/kubernetes/DEV-README.md                 |  43
-rw-r--r--  plugin/kubernetes/README.md                     | 167
-rw-r--r--  plugin/kubernetes/apiproxy.go                   |  76
-rw-r--r--  plugin/kubernetes/autopath.go                   |  53
-rw-r--r--  plugin/kubernetes/controller.go                 | 399
-rw-r--r--  plugin/kubernetes/federation.go                 |  45
-rw-r--r--  plugin/kubernetes/handler.go                    |  86
-rw-r--r--  plugin/kubernetes/handler_pod_disabled_test.go  |  61
-rw-r--r--  plugin/kubernetes/handler_pod_insecure_test.go  |  59
-rw-r--r--  plugin/kubernetes/handler_pod_verified_test.go  |  59
-rw-r--r--  plugin/kubernetes/handler_test.go               | 347
-rw-r--r--  plugin/kubernetes/kubernetes.go                 | 457
-rw-r--r--  plugin/kubernetes/kubernetes_apex_test.go       |  68
-rw-r--r--  plugin/kubernetes/kubernetes_test.go            | 242
-rw-r--r--  plugin/kubernetes/local.go                      |  40
-rw-r--r--  plugin/kubernetes/ns.go                         |  65
-rw-r--r--  plugin/kubernetes/ns_test.go                    |  69
-rw-r--r--  plugin/kubernetes/parse.go                      | 112
-rw-r--r--  plugin/kubernetes/parse_test.go                 |  56
-rw-r--r--  plugin/kubernetes/reverse.go                    |  55
-rw-r--r--  plugin/kubernetes/reverse_test.go               | 125
-rw-r--r--  plugin/kubernetes/setup.go                      | 208
-rw-r--r--  plugin/kubernetes/setup_reverse_test.go         |  35
-rw-r--r--  plugin/kubernetes/setup_test.go                 | 473
-rw-r--r--  plugin/kubernetes/setup_ttl_test.go             |  45
25 files changed, 3445 insertions, 0 deletions
diff --git a/plugin/kubernetes/DEV-README.md b/plugin/kubernetes/DEV-README.md
new file mode 100644
index 000000000..4f652b578
--- /dev/null
+++ b/plugin/kubernetes/DEV-README.md
@@ -0,0 +1,43 @@
+# Basic Setup for Development and Testing
+
+## Launch Kubernetes
+
+To run the tests, you'll need a private, live Kubernetes cluster. If you don't have one,
+you can try out [minikube](https://github.com/kubernetes/minikube), which is
+also available via Homebrew for OS X users.
+
+## Configure Test Data
+
+The test data is all in [this manifest](https://github.com/coredns/coredns/blob/master/.travis/kubernetes/dns-test.yaml)
+and you can load it with `kubectl apply -f`. It will create a couple of namespaces and some services.
+For the tests to pass, you should not create anything else in the cluster.
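+
+For example, from the root of a CoreDNS checkout:
+
+~~~
+$ kubectl apply -f .travis/kubernetes/dns-test.yaml
+~~~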
+
+## Proxy the API Server
+
+Assuming your Kubernetes API server isn't running on http://localhost:8080, you will need to proxy from that
+port to your cluster. You can do this with `kubectl proxy --port 8080`.
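+
+For example (the `curl` to `/version` is just a quick check that the proxy is
+reachable):
+
+~~~
+$ kubectl proxy --port 8080 &
+$ curl http://localhost:8080/version
+~~~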
+
+## Run CoreDNS Kubernetes Tests
+
+Now you can run the tests locally, for example:
+
+~~~
+$ cd $GOPATH/src/github.com/coredns/coredns/test
+$ go test -v -tags k8s
+~~~
+
+# Implementation Notes/Ideas
+
+* Additional features:
+ * Implement IP selection and ordering (internal/external). Related to
+ wildcards and SkyDNS use of CNAMES.
+ * Expose arbitrary kubernetes repository data as TXT records?
+* DNS Correctness
+ * Do we need to generate synthetic zone records for namespaces?
+ * Do we need to generate synthetic zone records for the skydns synthetic zones?
+* Test cases
+  * Test with CoreDNS caching. CoreDNS caching of DNS responses works
+    using the `cache` directive; this was verified with a 20s cache timeout
+    and A-record queries. Automate testing with cache in place.
+ * Automate CoreDNS performance tests. Initially for zone files, and for
+ pre-loaded k8s API cache. With and without CoreDNS response caching.
diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md
new file mode 100644
index 000000000..387f1cf75
--- /dev/null
+++ b/plugin/kubernetes/README.md
@@ -0,0 +1,167 @@
+# kubernetes
+
+The *kubernetes* plugin enables reading zone data from a Kubernetes cluster. It implements
+the [Kubernetes DNS-Based Service Discovery
+Specification](https://github.com/kubernetes/dns/blob/master/docs/specification.md).
+
+CoreDNS running the kubernetes plugin can be used as a replacement for kube-dns in a kubernetes
+cluster. See the [deployment](https://github.com/coredns/deployment) repository for details on [how
+to deploy CoreDNS in Kubernetes](https://github.com/coredns/deployment/tree/master/kubernetes).
+
+[stubDomains](http://blog.kubernetes.io/2017/04/configuring-private-dns-zones-upstream-nameservers-kubernetes.html)
+are implemented via the *proxy* plugin.
+
+## Syntax
+
+~~~
+kubernetes [ZONES...]
+~~~
+
+With only the directive specified, the *kubernetes* plugin will default to the zone specified in
+the server's block. It will handle all queries in that zone and connect to Kubernetes in-cluster. It
+will not provide PTR records for services, or A records for pods. If **ZONES** is used, it specifies
+all the zones the plugin should be authoritative for.
+
+~~~
+kubernetes [ZONES...] {
+ resyncperiod DURATION
+ endpoint URL
+ tls CERT KEY CACERT
+ namespaces NAMESPACE...
+ labels EXPRESSION
+ pods POD-MODE
+ upstream ADDRESS...
+ ttl TTL
+ fallthrough
+}
+~~~
+* `resyncperiod` specifies the **DURATION** at which CoreDNS resynchronizes its cache with the Kubernetes data API.
+* `endpoint` specifies the **URL** for a remote k8s API endpoint.
+ If omitted, it will connect to k8s in-cluster using the cluster service account.
+  Multiple k8s API endpoints can be specified, separated by commas, e.g.
+  `endpoint http://k8s-endpoint1:8080,http://k8s-endpoint2:8080`. CoreDNS
+  will automatically perform a healthcheck and proxy to a healthy k8s API endpoint.
+* `tls` **CERT** **KEY** **CACERT** are the TLS cert, key and the CA cert file names for remote k8s connection.
+ This option is ignored if connecting in-cluster (i.e. endpoint is not specified).
+* `namespaces` **NAMESPACE [NAMESPACE...]**, exposes only the k8s namespaces listed.
+  If this option is omitted, all namespaces are exposed.
+* `labels` **EXPRESSION** only exposes the records for Kubernetes objects that match this label selector.
+ The label selector syntax is described in the
+ [Kubernetes User Guide - Labels](http://kubernetes.io/docs/user-guide/labels/). An example that
+ only exposes objects labeled as "application=nginx" in the "staging" or "qa" environments, would
+ use: `labels environment in (staging, qa),application=nginx`.
+* `pods` **POD-MODE** sets the mode for handling IP-based pod A records, e.g.
+  `1-2-3-4.ns.pod.cluster.local. IN A 1.2.3.4`.
+ This option is provided to facilitate use of SSL certs when connecting directly to pods. Valid
+ values for **POD-MODE**:
+
+  * `disabled`: Default. Do not process pod requests, always returning `NXDOMAIN`.
+  * `insecure`: Always return an A record with the IP from the request (without checking k8s). This option
+    is vulnerable to abuse if used maliciously in conjunction with wildcard SSL certs. This
+    option is provided for backward compatibility with kube-dns.
+  * `verified`: Return an A record if there exists a pod in the same namespace with a matching IP. This
+    option requires substantially more memory than insecure mode, since it will maintain a watch
+    on all pods.
+
+* `upstream` **ADDRESS [ADDRESS...]** defines the upstream resolvers used for resolving services
+ that point to external hosts (External Services). **ADDRESS** can be an ip, an ip:port, or a path
+ to a file structured like resolv.conf.
+* `ttl` allows you to set a custom TTL for responses. The default (and allowed minimum) is
+  5 seconds; the maximum is capped at 3600 seconds.
+* `fallthrough` If a query for a record in the cluster zone results in NXDOMAIN, that is normally
+  the response. If you specify this option, however, the query will instead be passed
+  on down the plugin chain, which can include another plugin to handle the query.
+
+## Examples
+
+Handle all queries in the `cluster.local` zone. Connect to Kubernetes in-cluster.
+Also handle all `PTR` requests for `10.0.0.0/16`. Verify the existence of pods when answering pod
+requests. Resolve upstream records against `10.102.3.10`. Note we show the entire server block
+here:
+
+~~~ txt
+10.0.0.0/16 cluster.local {
+ kubernetes {
+ pods verified
+ upstream 10.102.3.10:53
+ }
+}
+~~~
+
+Or you can selectively expose some namespaces:
+
+~~~ txt
+kubernetes cluster.local {
+ namespaces test staging
+}
+~~~
+
+Connect to Kubernetes with CoreDNS running outside the cluster:
+
+~~~ txt
+kubernetes cluster.local {
+ endpoint https://k8s-endpoint:8443
+ tls cert key cacert
+}
+~~~
+
+Here we use the *proxy* plugin to implement stubDomains that forward `example.org` and
+`example.com` to another nameserver.
+
+~~~ txt
+cluster.local {
+ kubernetes {
+ endpoint https://k8s-endpoint:8443
+ tls cert key cacert
+ }
+}
+example.org {
+ proxy . 8.8.8.8:53
+}
+example.com {
+ proxy . 8.8.8.8:53
+}
+~~~
+
+## AutoPath
+
+The *kubernetes* plugin can be used in conjunction with the *autopath* plugin. Using this
+feature enables server-side domain search path completion in kubernetes clusters. Note: `pods` must
+be set to `verified` for this to function properly.
+
+ cluster.local {
+ autopath @kubernetes
+ kubernetes {
+ pods verified
+ }
+ }
+
+## Federation
+
+The *kubernetes* plugin can be used in conjunction with the *federation* plugin. Using this
+feature enables serving federated domains from the Kubernetes cluster.
+
+ cluster.local {
+ federation {
+ fallthrough
+ prod prod.example.org
+ staging staging.example.org
+ }
+ kubernetes
+ }
+
+## Wildcards
+
+Some query labels accept a wildcard value to match any value. If a label is a valid wildcard (\*,
+or the word "any"), then that label will match all values. The labels that accept wildcards are:
+
+ * _service_ in an `A` record request: _service_.namespace.svc.zone.
+ * e.g. `*.ns.svc.myzone.local`
+ * _namespace_ in an `A` record request: service._namespace_.svc.zone.
+ * e.g. `nginx.*.svc.myzone.local`
+ * _port and/or protocol_ in an `SRV` record request: `_port`.`_protocol`.service.namespace.svc.zone.
+   * e.g. `_http.*.service.ns.svc.myzone.local`
+ * multiple wildcards are allowed in a single query.
+   * e.g. `A` request `*.*.svc.zone.` or `SRV` request `*.*.*.*.svc.zone.`
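+
+For example, a wildcard query can be issued directly with `dig` (the server
+address 10.0.0.10 below is just a placeholder for your CoreDNS service IP):
+
+~~~
+$ dig @10.0.0.10 A '*.testns.svc.cluster.local.'
+~~~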
diff --git a/plugin/kubernetes/apiproxy.go b/plugin/kubernetes/apiproxy.go
new file mode 100644
index 000000000..3e185f898
--- /dev/null
+++ b/plugin/kubernetes/apiproxy.go
@@ -0,0 +1,76 @@
+package kubernetes
+
+import (
+ "fmt"
+ "io"
+ "log"
+ "net"
+ "net/http"
+
+ "github.com/coredns/coredns/plugin/pkg/healthcheck"
+)
+
+type proxyHandler struct {
+ healthcheck.HealthCheck
+}
+
+type apiProxy struct {
+ http.Server
+ listener net.Listener
+ handler proxyHandler
+}
+
+func (p *proxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
+ upstream := p.Select()
+ network := "tcp"
+ if upstream.Network != "" {
+ network = upstream.Network
+ }
+ address := upstream.Name
+ d, err := net.Dial(network, address)
+ if err != nil {
+ log.Printf("[ERROR] Unable to establish connection to upstream %s://%s: %s", network, address, err)
+ http.Error(w, fmt.Sprintf("Unable to establish connection to upstream %s://%s: %s", network, address, err), 500)
+ return
+ }
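+	// Hijack the client connection so we can tunnel raw bytes between the
+	// client and the selected upstream in both directions.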
+ hj, ok := w.(http.Hijacker)
+ if !ok {
+ log.Printf("[ERROR] Unable to establish connection: no hijacker")
+ http.Error(w, "Unable to establish connection: no hijacker", 500)
+ return
+ }
+ nc, _, err := hj.Hijack()
+ if err != nil {
+ log.Printf("[ERROR] Unable to hijack connection: %s", err)
+ http.Error(w, fmt.Sprintf("Unable to hijack connection: %s", err), 500)
+ return
+ }
+ defer nc.Close()
+ defer d.Close()
+
+ err = r.Write(d)
+ if err != nil {
+ log.Printf("[ERROR] Unable to copy connection to upstream %s://%s: %s", network, address, err)
+ http.Error(w, fmt.Sprintf("Unable to copy connection to upstream %s://%s: %s", network, address, err), 500)
+ return
+ }
+
+ errChan := make(chan error, 2)
+ cp := func(dst io.Writer, src io.Reader) {
+ _, err := io.Copy(dst, src)
+ errChan <- err
+ }
+ go cp(d, nc)
+ go cp(nc, d)
+ <-errChan
+}
+
+func (p *apiProxy) Run() {
+ p.handler.Start()
+ p.Serve(p.listener)
+}
+
+func (p *apiProxy) Stop() {
+ p.handler.Stop()
+ p.listener.Close()
+}
diff --git a/plugin/kubernetes/autopath.go b/plugin/kubernetes/autopath.go
new file mode 100644
index 000000000..f758869f1
--- /dev/null
+++ b/plugin/kubernetes/autopath.go
@@ -0,0 +1,53 @@
+package kubernetes
+
+import (
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/request"
+
+ "k8s.io/client-go/1.5/pkg/api"
+)
+
+// AutoPath implements the AutoPathFunc call from the autopath plugin.
+// It returns a per-query search path, or nil when no search path handling should happen.
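+// For example, for a pod in namespace "ns" and zone "cluster.local.", the
+// returned path starts with "ns.svc.cluster.local.", "svc.cluster.local." and
+// "cluster.local.", followed by the host's search path and a "" sentinel.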
+func (k *Kubernetes) AutoPath(state request.Request) []string {
+	// Check if the query falls in a zone we are actually authoritative for and thus if we want autopath.
+ zone := plugin.Zones(k.Zones).Matches(state.Name())
+ if zone == "" {
+ return nil
+ }
+
+ ip := state.IP()
+
+ pod := k.podWithIP(ip)
+ if pod == nil {
+ return nil
+ }
+
+ search := make([]string, 3)
+ if zone == "." {
+ search[0] = pod.Namespace + ".svc."
+ search[1] = "svc."
+ search[2] = "."
+ } else {
+ search[0] = pod.Namespace + ".svc." + zone
+ search[1] = "svc." + zone
+ search[2] = zone
+ }
+
+ search = append(search, k.autoPathSearch...)
+	search = append(search, "") // sentinel
+ return search
+}
+
+// podWithIP returns the api.Pod for source IP ip. It returns nil if nothing can be found.
+func (k *Kubernetes) podWithIP(ip string) (p *api.Pod) {
+ objList := k.APIConn.PodIndex(ip)
+ for _, o := range objList {
+ p, ok := o.(*api.Pod)
+ if !ok {
+ return nil
+ }
+ return p
+ }
+ return nil
+}
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
new file mode 100644
index 000000000..b809264e1
--- /dev/null
+++ b/plugin/kubernetes/controller.go
@@ -0,0 +1,399 @@
+package kubernetes
+
+import (
+ "errors"
+ "fmt"
+ "log"
+ "sync"
+ "time"
+
+ "k8s.io/client-go/1.5/kubernetes"
+ "k8s.io/client-go/1.5/pkg/api"
+ unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned"
+ "k8s.io/client-go/1.5/pkg/api/v1"
+ "k8s.io/client-go/1.5/pkg/labels"
+ "k8s.io/client-go/1.5/pkg/runtime"
+ "k8s.io/client-go/1.5/pkg/watch"
+ "k8s.io/client-go/1.5/tools/cache"
+)
+
+var (
+ namespace = api.NamespaceAll
+)
+
+// storeToNamespaceLister makes a Store that lists Namespaces.
+type storeToNamespaceLister struct {
+ cache.Store
+}
+
+const podIPIndex = "PodIP"
+
+// List lists all Namespaces in the store.
+func (s *storeToNamespaceLister) List() (ns api.NamespaceList, err error) {
+ for _, m := range s.Store.List() {
+ ns.Items = append(ns.Items, *(m.(*api.Namespace)))
+ }
+ return ns, nil
+}
+
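+// dnsController is the interface the plugin uses to query the cached cluster
+// state; dnsControl implements it against a live API server, and the tests
+// substitute lightweight stubs.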
+type dnsController interface {
+ ServiceList() []*api.Service
+ PodIndex(string) []interface{}
+ EndpointsList() api.EndpointsList
+
+ GetNodeByName(string) (api.Node, error)
+
+ Run()
+ Stop() error
+}
+
+type dnsControl struct {
+ client *kubernetes.Clientset
+
+ selector *labels.Selector
+
+ svcController *cache.Controller
+ podController *cache.Controller
+ nsController *cache.Controller
+ epController *cache.Controller
+
+ svcLister cache.StoreToServiceLister
+ podLister cache.StoreToPodLister
+ nsLister storeToNamespaceLister
+ epLister cache.StoreToEndpointsLister
+
+ // stopLock is used to enforce only a single call to Stop is active.
+ // Needed because we allow stopping through an http endpoint and
+ // allowing concurrent stoppers leads to stack traces.
+ stopLock sync.Mutex
+ shutdown bool
+ stopCh chan struct{}
+}
+
+type dnsControlOpts struct {
+ initPodCache bool
+ resyncPeriod time.Duration
+ // Label handling.
+ labelSelector *unversionedapi.LabelSelector
+ selector *labels.Selector
+}
+
+// newdnsController creates a controller for CoreDNS.
+func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dnsControl {
+ dns := dnsControl{
+ client: kubeClient,
+ selector: opts.selector,
+ stopCh: make(chan struct{}),
+ }
+
+ dns.svcLister.Indexer, dns.svcController = cache.NewIndexerInformer(
+ &cache.ListWatch{
+ ListFunc: serviceListFunc(dns.client, namespace, dns.selector),
+ WatchFunc: serviceWatchFunc(dns.client, namespace, dns.selector),
+ },
+ &api.Service{},
+ opts.resyncPeriod,
+ cache.ResourceEventHandlerFuncs{},
+ cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+
+ if opts.initPodCache {
+ dns.podLister.Indexer, dns.podController = cache.NewIndexerInformer(
+ &cache.ListWatch{
+ ListFunc: podListFunc(dns.client, namespace, dns.selector),
+ WatchFunc: podWatchFunc(dns.client, namespace, dns.selector),
+ },
+ &api.Pod{}, // TODO replace with a lighter-weight custom struct
+ opts.resyncPeriod,
+ cache.ResourceEventHandlerFuncs{},
+ cache.Indexers{podIPIndex: podIPIndexFunc})
+ }
+
+ dns.nsLister.Store, dns.nsController = cache.NewInformer(
+ &cache.ListWatch{
+ ListFunc: namespaceListFunc(dns.client, dns.selector),
+ WatchFunc: namespaceWatchFunc(dns.client, dns.selector),
+ },
+ &api.Namespace{},
+ opts.resyncPeriod,
+ cache.ResourceEventHandlerFuncs{})
+
+ dns.epLister.Store, dns.epController = cache.NewInformer(
+ &cache.ListWatch{
+ ListFunc: endpointsListFunc(dns.client, namespace, dns.selector),
+ WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector),
+ },
+ &api.Endpoints{},
+ opts.resyncPeriod,
+ cache.ResourceEventHandlerFuncs{})
+
+ return &dns
+}
+
+func podIPIndexFunc(obj interface{}) ([]string, error) {
+ p, ok := obj.(*api.Pod)
+ if !ok {
+ return nil, errors.New("obj was not an *api.Pod")
+ }
+ return []string{p.Status.PodIP}, nil
+}
+
+func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = *s
+ }
+ listV1, err := c.Core().Services(ns).List(opts)
+
+ if err != nil {
+ return nil, err
+ }
+ var listAPI api.ServiceList
+ err = v1.Convert_v1_ServiceList_To_api_ServiceList(listV1, &listAPI, nil)
+ if err != nil {
+ return nil, err
+ }
+ return &listAPI, err
+ }
+}
+
+func podListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = *s
+ }
+ listV1, err := c.Core().Pods(ns).List(opts)
+
+ if err != nil {
+ return nil, err
+ }
+ var listAPI api.PodList
+ err = v1.Convert_v1_PodList_To_api_PodList(listV1, &listAPI, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ return &listAPI, err
+ }
+}
+
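+// v1ToAPIFilter converts the v1 object carried by a watch event into its
+// internal api equivalent, so the informer caches hold a single type. Events
+// that cannot be converted are passed through unchanged.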
+func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) {
+ if in.Type == watch.Error {
+ return in, true
+ }
+
+ switch v1Obj := in.Object.(type) {
+ case *v1.Service:
+ var apiObj api.Service
+ err := v1.Convert_v1_Service_To_api_Service(v1Obj, &apiObj, nil)
+ if err != nil {
+ log.Printf("[ERROR] Could not convert v1.Service: %s", err)
+ return in, true
+ }
+ return watch.Event{Type: in.Type, Object: &apiObj}, true
+ case *v1.Pod:
+ var apiObj api.Pod
+ err := v1.Convert_v1_Pod_To_api_Pod(v1Obj, &apiObj, nil)
+ if err != nil {
+ log.Printf("[ERROR] Could not convert v1.Pod: %s", err)
+ return in, true
+ }
+ return watch.Event{Type: in.Type, Object: &apiObj}, true
+ case *v1.Namespace:
+ var apiObj api.Namespace
+ err := v1.Convert_v1_Namespace_To_api_Namespace(v1Obj, &apiObj, nil)
+ if err != nil {
+ log.Printf("[ERROR] Could not convert v1.Namespace: %s", err)
+ return in, true
+ }
+ return watch.Event{Type: in.Type, Object: &apiObj}, true
+ case *v1.Endpoints:
+ var apiObj api.Endpoints
+ err := v1.Convert_v1_Endpoints_To_api_Endpoints(v1Obj, &apiObj, nil)
+ if err != nil {
+ log.Printf("[ERROR] Could not convert v1.Endpoint: %s", err)
+ return in, true
+ }
+ return watch.Event{Type: in.Type, Object: &apiObj}, true
+ }
+
+ log.Printf("[WARN] Unhandled v1 type in event: %v", in)
+ return in, true
+}
+
+func serviceWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = *s
+ }
+ w, err := c.Core().Services(ns).Watch(options)
+ if err != nil {
+ return nil, err
+ }
+ return watch.Filter(w, v1ToAPIFilter), nil
+ }
+}
+
+func podWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = *s
+ }
+ w, err := c.Core().Pods(ns).Watch(options)
+
+ if err != nil {
+ return nil, err
+ }
+ return watch.Filter(w, v1ToAPIFilter), nil
+ }
+}
+
+func namespaceListFunc(c *kubernetes.Clientset, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = *s
+ }
+ listV1, err := c.Core().Namespaces().List(opts)
+ if err != nil {
+ return nil, err
+ }
+ var listAPI api.NamespaceList
+ err = v1.Convert_v1_NamespaceList_To_api_NamespaceList(listV1, &listAPI, nil)
+ if err != nil {
+ return nil, err
+ }
+ return &listAPI, err
+ }
+}
+
+func namespaceWatchFunc(c *kubernetes.Clientset, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = *s
+ }
+ w, err := c.Core().Namespaces().Watch(options)
+ if err != nil {
+ return nil, err
+ }
+ return watch.Filter(w, v1ToAPIFilter), nil
+ }
+}
+
+func endpointsListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = *s
+ }
+ listV1, err := c.Core().Endpoints(ns).List(opts)
+
+ if err != nil {
+ return nil, err
+ }
+ var listAPI api.EndpointsList
+ err = v1.Convert_v1_EndpointsList_To_api_EndpointsList(listV1, &listAPI, nil)
+ if err != nil {
+ return nil, err
+ }
+ return &listAPI, err
+ }
+}
+
+func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = *s
+ }
+ w, err := c.Core().Endpoints(ns).Watch(options)
+ if err != nil {
+ return nil, err
+ }
+ return watch.Filter(w, v1ToAPIFilter), nil
+ }
+}
+
+func (dns *dnsControl) controllersInSync() bool {
+ hs := dns.svcController.HasSynced() &&
+ dns.nsController.HasSynced() &&
+ dns.epController.HasSynced()
+
+ if dns.podController != nil {
+ hs = hs && dns.podController.HasSynced()
+ }
+
+ return hs
+}
+
+// Stop stops the controller.
+func (dns *dnsControl) Stop() error {
+ dns.stopLock.Lock()
+ defer dns.stopLock.Unlock()
+
+ // Only try draining the workqueue if we haven't already.
+ if !dns.shutdown {
+ close(dns.stopCh)
+ dns.shutdown = true
+
+ return nil
+ }
+
+ return fmt.Errorf("shutdown already in progress")
+}
+
+// Run starts the controller.
+func (dns *dnsControl) Run() {
+ go dns.svcController.Run(dns.stopCh)
+ go dns.nsController.Run(dns.stopCh)
+ go dns.epController.Run(dns.stopCh)
+ if dns.podController != nil {
+ go dns.podController.Run(dns.stopCh)
+ }
+ <-dns.stopCh
+}
+
+func (dns *dnsControl) NamespaceList() *api.NamespaceList {
+ nsList, err := dns.nsLister.List()
+ if err != nil {
+ return &api.NamespaceList{}
+ }
+
+ return &nsList
+}
+
+func (dns *dnsControl) ServiceList() []*api.Service {
+ svcs, err := dns.svcLister.List(labels.Everything())
+ if err != nil {
+ return []*api.Service{}
+ }
+
+ return svcs
+}
+
+func (dns *dnsControl) PodIndex(ip string) []interface{} {
+ pods, err := dns.podLister.Indexer.ByIndex(podIPIndex, ip)
+ if err != nil {
+ return nil
+ }
+
+ return pods
+}
+
+func (dns *dnsControl) EndpointsList() api.EndpointsList {
+ epl, err := dns.epLister.List()
+ if err != nil {
+ return api.EndpointsList{}
+ }
+
+ return epl
+}
+
+func (dns *dnsControl) GetNodeByName(name string) (api.Node, error) {
+ v1node, err := dns.client.Core().Nodes().Get(name)
+ if err != nil {
+ return api.Node{}, err
+ }
+ var apinode api.Node
+ err = v1.Convert_v1_Node_To_api_Node(v1node, &apinode, nil)
+ if err != nil {
+ return api.Node{}, err
+ }
+ return apinode, nil
+}
diff --git a/plugin/kubernetes/federation.go b/plugin/kubernetes/federation.go
new file mode 100644
index 000000000..df6ae948b
--- /dev/null
+++ b/plugin/kubernetes/federation.go
@@ -0,0 +1,45 @@
+package kubernetes
+
+import (
+ "github.com/coredns/coredns/plugin/etcd/msg"
+ "github.com/coredns/coredns/plugin/pkg/dnsutil"
+ "github.com/coredns/coredns/request"
+)
+
+// The federation node.Labels keys used.
+const (
+ // TODO: Do not hardcode these labels. Pull them out of the API instead.
+ //
+ // We can get them via ....
+ // import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ // metav1.LabelZoneFailureDomain
+ // metav1.LabelZoneRegion
+ //
+ // But importing above breaks coredns with flag collision of 'log_dir'
+
+ LabelZone = "failure-domain.beta.kubernetes.io/zone"
+ LabelRegion = "failure-domain.beta.kubernetes.io/region"
+)
+
+// Federations is called from the federation plugin to return the service that should be
+// returned as a CNAME for federation(s) to work.
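+// The returned Host has the form
+// [endpoint.]service.namespace.<federation name>.<pod|svc>.<zone label>.<region label>.<federation zone>.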
+func (k *Kubernetes) Federations(state request.Request, fname, fzone string) (msg.Service, error) {
+ nodeName := k.localNodeName()
+ node, err := k.APIConn.GetNodeByName(nodeName)
+ if err != nil {
+ return msg.Service{}, err
+ }
+ r, err := parseRequest(state)
+ if err != nil {
+ return msg.Service{}, err
+ }
+
+ lz := node.Labels[LabelZone]
+ lr := node.Labels[LabelRegion]
+
+ if r.endpoint == "" {
+ return msg.Service{Host: dnsutil.Join([]string{r.service, r.namespace, fname, r.podOrSvc, lz, lr, fzone})}, nil
+ }
+
+ return msg.Service{Host: dnsutil.Join([]string{r.endpoint, r.service, r.namespace, fname, r.podOrSvc, lz, lr, fzone})}, nil
+}
diff --git a/plugin/kubernetes/handler.go b/plugin/kubernetes/handler.go
new file mode 100644
index 000000000..9dc435111
--- /dev/null
+++ b/plugin/kubernetes/handler.go
@@ -0,0 +1,86 @@
+package kubernetes
+
+import (
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/pkg/dnsutil"
+ "github.com/coredns/coredns/request"
+
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+)
+
+// ServeDNS implements the plugin.Handler interface.
+func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
+ state := request.Request{W: w, Req: r}
+
+ m := new(dns.Msg)
+ m.SetReply(r)
+ m.Authoritative, m.RecursionAvailable, m.Compress = true, true, true
+
+ zone := plugin.Zones(k.Zones).Matches(state.Name())
+ if zone == "" {
+ return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r)
+ }
+
+ state.Zone = zone
+
+ var (
+ records []dns.RR
+ extra []dns.RR
+ err error
+ )
+
+ switch state.Type() {
+ case "A":
+ records, err = plugin.A(&k, zone, state, nil, plugin.Options{})
+ case "AAAA":
+ records, err = plugin.AAAA(&k, zone, state, nil, plugin.Options{})
+ case "TXT":
+ records, err = plugin.TXT(&k, zone, state, plugin.Options{})
+ case "CNAME":
+ records, err = plugin.CNAME(&k, zone, state, plugin.Options{})
+ case "PTR":
+ records, err = plugin.PTR(&k, zone, state, plugin.Options{})
+ case "MX":
+ records, extra, err = plugin.MX(&k, zone, state, plugin.Options{})
+ case "SRV":
+ records, extra, err = plugin.SRV(&k, zone, state, plugin.Options{})
+ case "SOA":
+ records, err = plugin.SOA(&k, zone, state, plugin.Options{})
+ case "NS":
+ if state.Name() == zone {
+ records, extra, err = plugin.NS(&k, zone, state, plugin.Options{})
+ break
+ }
+ fallthrough
+ default:
+ // Do a fake A lookup, so we can distinguish between NODATA and NXDOMAIN
+ _, err = plugin.A(&k, zone, state, nil, plugin.Options{})
+ }
+
+ if k.IsNameError(err) {
+ if k.Fallthrough {
+ return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r)
+ }
+ return plugin.BackendError(&k, zone, dns.RcodeNameError, state, nil /* err */, plugin.Options{})
+ }
+ if err != nil {
+ return dns.RcodeServerFailure, err
+ }
+
+ if len(records) == 0 {
+ return plugin.BackendError(&k, zone, dns.RcodeSuccess, state, nil, plugin.Options{})
+ }
+
+ m.Answer = append(m.Answer, records...)
+ m.Extra = append(m.Extra, extra...)
+
+ m = dnsutil.Dedup(m)
+ state.SizeAndDo(m)
+ m, _ = state.Scrub(m)
+ w.WriteMsg(m)
+ return dns.RcodeSuccess, nil
+}
+
+// Name implements the Handler interface.
+func (k Kubernetes) Name() string { return "kubernetes" }
diff --git a/plugin/kubernetes/handler_pod_disabled_test.go b/plugin/kubernetes/handler_pod_disabled_test.go
new file mode 100644
index 000000000..4c6e15710
--- /dev/null
+++ b/plugin/kubernetes/handler_pod_disabled_test.go
@@ -0,0 +1,61 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/plugin/pkg/dnsrecorder"
+ "github.com/coredns/coredns/plugin/test"
+
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+)
+
+var podModeDisabledCases = []test.Case{
+ {
+ Qname: "10-240-0-1.podns.pod.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeNameError,
+ Error: errPodsDisabled,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+ {
+ Qname: "172-0-0-2.podns.pod.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeNameError,
+ Error: errPodsDisabled,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+}
+
+func TestServeDNSModeDisabled(t *testing.T) {
+
+ k := New([]string{"cluster.local."})
+ k.APIConn = &APIConnServeTest{}
+ k.Next = test.NextHandler(dns.RcodeSuccess, nil)
+ k.podMode = podModeDisabled
+ ctx := context.TODO()
+
+ for i, tc := range podModeDisabledCases {
+ r := tc.Msg()
+
+ w := dnsrecorder.New(&test.ResponseWriter{})
+
+ _, err := k.ServeDNS(ctx, w, r)
+ if err != tc.Error {
+			t.Errorf("Test %d expected error %v, got %v", i, tc.Error, err)
+ return
+ }
+ if tc.Error != nil {
+ continue
+ }
+
+ resp := w.Msg
+ if resp == nil {
+ t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name)
+ }
+
+ test.SortAndCheck(t, resp, tc)
+ }
+}
diff --git a/plugin/kubernetes/handler_pod_insecure_test.go b/plugin/kubernetes/handler_pod_insecure_test.go
new file mode 100644
index 000000000..b2df8a504
--- /dev/null
+++ b/plugin/kubernetes/handler_pod_insecure_test.go
@@ -0,0 +1,59 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/plugin/pkg/dnsrecorder"
+ "github.com/coredns/coredns/plugin/test"
+
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+)
+
+var podModeInsecureCases = []test.Case{
+ {
+ Qname: "10-240-0-1.podns.pod.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("10-240-0-1.podns.pod.cluster.local. 0 IN A 10.240.0.1"),
+ },
+ },
+ {
+ Qname: "172-0-0-2.podns.pod.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("172-0-0-2.podns.pod.cluster.local. 0 IN A 172.0.0.2"),
+ },
+ },
+}
+
+func TestServeDNSModeInsecure(t *testing.T) {
+
+ k := New([]string{"cluster.local."})
+ k.APIConn = &APIConnServeTest{}
+ k.Next = test.NextHandler(dns.RcodeSuccess, nil)
+ ctx := context.TODO()
+ k.podMode = podModeInsecure
+
+ for i, tc := range podModeInsecureCases {
+ r := tc.Msg()
+
+ w := dnsrecorder.New(&test.ResponseWriter{})
+
+ _, err := k.ServeDNS(ctx, w, r)
+ if err != tc.Error {
+ t.Errorf("Test %d expected no error, got %v", i, err)
+ return
+ }
+ if tc.Error != nil {
+ continue
+ }
+
+ resp := w.Msg
+ if resp == nil {
+ t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name)
+ }
+
+ test.SortAndCheck(t, resp, tc)
+ }
+}
diff --git a/plugin/kubernetes/handler_pod_verified_test.go b/plugin/kubernetes/handler_pod_verified_test.go
new file mode 100644
index 000000000..ea585cc6a
--- /dev/null
+++ b/plugin/kubernetes/handler_pod_verified_test.go
@@ -0,0 +1,59 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/plugin/pkg/dnsrecorder"
+ "github.com/coredns/coredns/plugin/test"
+
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+)
+
+var podModeVerifiedCases = []test.Case{
+ {
+ Qname: "10-240-0-1.podns.pod.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("10-240-0-1.podns.pod.cluster.local. 0 IN A 10.240.0.1"),
+ },
+ },
+ {
+ Qname: "172-0-0-2.podns.pod.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeNameError,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+}
+
+func TestServeDNSModeVerified(t *testing.T) {
+
+ k := New([]string{"cluster.local."})
+ k.APIConn = &APIConnServeTest{}
+ k.Next = test.NextHandler(dns.RcodeSuccess, nil)
+ ctx := context.TODO()
+ k.podMode = podModeVerified
+
+ for i, tc := range podModeVerifiedCases {
+ r := tc.Msg()
+
+ w := dnsrecorder.New(&test.ResponseWriter{})
+
+ _, err := k.ServeDNS(ctx, w, r)
+ if err != tc.Error {
+ t.Errorf("Test %d expected no error, got %v", i, err)
+ return
+ }
+ if tc.Error != nil {
+ continue
+ }
+
+ resp := w.Msg
+ if resp == nil {
+ t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name)
+ }
+
+ test.SortAndCheck(t, resp, tc)
+ }
+}
diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go
new file mode 100644
index 000000000..5413f5b4c
--- /dev/null
+++ b/plugin/kubernetes/handler_test.go
@@ -0,0 +1,347 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/plugin/pkg/dnsrecorder"
+ "github.com/coredns/coredns/plugin/test"
+
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+ "k8s.io/client-go/1.5/pkg/api"
+)
+
+var dnsTestCases = []test.Case{
+ // A Service
+ {
+ Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("svc1.testns.svc.cluster.local. 5 IN A 10.0.0.1"),
+ },
+ },
+ // A Service (wildcard)
+ {
+ Qname: "svc1.*.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("svc1.*.svc.cluster.local. 5 IN A 10.0.0.1"),
+ },
+ },
+ {
+ Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeSRV,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{test.SRV("svc1.testns.svc.cluster.local. 303 IN SRV 0 100 80 svc1.testns.svc.cluster.local.")},
+ Extra: []dns.RR{test.A("svc1.testns.svc.cluster.local. 303 IN A 10.0.0.1")},
+ },
+ // SRV Service (wildcard)
+ {
+ Qname: "svc1.*.svc.cluster.local.", Qtype: dns.TypeSRV,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{test.SRV("svc1.*.svc.cluster.local. 303 IN SRV 0 100 80 svc1.testns.svc.cluster.local.")},
+ Extra: []dns.RR{test.A("svc1.testns.svc.cluster.local. 303 IN A 10.0.0.1")},
+ },
+ // SRV Service (wildcards)
+ {
+ Qname: "*.any.svc1.*.svc.cluster.local.", Qtype: dns.TypeSRV,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{test.SRV("*.any.svc1.*.svc.cluster.local. 303 IN SRV 0 100 80 svc1.testns.svc.cluster.local.")},
+ Extra: []dns.RR{test.A("svc1.testns.svc.cluster.local. 303 IN A 10.0.0.1")},
+ },
+ // A Service (wildcards)
+ {
+ Qname: "*.any.svc1.*.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("*.any.svc1.*.svc.cluster.local. 303 IN A 10.0.0.1"),
+ },
+ },
+ // SRV Service Not udp/tcp
+ {
+ Qname: "*._not-udp-or-tcp.svc1.testns.svc.cluster.local.", Qtype: dns.TypeSRV,
+ Rcode: dns.RcodeNameError,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+ // SRV Service
+ {
+ Qname: "_http._tcp.svc1.testns.svc.cluster.local.", Qtype: dns.TypeSRV,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.SRV("_http._tcp.svc1.testns.svc.cluster.local. 303 IN SRV 0 100 80 svc1.testns.svc.cluster.local."),
+ },
+ Extra: []dns.RR{
+ test.A("svc1.testns.svc.cluster.local. 303 IN A 10.0.0.1"),
+ },
+ },
+ // A Service (Headless)
+ {
+ Qname: "hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("hdls1.testns.svc.cluster.local. 303 IN A 172.0.0.2"),
+ test.A("hdls1.testns.svc.cluster.local. 303 IN A 172.0.0.3"),
+ },
+ },
+ // SRV Service (Headless)
+ {
+ Qname: "_http._tcp.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeSRV,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 303 IN SRV 0 50 80 172-0-0-2.hdls1.testns.svc.cluster.local."),
+ test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 303 IN SRV 0 50 80 172-0-0-3.hdls1.testns.svc.cluster.local."),
+ },
+ Extra: []dns.RR{
+ test.A("172-0-0-2.hdls1.testns.svc.cluster.local. 303 IN A 172.0.0.2"),
+ test.A("172-0-0-3.hdls1.testns.svc.cluster.local. 303 IN A 172.0.0.3"),
+ },
+ },
+ // CNAME External
+ {
+ Qname: "external.testns.svc.cluster.local.", Qtype: dns.TypeCNAME,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.CNAME("external.testns.svc.cluster.local. 303 IN CNAME ext.interwebs.test."),
+ },
+ },
+ // AAAA Service (existing service)
+ {
+ Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeAAAA,
+ Rcode: dns.RcodeSuccess,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+ // AAAA Service (non-existing service)
+ {
+ Qname: "svc0.testns.svc.cluster.local.", Qtype: dns.TypeAAAA,
+ Rcode: dns.RcodeNameError,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+ // A Service (non-existing service)
+ {
+ Qname: "svc0.testns.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeNameError,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+ // TXT Schema
+ {
+ Qname: "dns-version.cluster.local.", Qtype: dns.TypeTXT,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.TXT("dns-version.cluster.local 28800 IN TXT 1.0.1"),
+ },
+ },
+ // A Service (Headless) does not exist
+ {
+ Qname: "bogusendpoint.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeNameError,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+ // A Service does not exist
+ {
+ Qname: "bogusendpoint.svc0.testns.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeNameError,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+}
+
+func TestServeDNS(t *testing.T) {
+
+ k := New([]string{"cluster.local."})
+ k.APIConn = &APIConnServeTest{}
+ k.Next = test.NextHandler(dns.RcodeSuccess, nil)
+ ctx := context.TODO()
+
+ for i, tc := range dnsTestCases {
+ r := tc.Msg()
+
+ w := dnsrecorder.New(&test.ResponseWriter{})
+
+ _, err := k.ServeDNS(ctx, w, r)
+ if err != tc.Error {
+ t.Errorf("Test %d expected no error, got %v", i, err)
+ return
+ }
+ if tc.Error != nil {
+ continue
+ }
+
+ resp := w.Msg
+ if resp == nil {
+ t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name)
+ }
+
+ // Before sorting, make sure that CNAMES do not appear after their target records
+ test.CNAMEOrder(t, resp)
+
+ test.SortAndCheck(t, resp, tc)
+ }
+}
+
+type APIConnServeTest struct{}
+
+func (APIConnServeTest) Run() { return }
+func (APIConnServeTest) Stop() error { return nil }
+
+func (APIConnServeTest) PodIndex(string) []interface{} {
+ a := make([]interface{}, 1)
+ a[0] = &api.Pod{
+ ObjectMeta: api.ObjectMeta{
+ Namespace: "podns",
+ },
+ Status: api.PodStatus{
+ PodIP: "10.240.0.1", // Remote IP set in test.ResponseWriter
+ },
+ }
+ return a
+}
+
+func (APIConnServeTest) ServiceList() []*api.Service {
+ svcs := []*api.Service{
+ {
+ ObjectMeta: api.ObjectMeta{
+ Name: "svc1",
+ Namespace: "testns",
+ },
+ Spec: api.ServiceSpec{
+ ClusterIP: "10.0.0.1",
+ Ports: []api.ServicePort{{
+ Name: "http",
+ Protocol: "tcp",
+ Port: 80,
+ }},
+ },
+ },
+ {
+ ObjectMeta: api.ObjectMeta{
+ Name: "hdls1",
+ Namespace: "testns",
+ },
+ Spec: api.ServiceSpec{
+ ClusterIP: api.ClusterIPNone,
+ },
+ },
+ {
+ ObjectMeta: api.ObjectMeta{
+ Name: "external",
+ Namespace: "testns",
+ },
+ Spec: api.ServiceSpec{
+ ExternalName: "ext.interwebs.test",
+ Ports: []api.ServicePort{{
+ Name: "http",
+ Protocol: "tcp",
+ Port: 80,
+ }},
+ },
+ },
+ }
+ return svcs
+}
+
+func (APIConnServeTest) EndpointsList() api.EndpointsList {
+ n := "test.node.foo.bar"
+
+ return api.EndpointsList{
+ Items: []api.Endpoints{
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "172.0.0.1",
+ Hostname: "ep1a",
+ },
+ },
+ Ports: []api.EndpointPort{
+ {
+ Port: 80,
+ Protocol: "tcp",
+ Name: "http",
+ },
+ },
+ },
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: "svc1",
+ Namespace: "testns",
+ },
+ },
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "172.0.0.2",
+ },
+ },
+ Ports: []api.EndpointPort{
+ {
+ Port: 80,
+ Protocol: "tcp",
+ Name: "http",
+ },
+ },
+ },
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: "hdls1",
+ Namespace: "testns",
+ },
+ },
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "172.0.0.3",
+ },
+ },
+ Ports: []api.EndpointPort{
+ {
+ Port: 80,
+ Protocol: "tcp",
+ Name: "http",
+ },
+ },
+ },
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: "hdls1",
+ Namespace: "testns",
+ },
+ },
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "10.9.8.7",
+ NodeName: &n,
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+}
+
+func (APIConnServeTest) GetNodeByName(name string) (api.Node, error) {
+ return api.Node{
+ ObjectMeta: api.ObjectMeta{
+ Name: "test.node.foo.bar",
+ },
+ }, nil
+}
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
new file mode 100644
index 000000000..90fcd6182
--- /dev/null
+++ b/plugin/kubernetes/kubernetes.go
@@ -0,0 +1,457 @@
+// Package kubernetes provides the kubernetes backend.
+package kubernetes
+
+import (
+ "errors"
+ "fmt"
+ "net"
+ "strings"
+ "sync/atomic"
+ "time"
+
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/etcd/msg"
+ "github.com/coredns/coredns/plugin/pkg/dnsutil"
+ "github.com/coredns/coredns/plugin/pkg/healthcheck"
+ "github.com/coredns/coredns/plugin/proxy"
+ "github.com/coredns/coredns/request"
+
+ "github.com/miekg/dns"
+ "k8s.io/client-go/1.5/kubernetes"
+ "k8s.io/client-go/1.5/pkg/api"
+ unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned"
+ "k8s.io/client-go/1.5/pkg/labels"
+ "k8s.io/client-go/1.5/rest"
+ "k8s.io/client-go/1.5/tools/clientcmd"
+ clientcmdapi "k8s.io/client-go/1.5/tools/clientcmd/api"
+)
+
+// Kubernetes implements a plugin that connects to a Kubernetes cluster.
+type Kubernetes struct {
+ Next plugin.Handler
+ Zones []string
+ Proxy proxy.Proxy // Proxy for looking up names during the resolution process
+ APIServerList []string
+ APIProxy *apiProxy
+ APICertAuth string
+ APIClientCert string
+ APIClientKey string
+ APIConn dnsController
+ Namespaces map[string]bool
+ podMode string
+ Fallthrough bool
+ ttl uint32
+
+ primaryZoneIndex int
+ interfaceAddrsFunc func() net.IP
+ autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath.
+}
+
+// New returns an initialized Kubernetes. It defaults interfaceAddrsFunc to return 127.0.0.1. All other
+// values default to their zero value, primaryZoneIndex will thus point to the first zone.
+func New(zones []string) *Kubernetes {
+ k := new(Kubernetes)
+ k.Zones = zones
+ k.Namespaces = make(map[string]bool)
+ k.interfaceAddrsFunc = func() net.IP { return net.ParseIP("127.0.0.1") }
+ k.podMode = podModeDisabled
+ k.Proxy = proxy.Proxy{}
+ k.ttl = defaultTTL
+
+ return k
+}
+
+const (
+ // podModeDisabled is the default value where pod requests are ignored
+ podModeDisabled = "disabled"
+ // podModeVerified is where Pod requests are answered only if they exist
+ podModeVerified = "verified"
+	// podModeInsecure is where pod requests are answered without verifying they exist
+ podModeInsecure = "insecure"
+ // DNSSchemaVersion is the schema version: https://github.com/kubernetes/dns/blob/master/docs/specification.md
+ DNSSchemaVersion = "1.0.1"
+)
+
+var (
+ errNoItems = errors.New("no items found")
+ errNsNotExposed = errors.New("namespace is not exposed")
+ errInvalidRequest = errors.New("invalid query name")
+ errAPIBadPodType = errors.New("expected type *api.Pod")
+ errPodsDisabled = errors.New("pod records disabled")
+)
+
+// Services implements the ServiceBackend interface.
+func (k *Kubernetes) Services(state request.Request, exact bool, opt plugin.Options) (svcs []msg.Service, err error) {
+
+	// We check the query type again here, as ServeDNS already did, because there are some types k8s just can't answer.
+ switch state.QType() {
+
+ case dns.TypeTXT:
+ // 1 label + zone, label must be "dns-version".
+ t, _ := dnsutil.TrimZone(state.Name(), state.Zone)
+
+ segs := dns.SplitDomainName(t)
+ if len(segs) != 1 {
+ return nil, fmt.Errorf("kubernetes: TXT query can only be for dns-version: %s", state.QName())
+ }
+ if segs[0] != "dns-version" {
+ return nil, nil
+ }
+ svc := msg.Service{Text: DNSSchemaVersion, TTL: 28800, Key: msg.Path(state.QName(), "coredns")}
+ return []msg.Service{svc}, nil
+
+ case dns.TypeNS:
+		// We can only get here if the qname equals the zone, see ServeDNS in handler.go.
+ ns := k.nsAddr()
+ svc := msg.Service{Host: ns.A.String(), Key: msg.Path(state.QName(), "coredns")}
+ return []msg.Service{svc}, nil
+ }
+
+ if state.QType() == dns.TypeA && isDefaultNS(state.Name(), state.Zone) {
+ // If this is an A request for "ns.dns", respond with a "fake" record for coredns.
+ // SOA records always use this hardcoded name
+ ns := k.nsAddr()
+ svc := msg.Service{Host: ns.A.String(), Key: msg.Path(state.QName(), "coredns")}
+ return []msg.Service{svc}, nil
+ }
+
+ s, e := k.Records(state, false)
+
+ // SRV for external services is not yet implemented, so remove those records.
+
+ if state.QType() != dns.TypeSRV {
+ return s, e
+ }
+
+ internal := []msg.Service{}
+ for _, svc := range s {
+ if t, _ := svc.HostType(); t != dns.TypeCNAME {
+ internal = append(internal, svc)
+ }
+ }
+
+ return internal, e
+}
+
+// primaryZone will return the first non-reverse zone being handled by this plugin
+func (k *Kubernetes) primaryZone() string { return k.Zones[k.primaryZoneIndex] }
+
+// Lookup implements the ServiceBackend interface.
+func (k *Kubernetes) Lookup(state request.Request, name string, typ uint16) (*dns.Msg, error) {
+ return k.Proxy.Lookup(state, name, typ)
+}
+
+// IsNameError implements the ServiceBackend interface.
+func (k *Kubernetes) IsNameError(err error) bool {
+ return err == errNoItems || err == errNsNotExposed || err == errInvalidRequest
+}
+
+func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
+ loadingRules := &clientcmd.ClientConfigLoadingRules{}
+ overrides := &clientcmd.ConfigOverrides{}
+ clusterinfo := clientcmdapi.Cluster{}
+ authinfo := clientcmdapi.AuthInfo{}
+
+ if len(k.APIServerList) == 0 {
+ cc, err := rest.InClusterConfig()
+ if err != nil {
+ return nil, err
+ }
+ return cc, err
+ }
+
+ endpoint := k.APIServerList[0]
+ if len(k.APIServerList) > 1 {
+ // Use a random port for api proxy, will get the value later through listener.Addr()
+ listener, err := net.Listen("tcp", "127.0.0.1:0")
+ if err != nil {
+ return nil, fmt.Errorf("failed to create kubernetes api proxy: %v", err)
+ }
+ k.APIProxy = &apiProxy{
+ listener: listener,
+ handler: proxyHandler{
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 3 * time.Second,
+ MaxFails: 1,
+ Future: 10 * time.Second,
+ Path: "/",
+ Interval: 5 * time.Second,
+ },
+ },
+ }
+ k.APIProxy.handler.Hosts = make([]*healthcheck.UpstreamHost, len(k.APIServerList))
+ for i, entry := range k.APIServerList {
+
+ uh := &healthcheck.UpstreamHost{
+ Name: strings.TrimPrefix(entry, "http://"),
+
+ CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
+
+ down := false
+
+ uh.CheckMu.Lock()
+ until := uh.OkUntil
+ uh.CheckMu.Unlock()
+
+ if !until.IsZero() && time.Now().After(until) {
+ down = true
+ }
+
+ fails := atomic.LoadInt32(&uh.Fails)
+ if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
+ down = true
+ }
+ return down
+ }
+ }(&k.APIProxy.handler),
+ }
+
+ k.APIProxy.handler.Hosts[i] = uh
+ }
+ k.APIProxy.Handler = &k.APIProxy.handler
+
+ // Find the random port used for api proxy
+ endpoint = fmt.Sprintf("http://%s", listener.Addr())
+ }
+ clusterinfo.Server = endpoint
+
+ if len(k.APICertAuth) > 0 {
+ clusterinfo.CertificateAuthority = k.APICertAuth
+ }
+ if len(k.APIClientCert) > 0 {
+ authinfo.ClientCertificate = k.APIClientCert
+ }
+ if len(k.APIClientKey) > 0 {
+ authinfo.ClientKey = k.APIClientKey
+ }
+
+ overrides.ClusterInfo = clusterinfo
+ overrides.AuthInfo = authinfo
+ clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
+
+ return clientConfig.ClientConfig()
+}
+
+// initKubeCache initializes a new Kubernetes cache.
+func (k *Kubernetes) initKubeCache(opts dnsControlOpts) (err error) {
+
+ config, err := k.getClientConfig()
+ if err != nil {
+ return err
+ }
+
+ kubeClient, err := kubernetes.NewForConfig(config)
+ if err != nil {
+ return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
+ }
+
+ if opts.labelSelector != nil {
+ var selector labels.Selector
+ selector, err = unversionedapi.LabelSelectorAsSelector(opts.labelSelector)
+ if err != nil {
+ return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", opts.labelSelector, err)
+ }
+ opts.selector = &selector
+ }
+
+ opts.initPodCache = k.podMode == podModeVerified
+
+ k.APIConn = newdnsController(kubeClient, opts)
+
+ return err
+}
+
+// Records looks up services in kubernetes.
+func (k *Kubernetes) Records(state request.Request, exact bool) ([]msg.Service, error) {
+ r, e := parseRequest(state)
+ if e != nil {
+ return nil, e
+ }
+
+ if !wildcard(r.namespace) && !k.namespaceExposed(r.namespace) {
+ return nil, errNsNotExposed
+ }
+
+ if r.podOrSvc == Pod {
+ pods, err := k.findPods(r, state.Zone)
+ return pods, err
+ }
+
+ services, err := k.findServices(r, state.Zone)
+ return services, err
+}
+
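+// endpointHostname returns the DNS label for an endpoint address: the
+// lowercased hostname when one is set, otherwise the IP with its separators
+// replaced by dashes, e.g. "10.240.0.1" becomes "10-240-0-1".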
+func endpointHostname(addr api.EndpointAddress) string {
+ if addr.Hostname != "" {
+ return strings.ToLower(addr.Hostname)
+ }
+ if strings.Contains(addr.IP, ".") {
+ return strings.Replace(addr.IP, ".", "-", -1)
+ }
+ if strings.Contains(addr.IP, ":") {
+ return strings.ToLower(strings.Replace(addr.IP, ":", "-", -1))
+ }
+ return ""
+}
+
+func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, err error) {
+ if k.podMode == podModeDisabled {
+ return nil, errPodsDisabled
+ }
+
+ namespace := r.namespace
+ podname := r.service
+ zonePath := msg.Path(zone, "coredns")
+ ip := ""
+ err = errNoItems
+
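+	// The pod name encodes an IP address: exactly three single dashes means a
+	// dashed IPv4 address (e.g. "10-240-0-1" -> "10.240.0.1"); anything else
+	// is treated as a dashed IPv6 address and the dashes become colons.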
+ if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") {
+ ip = strings.Replace(podname, "-", ".", -1)
+ } else {
+ ip = strings.Replace(podname, "-", ":", -1)
+ }
+
+ if k.podMode == podModeInsecure {
+ return []msg.Service{{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip}}, nil
+ }
+
+ // PodModeVerified
+	// podModeVerified
+
+ for _, o := range objList {
+ p, ok := o.(*api.Pod)
+ if !ok {
+ return nil, errAPIBadPodType
+ }
+ // If namespace has a wildcard, filter results against Corefile namespace list.
+ if wildcard(namespace) && !k.namespaceExposed(p.Namespace) {
+ continue
+ }
+ // check for matching ip and namespace
+ if ip == p.Status.PodIP && match(namespace, p.Namespace) {
+ s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip}
+ pods = append(pods, s)
+
+ err = nil
+ }
+ }
+ return pods, err
+}
+
+// findServices returns the services matching r from the cache.
+func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) {
+ serviceList := k.APIConn.ServiceList()
+ zonePath := msg.Path(zone, "coredns")
+ err = errNoItems // Set to errNoItems to signal really nothing found, gets reset when name is matched.
+
+ for _, svc := range serviceList {
+ if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) {
+ continue
+ }
+
+ // If namespace has a wildcard, filter results against Corefile namespace list.
+ // (Namespaces without a wildcard were filtered before the call to this function.)
+ if wildcard(r.namespace) && !k.namespaceExposed(svc.Namespace) {
+ continue
+ }
+
+ // Endpoint query or headless service
+ if svc.Spec.ClusterIP == api.ClusterIPNone || r.endpoint != "" {
+ endpointsList := k.APIConn.EndpointsList()
+ for _, ep := range endpointsList.Items {
+ if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace {
+ continue
+ }
+
+ for _, eps := range ep.Subsets {
+ for _, addr := range eps.Addresses {
+
+ // See comments in parse.go parseRequest about the endpoint handling.
+
+ if r.endpoint != "" {
+ if !match(r.endpoint, endpointHostname(addr)) {
+ continue
+ }
+ }
+
+ for _, p := range eps.Ports {
+ if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {
+ continue
+ }
+ s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl}
+ s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, endpointHostname(addr)}, "/")
+
+ err = nil
+
+ services = append(services, s)
+ }
+ }
+ }
+ }
+ continue
+ }
+
+ // External service
+ if svc.Spec.ExternalName != "" {
+ s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.Spec.ExternalName, TTL: k.ttl}
+ if t, _ := s.HostType(); t == dns.TypeCNAME {
+ s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
+ services = append(services, s)
+
+ err = nil
+
+ continue
+ }
+ }
+
+ // ClusterIP service
+ for _, p := range svc.Spec.Ports {
+ if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {
+ continue
+ }
+
+ err = nil
+
+ s := msg.Service{Host: svc.Spec.ClusterIP, Port: int(p.Port), TTL: k.ttl}
+ s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
+
+ services = append(services, s)
+ }
+ }
+ return services, err
+}
+
+// match checks if a and b are equal taking wildcards into account.
+func match(a, b string) bool {
+ if wildcard(a) {
+ return true
+ }
+ if wildcard(b) {
+ return true
+ }
+ return strings.EqualFold(a, b)
+}
+
+// wildcard checks whether s contains a wildcard value defined as "*" or "any".
+func wildcard(s string) bool {
+ return s == "*" || s == "any"
+}
+
+// namespaceExposed returns true when the namespace is exposed.
+func (k *Kubernetes) namespaceExposed(namespace string) bool {
+ _, ok := k.Namespaces[namespace]
+ if len(k.Namespaces) > 0 && !ok {
+ return false
+ }
+ return true
+}
+
+const (
+ // Svc is the DNS schema for kubernetes services
+ Svc = "svc"
+ // Pod is the DNS schema for kubernetes pods
+ Pod = "pod"
+ // defaultTTL to apply to all answers.
+ defaultTTL = 5
+)
diff --git a/plugin/kubernetes/kubernetes_apex_test.go b/plugin/kubernetes/kubernetes_apex_test.go
new file mode 100644
index 000000000..41b70b883
--- /dev/null
+++ b/plugin/kubernetes/kubernetes_apex_test.go
@@ -0,0 +1,68 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/plugin/pkg/dnsrecorder"
+ "github.com/coredns/coredns/plugin/test"
+
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+)
+
+var kubeApexCases = []test.Case{
+ {
+ Qname: "cluster.local.", Qtype: dns.TypeSOA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.SOA("cluster.local. 303 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+ {
+ Qname: "cluster.local.", Qtype: dns.TypeHINFO,
+ Rcode: dns.RcodeSuccess,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 303 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"),
+ },
+ },
+ {
+ Qname: "cluster.local.", Qtype: dns.TypeNS,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.NS("cluster.local. 303 IN NS ns.dns.cluster.local."),
+ },
+ Extra: []dns.RR{
+ test.A("ns.dns.cluster.local. 303 IN A 127.0.0.1"),
+ },
+ },
+}
+
+func TestServeDNSApex(t *testing.T) {
+
+ k := New([]string{"cluster.local."})
+ k.APIConn = &APIConnServeTest{}
+ k.Next = test.NextHandler(dns.RcodeSuccess, nil)
+ ctx := context.TODO()
+
+ for i, tc := range kubeApexCases {
+ r := tc.Msg()
+
+ w := dnsrecorder.New(&test.ResponseWriter{})
+
+ _, err := k.ServeDNS(ctx, w, r)
+ if err != tc.Error {
+ t.Errorf("Test %d, expected no error, got %v\n", i, err)
+ return
+ }
+ if tc.Error != nil {
+ continue
+ }
+
+ resp := w.Msg
+ if resp == nil {
+ t.Fatalf("Test %d, got nil message and no error ford", i)
+ }
+
+ test.SortAndCheck(t, resp, tc)
+ }
+}
diff --git a/plugin/kubernetes/kubernetes_test.go b/plugin/kubernetes/kubernetes_test.go
new file mode 100644
index 000000000..f347f10fc
--- /dev/null
+++ b/plugin/kubernetes/kubernetes_test.go
@@ -0,0 +1,242 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/request"
+
+ "github.com/miekg/dns"
+ "k8s.io/client-go/1.5/pkg/api"
+)
+
+func TestWildcard(t *testing.T) {
+ var tests = []struct {
+ s string
+ expected bool
+ }{
+ {"mynamespace", false},
+ {"*", true},
+ {"any", true},
+ {"my*space", false},
+ {"*space", false},
+ {"myname*", false},
+ }
+
+ for _, te := range tests {
+ got := wildcard(te.s)
+ if got != te.expected {
+ t.Errorf("Expected Wildcard result '%v' for example '%v', got '%v'.", te.expected, te.s, got)
+ }
+ }
+}
+
+func TestEndpointHostname(t *testing.T) {
+ var tests = []struct {
+ ip string
+ hostname string
+ expected string
+ }{
+ {"10.11.12.13", "", "10-11-12-13"},
+ {"10.11.12.13", "epname", "epname"},
+ }
+ for _, test := range tests {
+ result := endpointHostname(api.EndpointAddress{IP: test.ip, Hostname: test.hostname})
+ if result != test.expected {
+ t.Errorf("Expected endpoint name for (ip:%v hostname:%v) to be '%v', but got '%v'", test.ip, test.hostname, test.expected, result)
+ }
+ }
+}
+
+type APIConnServiceTest struct{}
+
+func (APIConnServiceTest) Run() { return }
+func (APIConnServiceTest) Stop() error { return nil }
+func (APIConnServiceTest) PodIndex(string) []interface{} { return nil }
+
+func (APIConnServiceTest) ServiceList() []*api.Service {
+ svcs := []*api.Service{
+ {
+ ObjectMeta: api.ObjectMeta{
+ Name: "svc1",
+ Namespace: "testns",
+ },
+ Spec: api.ServiceSpec{
+ ClusterIP: "10.0.0.1",
+ Ports: []api.ServicePort{{
+ Name: "http",
+ Protocol: "tcp",
+ Port: 80,
+ }},
+ },
+ },
+ {
+ ObjectMeta: api.ObjectMeta{
+ Name: "hdls1",
+ Namespace: "testns",
+ },
+ Spec: api.ServiceSpec{
+ ClusterIP: api.ClusterIPNone,
+ },
+ },
+ {
+ ObjectMeta: api.ObjectMeta{
+ Name: "external",
+ Namespace: "testns",
+ },
+ Spec: api.ServiceSpec{
+ ExternalName: "coredns.io",
+ Ports: []api.ServicePort{{
+ Name: "http",
+ Protocol: "tcp",
+ Port: 80,
+ }},
+ },
+ },
+ }
+ return svcs
+}
+
+func (APIConnServiceTest) EndpointsList() api.EndpointsList {
+ n := "test.node.foo.bar"
+
+ return api.EndpointsList{
+ Items: []api.Endpoints{
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "172.0.0.1",
+ Hostname: "ep1a",
+ },
+ },
+ Ports: []api.EndpointPort{
+ {
+ Port: 80,
+ Protocol: "tcp",
+ Name: "http",
+ },
+ },
+ },
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: "svc1",
+ Namespace: "testns",
+ },
+ },
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "172.0.0.2",
+ },
+ },
+ Ports: []api.EndpointPort{
+ {
+ Port: 80,
+ Protocol: "tcp",
+ Name: "http",
+ },
+ },
+ },
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: "hdls1",
+ Namespace: "testns",
+ },
+ },
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "172.0.0.3",
+ },
+ },
+ Ports: []api.EndpointPort{
+ {
+ Port: 80,
+ Protocol: "tcp",
+ Name: "http",
+ },
+ },
+ },
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: "hdls1",
+ Namespace: "testns",
+ },
+ },
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "10.9.8.7",
+ NodeName: &n,
+ },
+ },
+ },
+ },
+ },
+ },
+ }
+}
+
+func (APIConnServiceTest) GetNodeByName(name string) (api.Node, error) {
+ return api.Node{
+ ObjectMeta: api.ObjectMeta{
+ Name: "test.node.foo.bar",
+ },
+ }, nil
+}
+
+func TestServices(t *testing.T) {
+
+ k := New([]string{"interwebs.test."})
+ k.APIConn = &APIConnServiceTest{}
+
+ type svcAns struct {
+ host string
+ key string
+ }
+ type svcTest struct {
+ qname string
+ qtype uint16
+ answer svcAns
+ }
+ tests := []svcTest{
+ // Cluster IP Services
+ {qname: "svc1.testns.svc.interwebs.test.", qtype: dns.TypeA, answer: svcAns{host: "10.0.0.1", key: "/coredns/test/interwebs/svc/testns/svc1"}},
+ {qname: "_http._tcp.svc1.testns.svc.interwebs.test.", qtype: dns.TypeSRV, answer: svcAns{host: "10.0.0.1", key: "/coredns/test/interwebs/svc/testns/svc1"}},
+ {qname: "ep1a.svc1.testns.svc.interwebs.test.", qtype: dns.TypeA, answer: svcAns{host: "172.0.0.1", key: "/coredns/test/interwebs/svc/testns/svc1/ep1a"}},
+
+ // External Services
+ {qname: "external.testns.svc.interwebs.test.", qtype: dns.TypeCNAME, answer: svcAns{host: "coredns.io", key: "/coredns/test/interwebs/svc/testns/external"}},
+ }
+
+ for i, test := range tests {
+ state := request.Request{
+ Req: &dns.Msg{Question: []dns.Question{{Name: test.qname, Qtype: test.qtype}}},
+ Zone: "interwebs.test.", // must match from k.Zones[0]
+ }
+ svcs, e := k.Services(state, false, plugin.Options{})
+ if e != nil {
+ t.Errorf("Test %d: got error '%v'", i, e)
+ continue
+ }
+ if len(svcs) != 1 {
+ t.Errorf("Test %d, expected expected 1 answer, got %v", i, len(svcs))
+ continue
+ }
+
+ if test.answer.host != svcs[0].Host {
+ t.Errorf("Test %d, expected host '%v', got '%v'", i, test.answer.host, svcs[0].Host)
+ }
+ if test.answer.key != svcs[0].Key {
+ t.Errorf("Test %d, expected key '%v', got '%v'", i, test.answer.key, svcs[0].Key)
+ }
+ }
+}
diff --git a/plugin/kubernetes/local.go b/plugin/kubernetes/local.go
new file mode 100644
index 000000000..e5b7f1e0f
--- /dev/null
+++ b/plugin/kubernetes/local.go
@@ -0,0 +1,40 @@
+package kubernetes
+
+import "net"
+
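+// localPodIP returns the first non-loopback IPv4 address found on the local
+// interfaces, or nil when none is found.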
+func localPodIP() net.IP {
+ addrs, err := net.InterfaceAddrs()
+ if err != nil {
+ return nil
+ }
+
+ for _, addr := range addrs {
+ ip, _, _ := net.ParseCIDR(addr.String())
+ ip = ip.To4()
+ if ip == nil || ip.IsLoopback() {
+ continue
+ }
+ return ip
+ }
+ return nil
+}
+
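+// localNodeName returns the name of the node this instance runs on, found by
+// matching the local pod IP against the endpoint addresses; it returns the
+// empty string when no match is found.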
+func (k *Kubernetes) localNodeName() string {
+ localIP := k.interfaceAddrsFunc()
+ if localIP == nil {
+ return ""
+ }
+
+ // Find endpoint matching localIP
+ endpointsList := k.APIConn.EndpointsList()
+ for _, ep := range endpointsList.Items {
+ for _, eps := range ep.Subsets {
+ for _, addr := range eps.Addresses {
+ if localIP.Equal(net.ParseIP(addr.IP)) && addr.NodeName != nil {
+ return *addr.NodeName
+ }
+ }
+ }
+ }
+ return ""
+}
diff --git a/plugin/kubernetes/ns.go b/plugin/kubernetes/ns.go
new file mode 100644
index 000000000..4cacc382f
--- /dev/null
+++ b/plugin/kubernetes/ns.go
@@ -0,0 +1,65 @@
+package kubernetes
+
+import (
+ "net"
+ "strings"
+
+ "github.com/miekg/dns"
+ "k8s.io/client-go/1.5/pkg/api"
+)
+
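+// isDefaultNS returns true when name is the default NS record name for zone,
+// i.e. defaultNSName directly followed by zone, e.g. "ns.dns.cluster.local."
+// for zone "cluster.local.".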
+func isDefaultNS(name, zone string) bool {
+ return strings.Index(name, defaultNSName) == 0 && strings.Index(name, zone) == len(defaultNSName)
+}
+
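+// nsAddr returns an A record for this CoreDNS instance: the ClusterIP of the
+// service that fronts the endpoint carrying the local pod IP, or the pod IP
+// itself (under the name "ns.dns.") when no such service is found.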
+func (k *Kubernetes) nsAddr() *dns.A {
+ var (
+ svcName string
+ svcNamespace string
+ )
+
+ rr := new(dns.A)
+ localIP := k.interfaceAddrsFunc()
+ endpointsList := k.APIConn.EndpointsList()
+
+ rr.A = localIP
+
+FindEndpoint:
+ for _, ep := range endpointsList.Items {
+ for _, eps := range ep.Subsets {
+ for _, addr := range eps.Addresses {
+ if localIP.Equal(net.ParseIP(addr.IP)) {
+ svcNamespace = ep.ObjectMeta.Namespace
+ svcName = ep.ObjectMeta.Name
+ break FindEndpoint
+ }
+ }
+ }
+ }
+
+ if len(svcName) == 0 {
+ rr.Hdr.Name = defaultNSName
+ rr.A = localIP
+ return rr
+ }
+ // Find service to get ClusterIP
+ serviceList := k.APIConn.ServiceList()
+
+FindService:
+ for _, svc := range serviceList {
+ if svcName == svc.Name && svcNamespace == svc.Namespace {
+ if svc.Spec.ClusterIP == api.ClusterIPNone {
+ rr.A = localIP
+ } else {
+ rr.A = net.ParseIP(svc.Spec.ClusterIP)
+ }
+ break FindService
+ }
+ }
+
+ rr.Hdr.Name = strings.Join([]string{svcName, svcNamespace, "svc."}, ".")
+
+ return rr
+}
+
+const defaultNSName = "ns.dns."
diff --git a/plugin/kubernetes/ns_test.go b/plugin/kubernetes/ns_test.go
new file mode 100644
index 000000000..8e9e80c71
--- /dev/null
+++ b/plugin/kubernetes/ns_test.go
@@ -0,0 +1,69 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "k8s.io/client-go/1.5/pkg/api"
+)
+
+type APIConnTest struct{}
+
+func (APIConnTest) Run() { return }
+func (APIConnTest) Stop() error { return nil }
+func (APIConnTest) PodIndex(string) []interface{} { return nil }
+
+func (APIConnTest) ServiceList() []*api.Service {
+ svc := api.Service{
+ ObjectMeta: api.ObjectMeta{
+ Name: "dns-service",
+ Namespace: "kube-system",
+ },
+ Spec: api.ServiceSpec{
+ ClusterIP: "10.0.0.111",
+ },
+ }
+
+ return []*api.Service{&svc}
+
+}
+
+func (APIConnTest) EndpointsList() api.EndpointsList {
+ return api.EndpointsList{
+ Items: []api.Endpoints{
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "127.0.0.1",
+ },
+ },
+ },
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: "dns-service",
+ Namespace: "kube-system",
+ },
+ },
+ },
+ }
+}
+
+func (APIConnTest) GetNodeByName(name string) (api.Node, error) { return api.Node{}, nil }
+
+func TestNsAddr(t *testing.T) {
+
+ k := New([]string{"inter.webs.test."})
+ k.APIConn = &APIConnTest{}
+
+ cdr := k.nsAddr()
+ expected := "10.0.0.111"
+
+ if cdr.A.String() != expected {
+ t.Errorf("Expected A to be %q, got %q", expected, cdr.A.String())
+ }
+ expected = "dns-service.kube-system.svc."
+ if cdr.Hdr.Name != expected {
+ t.Errorf("Expected Hdr.Name to be %q, got %q", expected, cdr.Hdr.Name)
+ }
+}
diff --git a/plugin/kubernetes/parse.go b/plugin/kubernetes/parse.go
new file mode 100644
index 000000000..a66e77699
--- /dev/null
+++ b/plugin/kubernetes/parse.go
@@ -0,0 +1,112 @@
+package kubernetes
+
+import (
+ "github.com/coredns/coredns/plugin/pkg/dnsutil"
+ "github.com/coredns/coredns/request"
+
+ "github.com/miekg/dns"
+)
+
+type recordRequest struct {
+ // The named port from the kubernetes DNS spec, this is the service part (think _https) from a well formed
+ // SRV record.
+ port string
+ // The protocol is usually _udp or _tcp (if set), and comes from the protocol part of a well formed
+ // SRV record.
+ protocol string
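+ // The endpoint, when a specific endpoint of a service is queried; left empty otherwise.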
+ endpoint string
+ // The servicename used in Kubernetes.
+ service string
+ // The namespace used in Kubernetes.
+ namespace string
+ // Each name can be for a pod or a service; here we track what we've seen, either "pod" or "svc".
+ podOrSvc string
+}
+
+// parseRequest parses the qname to find all the elements we need for querying k8s. Anything
+// that is not parsed will have the wildcard "*" value (except r.endpoint).
+// Potential underscores are stripped from _port and _protocol.
+func parseRequest(state request.Request) (r recordRequest, err error) {
+ // 3 Possible cases:
+ // 1. _port._protocol.service.namespace.pod|svc.zone
+ // 2. (endpoint): endpoint.service.namespace.pod|svc.zone
+ // 3. (service): service.namespace.pod|svc.zone
+ //
+ // Federations are handled in the federation plugin and aren't parsed here.
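+ //
+ // For example, "_http._tcp.webs.mynamespace.svc.<zone>" (case 1) parses to
+ // port "http", protocol "tcp", service "webs" and namespace "mynamespace".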
+
+ base, _ := dnsutil.TrimZone(state.Name(), state.Zone)
+ segs := dns.SplitDomainName(base)
+
+ r.port = "*"
+ r.protocol = "*"
+ r.service = "*"
+ r.namespace = "*"
+ // r.endpoint is the odd one out; we need to know if it has been set or not. If it is
+ // empty we should skip the endpoint check in k.get(). Hence we cannot set it to "*".
+
+ // Start at the right and fill out recordRequest with the bits we find, so we look for
+ // pod|svc.namespace.service and then either
+ // * endpoint
+ // * _protocol._port
+
+ last := len(segs) - 1
+ if last < 0 {
+ return r, nil
+ }
+ r.podOrSvc = segs[last]
+ if r.podOrSvc != Pod && r.podOrSvc != Svc {
+ return r, errInvalidRequest
+ }
+ last--
+ if last < 0 {
+ return r, nil
+ }
+
+ r.namespace = segs[last]
+ last--
+ if last < 0 {
+ return r, nil
+ }
+
+ r.service = segs[last]
+ last--
+ if last < 0 {
+ return r, nil
+ }
+
+ // Because of ambiguity we check the labels left: 1: an endpoint. 2: port and protocol.
+ // Anything else is a query that is too long to answer and can safely be answered with NXDOMAIN.
+ switch last {
+
+ case 0: // endpoint only
+ r.endpoint = segs[last]
+ case 1: // protocol and port
+ r.protocol = stripUnderscore(segs[last])
+ r.port = stripUnderscore(segs[last-1])
+
+ default: // too long
+ return r, errInvalidRequest
+ }
+
+ return r, nil
+}
+
+// stripUnderscore removes a prefixed underscore from s.
+func stripUnderscore(s string) string {
+ if s[0] != '_' {
+ return s
+ }
+ return s[1:]
+}
+
+// String returns a string representation of r; it just returns all fields concatenated with dots.
+// This is mostly used in tests.
+func (r recordRequest) String() string {
+ s := r.port
+ s += "." + r.protocol
+ s += "." + r.endpoint
+ s += "." + r.service
+ s += "." + r.namespace
+ s += "." + r.podOrSvc
+ return s
+}
diff --git a/plugin/kubernetes/parse_test.go b/plugin/kubernetes/parse_test.go
new file mode 100644
index 000000000..06d5a2aaa
--- /dev/null
+++ b/plugin/kubernetes/parse_test.go
@@ -0,0 +1,56 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/request"
+
+ "github.com/miekg/dns"
+)
+
+func TestParseRequest(t *testing.T) {
+ tests := []struct {
+ query string
+ expected string // output from r.String()
+ }{
+ // valid SRV request
+ {"_http._tcp.webs.mynamespace.svc.inter.webs.test.", "http.tcp..webs.mynamespace.svc"},
+ // wildcard acceptance
+ {"*.any.*.any.svc.inter.webs.test.", "*.any..*.any.svc"},
+ // A request of endpoint
+ {"1-2-3-4.webs.mynamespace.svc.inter.webs.test.", "*.*.1-2-3-4.webs.mynamespace.svc"},
+ }
+ for i, tc := range tests {
+ m := new(dns.Msg)
+ m.SetQuestion(tc.query, dns.TypeA)
+ state := request.Request{Zone: zone, Req: m}
+
+ r, e := parseRequest(state)
+ if e != nil {
+ t.Errorf("Test %d, expected no error, got '%v'.", i, e)
+ }
+ rs := r.String()
+ if rs != tc.expected {
+ t.Errorf("Test %d, expected (stringyfied) recordRequest: %s, got %s", i, tc.expected, rs)
+ }
+ }
+}
+
+func TestParseInvalidRequest(t *testing.T) {
+ invalid := []string{
+ "webs.mynamespace.pood.inter.webs.test.", // Request must be for pod or svc subdomain.
+ "too.long.for.what.I.am.trying.to.pod.inter.webs.tests.", // Too long.
+ }
+
+ for i, query := range invalid {
+ m := new(dns.Msg)
+ m.SetQuestion(query, dns.TypeA)
+ state := request.Request{Zone: zone, Req: m}
+
+ if _, e := parseRequest(state); e == nil {
+ t.Errorf("Test %d: expected error from %s, got none", i, query)
+ }
+ }
+}
+
+const zone = "inter.webs.test."
diff --git a/plugin/kubernetes/reverse.go b/plugin/kubernetes/reverse.go
new file mode 100644
index 000000000..0143b721a
--- /dev/null
+++ b/plugin/kubernetes/reverse.go
@@ -0,0 +1,55 @@
+package kubernetes
+
+import (
+ "strings"
+
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/etcd/msg"
+ "github.com/coredns/coredns/plugin/pkg/dnsutil"
+ "github.com/coredns/coredns/request"
+)
+
+// Reverse implements the ServiceBackend interface.
+func (k *Kubernetes) Reverse(state request.Request, exact bool, opt plugin.Options) ([]msg.Service, error) {
+
+ ip := dnsutil.ExtractAddressFromReverse(state.Name())
+ if ip == "" {
+ return nil, nil
+ }
+
+ records := k.serviceRecordForIP(ip, state.Name())
+ return records, nil
+}
+
+// serviceRecordForIP gets a service record with a cluster IP matching the ip argument.
+// If no service cluster IP matches, it checks all endpoint addresses.
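+// For example, an IP that matches the endpoint "ep1a" of service "svc1" in
+// namespace "testns" yields "ep1a.svc1.testns.svc.<primary zone>".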
+func (k *Kubernetes) serviceRecordForIP(ip, name string) []msg.Service {
+ // First check services with cluster ips
+ svcList := k.APIConn.ServiceList()
+
+ for _, service := range svcList {
+ if (len(k.Namespaces) > 0) && !k.namespaceExposed(service.Namespace) {
+ continue
+ }
+ if service.Spec.ClusterIP == ip {
+ domain := strings.Join([]string{service.Name, service.Namespace, Svc, k.primaryZone()}, ".")
+ return []msg.Service{{Host: domain}}
+ }
+ }
+ // If no cluster ips match, search endpoints
+ epList := k.APIConn.EndpointsList()
+ for _, ep := range epList.Items {
+ if (len(k.Namespaces) > 0) && !k.namespaceExposed(ep.ObjectMeta.Namespace) {
+ continue
+ }
+ for _, eps := range ep.Subsets {
+ for _, addr := range eps.Addresses {
+ if addr.IP == ip {
+ domain := strings.Join([]string{endpointHostname(addr), ep.ObjectMeta.Name, ep.ObjectMeta.Namespace, Svc, k.primaryZone()}, ".")
+ return []msg.Service{{Host: domain}}
+ }
+ }
+ }
+ }
+ return nil
+}
diff --git a/plugin/kubernetes/reverse_test.go b/plugin/kubernetes/reverse_test.go
new file mode 100644
index 000000000..aa9d09585
--- /dev/null
+++ b/plugin/kubernetes/reverse_test.go
@@ -0,0 +1,125 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/plugin/pkg/dnsrecorder"
+ "github.com/coredns/coredns/plugin/test"
+
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+ "k8s.io/client-go/1.5/pkg/api"
+)
+
+type APIConnReverseTest struct{}
+
+func (APIConnReverseTest) Run() { return }
+func (APIConnReverseTest) Stop() error { return nil }
+func (APIConnReverseTest) PodIndex(string) []interface{} { return nil }
+
+func (APIConnReverseTest) ServiceList() []*api.Service {
+ svcs := []*api.Service{
+ {
+ ObjectMeta: api.ObjectMeta{
+ Name: "svc1",
+ Namespace: "testns",
+ },
+ Spec: api.ServiceSpec{
+ ClusterIP: "192.168.1.100",
+ Ports: []api.ServicePort{{
+ Name: "http",
+ Protocol: "tcp",
+ Port: 80,
+ }},
+ },
+ },
+ }
+ return svcs
+}
+
+func (APIConnReverseTest) EndpointsList() api.EndpointsList {
+ return api.EndpointsList{
+ Items: []api.Endpoints{
+ {
+ Subsets: []api.EndpointSubset{
+ {
+ Addresses: []api.EndpointAddress{
+ {
+ IP: "10.0.0.100",
+ Hostname: "ep1a",
+ },
+ },
+ Ports: []api.EndpointPort{
+ {
+ Port: 80,
+ Protocol: "tcp",
+ Name: "http",
+ },
+ },
+ },
+ },
+ ObjectMeta: api.ObjectMeta{
+ Name: "svc1",
+ Namespace: "testns",
+ },
+ },
+ },
+ }
+}
+
+func (APIConnReverseTest) GetNodeByName(name string) (api.Node, error) {
+ return api.Node{
+ ObjectMeta: api.ObjectMeta{
+ Name: "test.node.foo.bar",
+ },
+ }, nil
+}
+
+func TestReverse(t *testing.T) {
+
+ k := New([]string{"cluster.local.", "0.10.in-addr.arpa."})
+ k.APIConn = &APIConnReverseTest{}
+
+ tests := []test.Case{
+ {
+ Qname: "100.0.0.10.in-addr.arpa.", Qtype: dns.TypePTR,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.PTR("100.0.0.10.in-addr.arpa. 303 IN PTR ep1a.svc1.testns.svc.cluster.local."),
+ },
+ },
+ {
+ Qname: "101.0.0.10.in-addr.arpa.", Qtype: dns.TypePTR,
+ Rcode: dns.RcodeSuccess,
+ Ns: []dns.RR{
+ test.SOA("0.10.in-addr.arpa. 300 IN SOA ns.dns.0.10.in-addr.arpa. hostmaster.0.10.in-addr.arpa. 1502782828 7200 1800 86400 60"),
+ },
+ },
+ {
+ Qname: "example.org.cluster.local.", Qtype: dns.TypePTR,
+ Rcode: dns.RcodeSuccess,
+ Ns: []dns.RR{
+ test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1502989566 7200 1800 86400 60"),
+ },
+ },
+ }
+
+ ctx := context.TODO()
+ for i, tc := range tests {
+ r := tc.Msg()
+
+ w := dnsrecorder.New(&test.ResponseWriter{})
+
+ _, err := k.ServeDNS(ctx, w, r)
+ if err != tc.Error {
+ t.Errorf("Test %d: expected no error, got %v", i, err)
+ return
+ }
+
+ resp := w.Msg
+ if resp == nil {
+ t.Fatalf("Test %d: got nil message and no error for: %s %d", i, r.Question[0].Name, r.Question[0].Qtype)
+ }
+ test.SortAndCheck(t, resp, tc)
+ }
+}
diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go
new file mode 100644
index 000000000..e60239d42
--- /dev/null
+++ b/plugin/kubernetes/setup.go
@@ -0,0 +1,208 @@
+package kubernetes
+
+import (
+ "errors"
+ "fmt"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/coredns/coredns/core/dnsserver"
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/pkg/dnsutil"
+ "github.com/coredns/coredns/plugin/proxy"
+ "github.com/miekg/dns"
+
+ "github.com/mholt/caddy"
+ unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned"
+)
+
+func init() {
+ caddy.RegisterPlugin("kubernetes", caddy.Plugin{
+ ServerType: "dns",
+ Action: setup,
+ })
+}
+
+func setup(c *caddy.Controller) error {
+ kubernetes, initOpts, err := kubernetesParse(c)
+ if err != nil {
+ return plugin.Error("kubernetes", err)
+ }
+
+ err = kubernetes.initKubeCache(initOpts)
+ if err != nil {
+ return plugin.Error("kubernetes", err)
+ }
+
+ // Register KubeCache start and stop functions with Caddy
+ c.OnStartup(func() error {
+ go kubernetes.APIConn.Run()
+ if kubernetes.APIProxy != nil {
+ go kubernetes.APIProxy.Run()
+ }
+ return nil
+ })
+
+ c.OnShutdown(func() error {
+ if kubernetes.APIProxy != nil {
+ kubernetes.APIProxy.Stop()
+ }
+ return kubernetes.APIConn.Stop()
+ })
+
+ dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
+ kubernetes.Next = next
+ return kubernetes
+ })
+
+ return nil
+}
+
+func kubernetesParse(c *caddy.Controller) (*Kubernetes, dnsControlOpts, error) {
+ k8s := New([]string{""})
+ k8s.interfaceAddrsFunc = localPodIP
+ k8s.autoPathSearch = searchFromResolvConf()
+
+ opts := dnsControlOpts{
+ resyncPeriod: defaultResyncPeriod,
+ }
+
+ for c.Next() {
+ zones := c.RemainingArgs()
+
+ if len(zones) != 0 {
+ k8s.Zones = zones
+ for i := 0; i < len(k8s.Zones); i++ {
+ k8s.Zones[i] = plugin.Host(k8s.Zones[i]).Normalize()
+ }
+ } else {
+ k8s.Zones = make([]string, len(c.ServerBlockKeys))
+ for i := 0; i < len(c.ServerBlockKeys); i++ {
+ k8s.Zones[i] = plugin.Host(c.ServerBlockKeys[i]).Normalize()
+ }
+ }
+
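+ // The primary zone is the first non-reverse zone; reverse zones
+ // (in-addr.arpa and ip6.arpa) can't act as the primary zone.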
+ k8s.primaryZoneIndex = -1
+ for i, z := range k8s.Zones {
+ if strings.HasSuffix(z, "in-addr.arpa.") || strings.HasSuffix(z, "ip6.arpa.") {
+ continue
+ }
+ k8s.primaryZoneIndex = i
+ break
+ }
+
+ if k8s.primaryZoneIndex == -1 {
+ return nil, opts, errors.New("non-reverse zone name must be used")
+ }
+
+ for c.NextBlock() {
+ switch c.Val() {
+ case "pods":
+ args := c.RemainingArgs()
+ if len(args) == 1 {
+ switch args[0] {
+ case podModeDisabled, podModeInsecure, podModeVerified:
+ k8s.podMode = args[0]
+ default:
+ return nil, opts, fmt.Errorf("wrong value for pods: %s, must be one of: disabled, verified, insecure", args[0])
+ }
+ continue
+ }
+ return nil, opts, c.ArgErr()
+ case "namespaces":
+ args := c.RemainingArgs()
+ if len(args) > 0 {
+ for _, a := range args {
+ k8s.Namespaces[a] = true
+ }
+ continue
+ }
+ return nil, opts, c.ArgErr()
+ case "endpoint":
+ args := c.RemainingArgs()
+ if len(args) > 0 {
+ for _, endpoint := range strings.Split(args[0], ",") {
+ k8s.APIServerList = append(k8s.APIServerList, strings.TrimSpace(endpoint))
+ }
+ continue
+ }
+ return nil, opts, c.ArgErr()
+ case "tls": // cert key cacertfile
+ args := c.RemainingArgs()
+ if len(args) == 3 {
+ k8s.APIClientCert, k8s.APIClientKey, k8s.APICertAuth = args[0], args[1], args[2]
+ continue
+ }
+ return nil, opts, c.ArgErr()
+ case "resyncperiod":
+ args := c.RemainingArgs()
+ if len(args) > 0 {
+ rp, err := time.ParseDuration(args[0])
+ if err != nil {
+ return nil, opts, fmt.Errorf("unable to parse resync duration value: '%v': %v", args[0], err)
+ }
+ opts.resyncPeriod = rp
+ continue
+ }
+ return nil, opts, c.ArgErr()
+ case "labels":
+ args := c.RemainingArgs()
+ if len(args) > 0 {
+ labelSelectorString := strings.Join(args, " ")
+ ls, err := unversionedapi.ParseToLabelSelector(labelSelectorString)
+ if err != nil {
+ return nil, opts, fmt.Errorf("unable to parse label selector value: '%v': %v", labelSelectorString, err)
+ }
+ opts.labelSelector = ls
+ continue
+ }
+ return nil, opts, c.ArgErr()
+ case "fallthrough":
+ args := c.RemainingArgs()
+ if len(args) == 0 {
+ k8s.Fallthrough = true
+ continue
+ }
+ return nil, opts, c.ArgErr()
+ case "upstream":
+ args := c.RemainingArgs()
+ if len(args) == 0 {
+ return nil, opts, c.ArgErr()
+ }
+ ups, err := dnsutil.ParseHostPortOrFile(args...)
+ if err != nil {
+ return nil, opts, err
+ }
+ k8s.Proxy = proxy.NewLookup(ups)
+ case "ttl":
+ args := c.RemainingArgs()
+ if len(args) == 0 {
+ return nil, opts, c.ArgErr()
+ }
+ t, err := strconv.Atoi(args[0])
+ if err != nil {
+ return nil, opts, err
+ }
+ if t < 5 || t > 3600 {
+ return nil, opts, c.Errf("ttl must be in range [5, 3600]: %d", t)
+ }
+ k8s.ttl = uint32(t)
+ default:
+ return nil, opts, c.Errf("unknown property '%s'", c.Val())
+ }
+ }
+ }
+ return k8s, opts, nil
+}
+
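+// searchFromResolvConf returns the normalized search path from
+// /etc/resolv.conf, or nil when it can't be read; it seeds autopath searching.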
+func searchFromResolvConf() []string {
+ rc, err := dns.ClientConfigFromFile("/etc/resolv.conf")
+ if err != nil {
+ return nil
+ }
+ plugin.Zones(rc.Search).Normalize()
+ return rc.Search
+}
+
+const defaultResyncPeriod = 5 * time.Minute
diff --git a/plugin/kubernetes/setup_reverse_test.go b/plugin/kubernetes/setup_reverse_test.go
new file mode 100644
index 000000000..ed51a7410
--- /dev/null
+++ b/plugin/kubernetes/setup_reverse_test.go
@@ -0,0 +1,35 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/mholt/caddy"
+)
+
+func TestKubernetesParseReverseZone(t *testing.T) {
+ tests := []struct {
+ input string // Corefile data as string
+ expectedZones []string // expected zone names.
+ }{
+ {`kubernetes coredns.local 10.0.0.0/16`, []string{"coredns.local.", "0.10.in-addr.arpa."}},
+ {`kubernetes coredns.local 10.0.0.0/17`, []string{"coredns.local.", "10.0.0.0/17."}},
+ }
+
+ for i, tc := range tests {
+ c := caddy.NewTestController("dns", tc.input)
+ k, _, err := kubernetesParse(c)
+ if err != nil {
+ t.Fatalf("Test %d: Expected no error, got %q", i, err)
+ }
+
+ zl := len(k.Zones)
+ if zl != len(tc.expectedZones) {
+ t.Errorf("Test %d: Expected kubernetes to be initialized with %d zones, found %d zones", i, len(tc.expectedZones), zl)
+ }
+ for i, z := range tc.expectedZones {
+ if k.Zones[i] != z {
+ t.Errorf("Test %d: Expected zones to be %q, got %q", i, z, k.Zones[i])
+ }
+ }
+ }
+}
diff --git a/plugin/kubernetes/setup_test.go b/plugin/kubernetes/setup_test.go
new file mode 100644
index 000000000..2fdc38a9c
--- /dev/null
+++ b/plugin/kubernetes/setup_test.go
@@ -0,0 +1,473 @@
+package kubernetes
+
+import (
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/mholt/caddy"
+ "k8s.io/client-go/1.5/pkg/api/unversioned"
+)
+
+func TestKubernetesParse(t *testing.T) {
+ tests := []struct {
+ input string // Corefile data as string
+ shouldErr bool // true if test case is expected to produce an error.
+ expectedErrContent string // substring from the expected error. Empty for positive cases.
+ expectedZoneCount int // expected count of defined zones.
+ expectedNSCount int // expected count of namespaces.
+ expectedResyncPeriod time.Duration // expected resync period value
+ expectedLabelSelector string // expected label selector value
+ expectedPodMode string
+ expectedFallthrough bool
+ expectedUpstreams []string
+ }{
+ // positive
+ {
+ `kubernetes coredns.local`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local test.local`,
+ false,
+ "",
+ 2,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ endpoint http://localhost:9090
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ namespaces demo
+}`,
+ false,
+ "",
+ 1,
+ 1,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ namespaces demo test
+}`,
+ false,
+ "",
+ 1,
+ 2,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ resyncperiod 30s
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ 30 * time.Second,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ resyncperiod 15m
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ 15 * time.Minute,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ labels environment=prod
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "environment=prod",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ labels environment in (production, staging, qa),application=nginx
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "application=nginx,environment in (production,qa,staging)",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local test.local {
+ resyncperiod 15m
+ endpoint http://localhost:8080
+ namespaces demo test
+ labels environment in (production, staging, qa),application=nginx
+ fallthrough
+}`,
+ false,
+ "",
+ 2,
+ 2,
+ 15 * time.Minute,
+ "application=nginx,environment in (production,qa,staging)",
+ podModeDisabled,
+ true,
+ nil,
+ },
+ // negative
+ {
+ `kubernetes coredns.local {
+ endpoint
+}`,
+ true,
+ "rong argument count or unexpected line ending",
+ -1,
+ -1,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ namespaces
+}`,
+ true,
+ "rong argument count or unexpected line ending",
+ -1,
+ -1,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ resyncperiod
+}`,
+ true,
+ "rong argument count or unexpected line ending",
+ -1,
+ 0,
+ 0 * time.Minute,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ resyncperiod 15
+}`,
+ true,
+ "unable to parse resync duration value",
+ -1,
+ 0,
+ 0 * time.Second,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ resyncperiod abc
+}`,
+ true,
+ "unable to parse resync duration value",
+ -1,
+ 0,
+ 0 * time.Second,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ labels
+}`,
+ true,
+ "rong argument count or unexpected line ending",
+ -1,
+ 0,
+ 0 * time.Second,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ {
+ `kubernetes coredns.local {
+ labels environment in (production, qa
+}`,
+ true,
+ "unable to parse label selector",
+ -1,
+ 0,
+ 0 * time.Second,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ // pods disabled
+ {
+ `kubernetes coredns.local {
+ pods disabled
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ // pods insecure
+ {
+ `kubernetes coredns.local {
+ pods insecure
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeInsecure,
+ false,
+ nil,
+ },
+ // pods verified
+ {
+ `kubernetes coredns.local {
+ pods verified
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeVerified,
+ false,
+ nil,
+ },
+ // pods invalid
+ {
+ `kubernetes coredns.local {
+ pods giant_seed
+}`,
+ true,
+ "rong value for pods",
+ -1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeVerified,
+ false,
+ nil,
+ },
+ // fallthrough invalid
+ {
+ `kubernetes coredns.local {
+ fallthrough junk
+}`,
+ true,
+ "rong argument count",
+ -1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ // Valid upstream
+ {
+ `kubernetes coredns.local {
+ upstream 13.14.15.16:53
+}`,
+ false,
+ "",
+ 1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ []string{"13.14.15.16:53"},
+ },
+ // Invalid upstream
+ {
+ `kubernetes coredns.local {
+ upstream 13.14.15.16orange
+}`,
+ true,
+ "not an IP address or file: \"13.14.15.16orange\"",
+ -1,
+ 0,
+ defaultResyncPeriod,
+ "",
+ podModeDisabled,
+ false,
+ nil,
+ },
+ }
+
+ for i, test := range tests {
+ c := caddy.NewTestController("dns", test.input)
+ k8sController, opts, err := kubernetesParse(c)
+
+ if test.shouldErr && err == nil {
+ t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err)
+ }
+
+ if err != nil {
+ if !test.shouldErr {
+ t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err)
+ continue
+ }
+
+ if test.shouldErr && (len(test.expectedErrContent) < 1) {
+ t.Fatalf("Test %d: Test marked as expecting an error, but no expectedErrContent provided for input '%s'. Error was: '%v'", i, test.input, err)
+ }
+
+ if test.shouldErr && (test.expectedZoneCount >= 0) {
+ t.Errorf("Test %d: Test marked as expecting an error, but provides value for expectedZoneCount!=-1 for input '%s'. Error was: '%v'", i, test.input, err)
+ }
+
+ if !strings.Contains(err.Error(), test.expectedErrContent) {
+ t.Errorf("Test %d: Expected error to contain: %v, found error: %v, input: %s", i, test.expectedErrContent, err, test.input)
+ }
+ continue
+ }
+
+ // No error was raised, so validate initialization of k8sController
+ // Zones
+ foundZoneCount := len(k8sController.Zones)
+ if foundZoneCount != test.expectedZoneCount {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d zones, instead found %d zones: '%v' for input '%s'", i, test.expectedZoneCount, foundZoneCount, k8sController.Zones, test.input)
+ }
+
+ // Namespaces
+ foundNSCount := len(k8sController.Namespaces)
+ if foundNSCount != test.expectedNSCount {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d namespaces. Instead found %d namespaces: '%v' for input '%s'", i, test.expectedNSCount, foundNSCount, k8sController.Namespaces, test.input)
+ }
+
+ // ResyncPeriod
+ foundResyncPeriod := opts.resyncPeriod
+ if foundResyncPeriod != test.expectedResyncPeriod {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with resync period '%s'. Instead found period '%s' for input '%s'", i, test.expectedResyncPeriod, foundResyncPeriod, test.input)
+ }
+
+ // Labels
+ if opts.labelSelector != nil {
+ foundLabelSelectorString := unversioned.FormatLabelSelector(opts.labelSelector)
+ if foundLabelSelectorString != test.expectedLabelSelector {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with label selector '%s'. Instead found selector '%s' for input '%s'", i, test.expectedLabelSelector, foundLabelSelectorString, test.input)
+ }
+ }
+ // Pods
+ foundPodMode := k8sController.podMode
+ if foundPodMode != test.expectedPodMode {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with pod mode '%s'. Instead found pod mode '%s' for input '%s'", i, test.expectedPodMode, foundPodMode, test.input)
+ }
+
+ // fallthrough
+ foundFallthrough := k8sController.Fallthrough
+ if foundFallthrough != test.expectedFallthrough {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with fallthrough '%v'. Instead found fallthrough '%v' for input '%s'", i, test.expectedFallthrough, foundFallthrough, test.input)
+ }
+ // upstream
+ foundUpstreams := k8sController.Proxy.Upstreams
+ if test.expectedUpstreams == nil {
+ if foundUpstreams != nil {
+ t.Errorf("Test %d: Expected kubernetes controller to not be initialized with upstreams for input '%s'", i, test.input)
+ }
+ } else {
+ if foundUpstreams == nil {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with upstreams for input '%s'", i, test.input)
+ } else {
+ if len(*foundUpstreams) != len(test.expectedUpstreams) {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d upstreams. Instead found %d upstreams for input '%s'", i, len(test.expectedUpstreams), len(*foundUpstreams), test.input)
+ }
+ for j, want := range test.expectedUpstreams {
+ got := (*foundUpstreams)[j].Select().Name
+ if got != want {
+ t.Errorf("Test %d: Expected kubernetes controller to be initialized with upstream '%s'. Instead found upstream '%s' for input '%s'", i, want, got, test.input)
+ }
+ }
+
+ }
+ }
+ }
+}
diff --git a/plugin/kubernetes/setup_ttl_test.go b/plugin/kubernetes/setup_ttl_test.go
new file mode 100644
index 000000000..d58f91576
--- /dev/null
+++ b/plugin/kubernetes/setup_ttl_test.go
@@ -0,0 +1,45 @@
+package kubernetes
+
+import (
+ "testing"
+
+ "github.com/mholt/caddy"
+)
+
+func TestKubernetesParseTTL(t *testing.T) {
+ tests := []struct {
+ input string // Corefile data as string
+ expectedTTL uint32 // expected TTL value.
+ shouldErr bool
+ }{
+ {`kubernetes cluster.local {
+ ttl 56
+ }`, 56, false},
+ {`kubernetes cluster.local`, defaultTTL, false},
+ {`kubernetes cluster.local {
+ ttl -1
+ }`, 0, true},
+ {`kubernetes cluster.local {
+ ttl 3601
+ }`, 0, true},
+ }
+
+ for i, tc := range tests {
+ c := caddy.NewTestController("dns", tc.input)
+ k, _, err := kubernetesParse(c)
+ if err != nil && !tc.shouldErr {
+ t.Fatalf("Test %d: Expected no error, got %q", i, err)
+ }
+ if err == nil && tc.shouldErr {
+ t.Fatalf("Test %d: Expected error, got none", i)
+ }
+ if err != nil && tc.shouldErr {
+ // input should error
+ continue
+ }
+
+ if k.ttl != tc.expectedTTL {
+ t.Errorf("Test %d: Expected TTl to be %d, got %d", i, tc.expectedTTL, k.ttl)
+ }
+ }
+}