diff options
author | 2017-09-14 09:36:06 +0100 | |
---|---|---|
committer | 2017-09-14 09:36:06 +0100 | |
commit | d8714e64e400ef873c2adc4d929a07d7890727b9 (patch) | |
tree | c9fa4c157e6af12eb1517654f8d23ca5d5619513 /plugin/kubernetes | |
parent | b984aa45595dc95253b91191afe7d3ee29e71b48 (diff) | |
download | coredns-d8714e64e400ef873c2adc4d929a07d7890727b9.tar.gz coredns-d8714e64e400ef873c2adc4d929a07d7890727b9.tar.zst coredns-d8714e64e400ef873c2adc4d929a07d7890727b9.zip |
Remove the word middleware (#1067)
* Rename middleware to plugin
first pass; mostly used 'sed', few spots where I manually changed
text.
This still builds a coredns binary.
* fmt error
* Rename AddMiddleware to AddPlugin
* Readd AddMiddleware to remain backwards compat
Diffstat (limited to 'plugin/kubernetes')
25 files changed, 3445 insertions, 0 deletions
diff --git a/plugin/kubernetes/DEV-README.md b/plugin/kubernetes/DEV-README.md new file mode 100644 index 000000000..4f652b578 --- /dev/null +++ b/plugin/kubernetes/DEV-README.md @@ -0,0 +1,43 @@ +# Basic Setup for Development and Testing + +## Launch Kubernetes + +To run the tests, you'll need a private, live Kubernetes cluster. If you don't have one, +you can try out [minikube](https://github.com/kubernetes/minikube), which is +also available via Homebrew for OS X users. + +## Configure Test Data + +The test data is all in [this manifest](https://github.com/coredns/coredns/blob/master/.travis/kubernetes/dns-test.yaml) +and you can load it with `kubectl apply -f`. It will create a couple namespaces and some services. +For the tests to pass, you should not create anything else in the cluster. + +## Proxy the API Server + +Assuming your Kuberentes API server isn't running on http://localhost:8080, you will need to proxy from that +port to your cluster. You can do this with `kubectl proxy --port 8080`. + +## Run CoreDNS Kubernetes Tests + +Now you can run the tests locally, for example: + +~~~ +$ cd $GOPATH/src/github.com/coredns/coredns/test +$ go test -v -tags k8s +~~~ + +# Implementation Notes/Ideas + +* Additional features: + * Implement IP selection and ordering (internal/external). Related to + wildcards and SkyDNS use of CNAMES. + * Expose arbitrary kubernetes repository data as TXT records? +* DNS Correctness + * Do we need to generate synthetic zone records for namespaces? + * Do we need to generate synthetic zone records for the skydns synthetic zones? +* Test cases + * Test with CoreDNS caching. CoreDNS caching for DNS response is working + using the `cache` directive. Tested working using 20s cache timeout + and A-record queries. Automate testing with cache in place. + * Automate CoreDNS performance tests. Initially for zone files, and for + pre-loaded k8s API cache. With and without CoreDNS response caching. diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md new file mode 100644 index 000000000..387f1cf75 --- /dev/null +++ b/plugin/kubernetes/README.md @@ -0,0 +1,167 @@ +# kubernetes + +The *kubernetes* plugin enables the reading zone data from a Kubernetes cluster. It implements +the [Kubernetes DNS-Based Service Discovery +Specification](https://github.com/kubernetes/dns/blob/master/docs/specification.md). + +CoreDNS running the kubernetes plugin can be used as a replacement of kube-dns in a kubernetes +cluster. See the [deployment](https://github.com/coredns/deployment) repository for details on [how +to deploy CoreDNS in Kubernetes](https://github.com/coredns/deployment/tree/master/kubernetes). + +[stubDomains](http://blog.kubernetes.io/2017/04/configuring-private-dns-zones-upstream-nameservers-kubernetes.html) +are implemented via the *proxy* plugin. + +## Syntax + +~~~ +kubernetes [ZONES...] +~~~ + +With only the directive specified, the *kubernetes* plugin will default to the zone specified in +the server's block. It will handle all queries in that zone and connect to Kubernetes in-cluster. It +will not provide PTR records for services, or A records for pods. If **ZONES** is used it specifies +all the zones the plugin should be authoritative for. + +``` +kubernetes [ZONES...] { + resyncperiod DURATION + endpoint URL + tls CERT KEY CACERT + namespaces NAMESPACE... + labels EXPRESSION + pods POD-MODE + upstream ADDRESS... + ttl TTL + fallthrough +} +``` +* `resyncperiod` specifies the Kubernetes data API **DURATION** period. +* `endpoint` specifies the **URL** for a remove k8s API endpoint. + If omitted, it will connect to k8s in-cluster using the cluster service account. + Multiple k8s API endpoints could be specified, separated by `,`s, e.g. + `endpoint http://k8s-endpoint1:8080,http://k8s-endpoint2:8080`. CoreDNS + will automatically perform a healthcheck and proxy to the healthy k8s API endpoint. +* `tls` **CERT** **KEY** **CACERT** are the TLS cert, key and the CA cert file names for remote k8s connection. + This option is ignored if connecting in-cluster (i.e. endpoint is not specified). +* `namespaces` **NAMESPACE [NAMESPACE...]**, exposed only the k8s namespaces listed. + If this option is omitted all namespaces are exposed +* `labels` **EXPRESSION** only exposes the records for Kubernetes objects that match this label selector. + The label selector syntax is described in the + [Kubernetes User Guide - Labels](http://kubernetes.io/docs/user-guide/labels/). An example that + only exposes objects labeled as "application=nginx" in the "staging" or "qa" environments, would + use: `labels environment in (staging, qa),application=nginx`. +* `pods` **POD-MODE** sets the mode for handling IP-based pod A records, e.g. + `1-2-3-4.ns.pod.cluster.local. in A 1.2.3.4`. + This option is provided to facilitate use of SSL certs when connecting directly to pods. Valid + values for **POD-MODE**: + + * `disabled`: Default. Do not process pod requests, always returning `NXDOMAIN` + * `insecure`: Always return an A record with IP from request (without checking k8s). This option + is is vulnerable to abuse if used maliciously in conjunction with wildcard SSL certs. This + option is provided for backward compatibility with kube-dns. + * `verified`: Return an A record if there exists a pod in same namespace with matching IP. This + option requires substantially more memory than in insecure mode, since it will maintain a watch + on all pods. + +* `upstream` **ADDRESS [ADDRESS...]** defines the upstream resolvers used for resolving services + that point to external hosts (External Services). **ADDRESS** can be an ip, an ip:port, or a path + to a file structured like resolv.conf. +* `ttl` allows you to set a custom TTL for responses. The default (and allowed minimum) is to use + 5 seconds, the maximum is capped at 3600 seconds. +* `fallthrough` If a query for a record in the cluster zone results in NXDOMAIN, normally that is + what the response will be. However, if you specify this option, the query will instead be passed + on down the plugin chain, which can include another plugin to handle the query. + +## Examples + +Handle all queries in the `cluster.local` zone. Connect to Kubernetes in-cluster. +Also handle all `PTR` requests for `10.0.0.0/16` . Verify the existence of pods when answering pod +requests. Resolve upstream records against `10.102.3.10`. Note we show the entire server block +here: + +~~~ txt +10.0.0.0/16 cluster.local { + kubernetes { + pods verified + upstream 10.102.3.10:53 + } +} +~~~ + +Or you can selectively expose some namespaces: + +~~~ txt +kubernetes cluster.local { + namespaces test staging +} +~~~ + +Connect to Kubernetes with CoreDNS running outside the cluster: + +~~~ txt +kubernetes cluster.local { + endpoint https://k8s-endpoint:8443 + tls cert key cacert +} +~~~ + +Here we use the *proxy* plugin to implement stubDomains that forwards `example.org` and +`example.com` to another nameserver. + +~~~ txt +cluster.local { + kubernetes { + endpoint https://k8s-endpoint:8443 + tls cert key cacert + } +} +example.org { + proxy . 8.8.8.8:53 +} +example.com { + proxy . 8.8.8.8:53 +} +~~~ + +## AutoPath + +The *kubernetes* plugin can be used in conjunction with the *autopath* plugin. Using this +feature enables server-side domain search path completion in kubernetes clusters. Note: `pods` must +be set to `verified` for this to function properly. + + cluster.local { + autopath @kubernetes + kubernetes { + pods verified + } + } + +## Federation + +The *kubernetes* plugin can be used in conjunction with the *federation* plugin. Using this +feature enables serving federated domains from the kubernetes clusters. + + cluster.local { + federation { + fallthrough + prod prod.example.org + staging staging.example.org + + } + kubernetes + } + + +## Wildcards + +Some query labels accept a wildcard value to match any value. If a label is a valid wildcard (\*, +or the word "any"), then that label will match all values. The labels that accept wildcards are: + + * _service_ in an `A` record request: _service_.namespace.svc.zone. + * e.g. `*.ns.svc.myzone.local` + * _namespace_ in an `A` record request: service._namespace_.svc.zone. + * e.g. `nginx.*.svc.myzone.local` + * _port and/or protocol_ in an `SRV` request: __port_.__protocol_.service.namespace.svc.zone. + * e.g. `_http.*.service.ns.svc.` + * multiple wild cards are allowed in a single query. + * e.g. `A` Request `*.*.svc.zone.` or `SRV` request `*.*.*.*.svc.zone.` diff --git a/plugin/kubernetes/apiproxy.go b/plugin/kubernetes/apiproxy.go new file mode 100644 index 000000000..3e185f898 --- /dev/null +++ b/plugin/kubernetes/apiproxy.go @@ -0,0 +1,76 @@ +package kubernetes + +import ( + "fmt" + "io" + "log" + "net" + "net/http" + + "github.com/coredns/coredns/plugin/pkg/healthcheck" +) + +type proxyHandler struct { + healthcheck.HealthCheck +} + +type apiProxy struct { + http.Server + listener net.Listener + handler proxyHandler +} + +func (p *proxyHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + upstream := p.Select() + network := "tcp" + if upstream.Network != "" { + network = upstream.Network + } + address := upstream.Name + d, err := net.Dial(network, address) + if err != nil { + log.Printf("[ERROR] Unable to establish connection to upstream %s://%s: %s", network, address, err) + http.Error(w, fmt.Sprintf("Unable to establish connection to upstream %s://%s: %s", network, address, err), 500) + return + } + hj, ok := w.(http.Hijacker) + if !ok { + log.Printf("[ERROR] Unable to establish connection: no hijacker") + http.Error(w, "Unable to establish connection: no hijacker", 500) + return + } + nc, _, err := hj.Hijack() + if err != nil { + log.Printf("[ERROR] Unable to hijack connection: %s", err) + http.Error(w, fmt.Sprintf("Unable to hijack connection: %s", err), 500) + return + } + defer nc.Close() + defer d.Close() + + err = r.Write(d) + if err != nil { + log.Printf("[ERROR] Unable to copy connection to upstream %s://%s: %s", network, address, err) + http.Error(w, fmt.Sprintf("Unable to copy connection to upstream %s://%s: %s", network, address, err), 500) + return + } + + errChan := make(chan error, 2) + cp := func(dst io.Writer, src io.Reader) { + _, err := io.Copy(dst, src) + errChan <- err + } + go cp(d, nc) + go cp(nc, d) + <-errChan +} + +func (p *apiProxy) Run() { + p.handler.Start() + p.Serve(p.listener) +} + +func (p *apiProxy) Stop() { + p.handler.Stop() + p.listener.Close() +} diff --git a/plugin/kubernetes/autopath.go b/plugin/kubernetes/autopath.go new file mode 100644 index 000000000..f758869f1 --- /dev/null +++ b/plugin/kubernetes/autopath.go @@ -0,0 +1,53 @@ +package kubernetes + +import ( + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/request" + + "k8s.io/client-go/1.5/pkg/api" +) + +// AutoPath implements the AutoPathFunc call from the autopath plugin. +// It returns a per-query search path or nil indicating no searchpathing should happen. +func (k *Kubernetes) AutoPath(state request.Request) []string { + // Check if the query falls in a zone we are actually authoriative for and thus if we want autopath. + zone := plugin.Zones(k.Zones).Matches(state.Name()) + if zone == "" { + return nil + } + + ip := state.IP() + + pod := k.podWithIP(ip) + if pod == nil { + return nil + } + + search := make([]string, 3) + if zone == "." { + search[0] = pod.Namespace + ".svc." + search[1] = "svc." + search[2] = "." + } else { + search[0] = pod.Namespace + ".svc." + zone + search[1] = "svc." + zone + search[2] = zone + } + + search = append(search, k.autoPathSearch...) + search = append(search, "") // sentinal + return search +} + +// podWithIP return the api.Pod for source IP ip. It returns nil if nothing can be found. +func (k *Kubernetes) podWithIP(ip string) (p *api.Pod) { + objList := k.APIConn.PodIndex(ip) + for _, o := range objList { + p, ok := o.(*api.Pod) + if !ok { + return nil + } + return p + } + return nil +} diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go new file mode 100644 index 000000000..b809264e1 --- /dev/null +++ b/plugin/kubernetes/controller.go @@ -0,0 +1,399 @@ +package kubernetes + +import ( + "errors" + "fmt" + "log" + "sync" + "time" + + "k8s.io/client-go/1.5/kubernetes" + "k8s.io/client-go/1.5/pkg/api" + unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned" + "k8s.io/client-go/1.5/pkg/api/v1" + "k8s.io/client-go/1.5/pkg/labels" + "k8s.io/client-go/1.5/pkg/runtime" + "k8s.io/client-go/1.5/pkg/watch" + "k8s.io/client-go/1.5/tools/cache" +) + +var ( + namespace = api.NamespaceAll +) + +// storeToNamespaceLister makes a Store that lists Namespaces. +type storeToNamespaceLister struct { + cache.Store +} + +const podIPIndex = "PodIP" + +// List lists all Namespaces in the store. +func (s *storeToNamespaceLister) List() (ns api.NamespaceList, err error) { + for _, m := range s.Store.List() { + ns.Items = append(ns.Items, *(m.(*api.Namespace))) + } + return ns, nil +} + +type dnsController interface { + ServiceList() []*api.Service + PodIndex(string) []interface{} + EndpointsList() api.EndpointsList + + GetNodeByName(string) (api.Node, error) + + Run() + Stop() error +} + +type dnsControl struct { + client *kubernetes.Clientset + + selector *labels.Selector + + svcController *cache.Controller + podController *cache.Controller + nsController *cache.Controller + epController *cache.Controller + + svcLister cache.StoreToServiceLister + podLister cache.StoreToPodLister + nsLister storeToNamespaceLister + epLister cache.StoreToEndpointsLister + + // stopLock is used to enforce only a single call to Stop is active. + // Needed because we allow stopping through an http endpoint and + // allowing concurrent stoppers leads to stack traces. + stopLock sync.Mutex + shutdown bool + stopCh chan struct{} +} + +type dnsControlOpts struct { + initPodCache bool + resyncPeriod time.Duration + // Label handling. + labelSelector *unversionedapi.LabelSelector + selector *labels.Selector +} + +// newDNSController creates a controller for CoreDNS. +func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dnsControl { + dns := dnsControl{ + client: kubeClient, + selector: opts.selector, + stopCh: make(chan struct{}), + } + + dns.svcLister.Indexer, dns.svcController = cache.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: serviceListFunc(dns.client, namespace, dns.selector), + WatchFunc: serviceWatchFunc(dns.client, namespace, dns.selector), + }, + &api.Service{}, + opts.resyncPeriod, + cache.ResourceEventHandlerFuncs{}, + cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + + if opts.initPodCache { + dns.podLister.Indexer, dns.podController = cache.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: podListFunc(dns.client, namespace, dns.selector), + WatchFunc: podWatchFunc(dns.client, namespace, dns.selector), + }, + &api.Pod{}, // TODO replace with a lighter-weight custom struct + opts.resyncPeriod, + cache.ResourceEventHandlerFuncs{}, + cache.Indexers{podIPIndex: podIPIndexFunc}) + } + + dns.nsLister.Store, dns.nsController = cache.NewInformer( + &cache.ListWatch{ + ListFunc: namespaceListFunc(dns.client, dns.selector), + WatchFunc: namespaceWatchFunc(dns.client, dns.selector), + }, + &api.Namespace{}, + opts.resyncPeriod, + cache.ResourceEventHandlerFuncs{}) + + dns.epLister.Store, dns.epController = cache.NewInformer( + &cache.ListWatch{ + ListFunc: endpointsListFunc(dns.client, namespace, dns.selector), + WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector), + }, + &api.Endpoints{}, + opts.resyncPeriod, + cache.ResourceEventHandlerFuncs{}) + + return &dns +} + +func podIPIndexFunc(obj interface{}) ([]string, error) { + p, ok := obj.(*api.Pod) + if !ok { + return nil, errors.New("obj was not an *api.Pod") + } + return []string{p.Status.PodIP}, nil +} + +func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { + return func(opts api.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = *s + } + listV1, err := c.Core().Services(ns).List(opts) + + if err != nil { + return nil, err + } + var listAPI api.ServiceList + err = v1.Convert_v1_ServiceList_To_api_ServiceList(listV1, &listAPI, nil) + if err != nil { + return nil, err + } + return &listAPI, err + } +} + +func podListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { + return func(opts api.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = *s + } + listV1, err := c.Core().Pods(ns).List(opts) + + if err != nil { + return nil, err + } + var listAPI api.PodList + err = v1.Convert_v1_PodList_To_api_PodList(listV1, &listAPI, nil) + if err != nil { + return nil, err + } + + return &listAPI, err + } +} + +func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) { + if in.Type == watch.Error { + return in, true + } + + switch v1Obj := in.Object.(type) { + case *v1.Service: + var apiObj api.Service + err := v1.Convert_v1_Service_To_api_Service(v1Obj, &apiObj, nil) + if err != nil { + log.Printf("[ERROR] Could not convert v1.Service: %s", err) + return in, true + } + return watch.Event{Type: in.Type, Object: &apiObj}, true + case *v1.Pod: + var apiObj api.Pod + err := v1.Convert_v1_Pod_To_api_Pod(v1Obj, &apiObj, nil) + if err != nil { + log.Printf("[ERROR] Could not convert v1.Pod: %s", err) + return in, true + } + return watch.Event{Type: in.Type, Object: &apiObj}, true + case *v1.Namespace: + var apiObj api.Namespace + err := v1.Convert_v1_Namespace_To_api_Namespace(v1Obj, &apiObj, nil) + if err != nil { + log.Printf("[ERROR] Could not convert v1.Namespace: %s", err) + return in, true + } + return watch.Event{Type: in.Type, Object: &apiObj}, true + case *v1.Endpoints: + var apiObj api.Endpoints + err := v1.Convert_v1_Endpoints_To_api_Endpoints(v1Obj, &apiObj, nil) + if err != nil { + log.Printf("[ERROR] Could not convert v1.Endpoint: %s", err) + return in, true + } + return watch.Event{Type: in.Type, Object: &apiObj}, true + } + + log.Printf("[WARN] Unhandled v1 type in event: %v", in) + return in, true +} + +func serviceWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { + return func(options api.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = *s + } + w, err := c.Core().Services(ns).Watch(options) + if err != nil { + return nil, err + } + return watch.Filter(w, v1ToAPIFilter), nil + } +} + +func podWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { + return func(options api.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = *s + } + w, err := c.Core().Pods(ns).Watch(options) + + if err != nil { + return nil, err + } + return watch.Filter(w, v1ToAPIFilter), nil + } +} + +func namespaceListFunc(c *kubernetes.Clientset, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { + return func(opts api.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = *s + } + listV1, err := c.Core().Namespaces().List(opts) + if err != nil { + return nil, err + } + var listAPI api.NamespaceList + err = v1.Convert_v1_NamespaceList_To_api_NamespaceList(listV1, &listAPI, nil) + if err != nil { + return nil, err + } + return &listAPI, err + } +} + +func namespaceWatchFunc(c *kubernetes.Clientset, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { + return func(options api.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = *s + } + w, err := c.Core().Namespaces().Watch(options) + if err != nil { + return nil, err + } + return watch.Filter(w, v1ToAPIFilter), nil + } +} + +func endpointsListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { + return func(opts api.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = *s + } + listV1, err := c.Core().Endpoints(ns).List(opts) + + if err != nil { + return nil, err + } + var listAPI api.EndpointsList + err = v1.Convert_v1_EndpointsList_To_api_EndpointsList(listV1, &listAPI, nil) + if err != nil { + return nil, err + } + return &listAPI, err + } +} + +func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { + return func(options api.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = *s + } + w, err := c.Core().Endpoints(ns).Watch(options) + if err != nil { + return nil, err + } + return watch.Filter(w, v1ToAPIFilter), nil + } +} + +func (dns *dnsControl) controllersInSync() bool { + hs := dns.svcController.HasSynced() && + dns.nsController.HasSynced() && + dns.epController.HasSynced() + + if dns.podController != nil { + hs = hs && dns.podController.HasSynced() + } + + return hs +} + +// Stop stops the controller. +func (dns *dnsControl) Stop() error { + dns.stopLock.Lock() + defer dns.stopLock.Unlock() + + // Only try draining the workqueue if we haven't already. + if !dns.shutdown { + close(dns.stopCh) + dns.shutdown = true + + return nil + } + + return fmt.Errorf("shutdown already in progress") +} + +// Run starts the controller. +func (dns *dnsControl) Run() { + go dns.svcController.Run(dns.stopCh) + go dns.nsController.Run(dns.stopCh) + go dns.epController.Run(dns.stopCh) + if dns.podController != nil { + go dns.podController.Run(dns.stopCh) + } + <-dns.stopCh +} + +func (dns *dnsControl) NamespaceList() *api.NamespaceList { + nsList, err := dns.nsLister.List() + if err != nil { + return &api.NamespaceList{} + } + + return &nsList +} + +func (dns *dnsControl) ServiceList() []*api.Service { + svcs, err := dns.svcLister.List(labels.Everything()) + if err != nil { + return []*api.Service{} + } + + return svcs +} + +func (dns *dnsControl) PodIndex(ip string) []interface{} { + pods, err := dns.podLister.Indexer.ByIndex(podIPIndex, ip) + if err != nil { + return nil + } + + return pods +} + +func (dns *dnsControl) EndpointsList() api.EndpointsList { + epl, err := dns.epLister.List() + if err != nil { + return api.EndpointsList{} + } + + return epl +} + +func (dns *dnsControl) GetNodeByName(name string) (api.Node, error) { + v1node, err := dns.client.Core().Nodes().Get(name) + if err != nil { + return api.Node{}, err + } + var apinode api.Node + err = v1.Convert_v1_Node_To_api_Node(v1node, &apinode, nil) + if err != nil { + return api.Node{}, err + } + return apinode, nil +} diff --git a/plugin/kubernetes/federation.go b/plugin/kubernetes/federation.go new file mode 100644 index 000000000..df6ae948b --- /dev/null +++ b/plugin/kubernetes/federation.go @@ -0,0 +1,45 @@ +package kubernetes + +import ( + "github.com/coredns/coredns/plugin/etcd/msg" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/request" +) + +// The federation node.Labels keys used. +const ( + // TODO: Do not hardcode these labels. Pull them out of the API instead. + // + // We can get them via .... + // import metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + // metav1.LabelZoneFailureDomain + // metav1.LabelZoneRegion + // + // But importing above breaks coredns with flag collision of 'log_dir' + + LabelZone = "failure-domain.beta.kubernetes.io/zone" + LabelRegion = "failure-domain.beta.kubernetes.io/region" +) + +// Federations is used from the federations plugin to return the service that should be +// returned as a CNAME for federation(s) to work. +func (k *Kubernetes) Federations(state request.Request, fname, fzone string) (msg.Service, error) { + nodeName := k.localNodeName() + node, err := k.APIConn.GetNodeByName(nodeName) + if err != nil { + return msg.Service{}, err + } + r, err := parseRequest(state) + if err != nil { + return msg.Service{}, err + } + + lz := node.Labels[LabelZone] + lr := node.Labels[LabelRegion] + + if r.endpoint == "" { + return msg.Service{Host: dnsutil.Join([]string{r.service, r.namespace, fname, r.podOrSvc, lz, lr, fzone})}, nil + } + + return msg.Service{Host: dnsutil.Join([]string{r.endpoint, r.service, r.namespace, fname, r.podOrSvc, lz, lr, fzone})}, nil +} diff --git a/plugin/kubernetes/handler.go b/plugin/kubernetes/handler.go new file mode 100644 index 000000000..9dc435111 --- /dev/null +++ b/plugin/kubernetes/handler.go @@ -0,0 +1,86 @@ +package kubernetes + +import ( + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" + "golang.org/x/net/context" +) + +// ServeDNS implements the plugin.Handler interface. +func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { + state := request.Request{W: w, Req: r} + + m := new(dns.Msg) + m.SetReply(r) + m.Authoritative, m.RecursionAvailable, m.Compress = true, true, true + + zone := plugin.Zones(k.Zones).Matches(state.Name()) + if zone == "" { + return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r) + } + + state.Zone = zone + + var ( + records []dns.RR + extra []dns.RR + err error + ) + + switch state.Type() { + case "A": + records, err = plugin.A(&k, zone, state, nil, plugin.Options{}) + case "AAAA": + records, err = plugin.AAAA(&k, zone, state, nil, plugin.Options{}) + case "TXT": + records, err = plugin.TXT(&k, zone, state, plugin.Options{}) + case "CNAME": + records, err = plugin.CNAME(&k, zone, state, plugin.Options{}) + case "PTR": + records, err = plugin.PTR(&k, zone, state, plugin.Options{}) + case "MX": + records, extra, err = plugin.MX(&k, zone, state, plugin.Options{}) + case "SRV": + records, extra, err = plugin.SRV(&k, zone, state, plugin.Options{}) + case "SOA": + records, err = plugin.SOA(&k, zone, state, plugin.Options{}) + case "NS": + if state.Name() == zone { + records, extra, err = plugin.NS(&k, zone, state, plugin.Options{}) + break + } + fallthrough + default: + // Do a fake A lookup, so we can distinguish between NODATA and NXDOMAIN + _, err = plugin.A(&k, zone, state, nil, plugin.Options{}) + } + + if k.IsNameError(err) { + if k.Fallthrough { + return plugin.NextOrFailure(k.Name(), k.Next, ctx, w, r) + } + return plugin.BackendError(&k, zone, dns.RcodeNameError, state, nil /* err */, plugin.Options{}) + } + if err != nil { + return dns.RcodeServerFailure, err + } + + if len(records) == 0 { + return plugin.BackendError(&k, zone, dns.RcodeSuccess, state, nil, plugin.Options{}) + } + + m.Answer = append(m.Answer, records...) + m.Extra = append(m.Extra, extra...) + + m = dnsutil.Dedup(m) + state.SizeAndDo(m) + m, _ = state.Scrub(m) + w.WriteMsg(m) + return dns.RcodeSuccess, nil +} + +// Name implements the Handler interface. +func (k Kubernetes) Name() string { return "kubernetes" } diff --git a/plugin/kubernetes/handler_pod_disabled_test.go b/plugin/kubernetes/handler_pod_disabled_test.go new file mode 100644 index 000000000..4c6e15710 --- /dev/null +++ b/plugin/kubernetes/handler_pod_disabled_test.go @@ -0,0 +1,61 @@ +package kubernetes + +import ( + "testing" + + "github.com/coredns/coredns/plugin/pkg/dnsrecorder" + "github.com/coredns/coredns/plugin/test" + + "github.com/miekg/dns" + "golang.org/x/net/context" +) + +var podModeDisabledCases = []test.Case{ + { + Qname: "10-240-0-1.podns.pod.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Error: errPodsDisabled, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, + { + Qname: "172-0-0-2.podns.pod.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Error: errPodsDisabled, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, +} + +func TestServeDNSModeDisabled(t *testing.T) { + + k := New([]string{"cluster.local."}) + k.APIConn = &APIConnServeTest{} + k.Next = test.NextHandler(dns.RcodeSuccess, nil) + k.podMode = podModeDisabled + ctx := context.TODO() + + for i, tc := range podModeDisabledCases { + r := tc.Msg() + + w := dnsrecorder.New(&test.ResponseWriter{}) + + _, err := k.ServeDNS(ctx, w, r) + if err != tc.Error { + t.Errorf("Test %d expected no error, got %v", i, err) + return + } + if tc.Error != nil { + continue + } + + resp := w.Msg + if resp == nil { + t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name) + } + + test.SortAndCheck(t, resp, tc) + } +} diff --git a/plugin/kubernetes/handler_pod_insecure_test.go b/plugin/kubernetes/handler_pod_insecure_test.go new file mode 100644 index 000000000..b2df8a504 --- /dev/null +++ b/plugin/kubernetes/handler_pod_insecure_test.go @@ -0,0 +1,59 @@ +package kubernetes + +import ( + "testing" + + "github.com/coredns/coredns/plugin/pkg/dnsrecorder" + "github.com/coredns/coredns/plugin/test" + + "github.com/miekg/dns" + "golang.org/x/net/context" +) + +var podModeInsecureCases = []test.Case{ + { + Qname: "10-240-0-1.podns.pod.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("10-240-0-1.podns.pod.cluster.local. 0 IN A 10.240.0.1"), + }, + }, + { + Qname: "172-0-0-2.podns.pod.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("172-0-0-2.podns.pod.cluster.local. 0 IN A 172.0.0.2"), + }, + }, +} + +func TestServeDNSModeInsecure(t *testing.T) { + + k := New([]string{"cluster.local."}) + k.APIConn = &APIConnServeTest{} + k.Next = test.NextHandler(dns.RcodeSuccess, nil) + ctx := context.TODO() + k.podMode = podModeInsecure + + for i, tc := range podModeInsecureCases { + r := tc.Msg() + + w := dnsrecorder.New(&test.ResponseWriter{}) + + _, err := k.ServeDNS(ctx, w, r) + if err != tc.Error { + t.Errorf("Test %d expected no error, got %v", i, err) + return + } + if tc.Error != nil { + continue + } + + resp := w.Msg + if resp == nil { + t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name) + } + + test.SortAndCheck(t, resp, tc) + } +} diff --git a/plugin/kubernetes/handler_pod_verified_test.go b/plugin/kubernetes/handler_pod_verified_test.go new file mode 100644 index 000000000..ea585cc6a --- /dev/null +++ b/plugin/kubernetes/handler_pod_verified_test.go @@ -0,0 +1,59 @@ +package kubernetes + +import ( + "testing" + + "github.com/coredns/coredns/plugin/pkg/dnsrecorder" + "github.com/coredns/coredns/plugin/test" + + "github.com/miekg/dns" + "golang.org/x/net/context" +) + +var podModeVerifiedCases = []test.Case{ + { + Qname: "10-240-0-1.podns.pod.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("10-240-0-1.podns.pod.cluster.local. 0 IN A 10.240.0.1"), + }, + }, + { + Qname: "172-0-0-2.podns.pod.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, +} + +func TestServeDNSModeVerified(t *testing.T) { + + k := New([]string{"cluster.local."}) + k.APIConn = &APIConnServeTest{} + k.Next = test.NextHandler(dns.RcodeSuccess, nil) + ctx := context.TODO() + k.podMode = podModeVerified + + for i, tc := range podModeVerifiedCases { + r := tc.Msg() + + w := dnsrecorder.New(&test.ResponseWriter{}) + + _, err := k.ServeDNS(ctx, w, r) + if err != tc.Error { + t.Errorf("Test %d expected no error, got %v", i, err) + return + } + if tc.Error != nil { + continue + } + + resp := w.Msg + if resp == nil { + t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name) + } + + test.SortAndCheck(t, resp, tc) + } +} diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go new file mode 100644 index 000000000..5413f5b4c --- /dev/null +++ b/plugin/kubernetes/handler_test.go @@ -0,0 +1,347 @@ +package kubernetes + +import ( + "testing" + + "github.com/coredns/coredns/plugin/pkg/dnsrecorder" + "github.com/coredns/coredns/plugin/test" + + "github.com/miekg/dns" + "golang.org/x/net/context" + "k8s.io/client-go/1.5/pkg/api" +) + +var dnsTestCases = []test.Case{ + // A Service + { + Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("svc1.testns.svc.cluster.local. 5 IN A 10.0.0.1"), + }, + }, + // A Service (wildcard) + { + Qname: "svc1.*.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("svc1.*.svc.cluster.local. 5 IN A 10.0.0.1"), + }, + }, + { + Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{test.SRV("svc1.testns.svc.cluster.local. 303 IN SRV 0 100 80 svc1.testns.svc.cluster.local.")}, + Extra: []dns.RR{test.A("svc1.testns.svc.cluster.local. 303 IN A 10.0.0.1")}, + }, + // SRV Service (wildcard) + { + Qname: "svc1.*.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{test.SRV("svc1.*.svc.cluster.local. 303 IN SRV 0 100 80 svc1.testns.svc.cluster.local.")}, + Extra: []dns.RR{test.A("svc1.testns.svc.cluster.local. 303 IN A 10.0.0.1")}, + }, + // SRV Service (wildcards) + { + Qname: "*.any.svc1.*.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{test.SRV("*.any.svc1.*.svc.cluster.local. 303 IN SRV 0 100 80 svc1.testns.svc.cluster.local.")}, + Extra: []dns.RR{test.A("svc1.testns.svc.cluster.local. 303 IN A 10.0.0.1")}, + }, + // A Service (wildcards) + { + Qname: "*.any.svc1.*.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("*.any.svc1.*.svc.cluster.local. 303 IN A 10.0.0.1"), + }, + }, + // SRV Service Not udp/tcp + { + Qname: "*._not-udp-or-tcp.svc1.testns.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeNameError, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, + // SRV Service + { + Qname: "_http._tcp.svc1.testns.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.SRV("_http._tcp.svc1.testns.svc.cluster.local. 303 IN SRV 0 100 80 svc1.testns.svc.cluster.local."), + }, + Extra: []dns.RR{ + test.A("svc1.testns.svc.cluster.local. 303 IN A 10.0.0.1"), + }, + }, + // A Service (Headless) + { + Qname: "hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.A("hdls1.testns.svc.cluster.local. 303 IN A 172.0.0.2"), + test.A("hdls1.testns.svc.cluster.local. 303 IN A 172.0.0.3"), + }, + }, + // SRV Service (Headless) + { + Qname: "_http._tcp.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 303 IN SRV 0 50 80 172-0-0-2.hdls1.testns.svc.cluster.local."), + test.SRV("_http._tcp.hdls1.testns.svc.cluster.local. 303 IN SRV 0 50 80 172-0-0-3.hdls1.testns.svc.cluster.local."), + }, + Extra: []dns.RR{ + test.A("172-0-0-2.hdls1.testns.svc.cluster.local. 303 IN A 172.0.0.2"), + test.A("172-0-0-3.hdls1.testns.svc.cluster.local. 303 IN A 172.0.0.3"), + }, + }, + // CNAME External + { + Qname: "external.testns.svc.cluster.local.", Qtype: dns.TypeCNAME, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.CNAME("external.testns.svc.cluster.local. 303 IN CNAME ext.interwebs.test."), + }, + }, + // AAAA Service (existing service) + { + Qname: "svc1.testns.svc.cluster.local.", Qtype: dns.TypeAAAA, + Rcode: dns.RcodeSuccess, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, + // AAAA Service (non-existing service) + { + Qname: "svc0.testns.svc.cluster.local.", Qtype: dns.TypeAAAA, + Rcode: dns.RcodeNameError, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, + // A Service (non-existing service) + { + Qname: "svc0.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, + // TXT Schema + { + Qname: "dns-version.cluster.local.", Qtype: dns.TypeTXT, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.TXT("dns-version.cluster.local 28800 IN TXT 1.0.1"), + }, + }, + // A Service (Headless) does not exist + { + Qname: "bogusendpoint.hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, + // A Service does not exist + { + Qname: "bogusendpoint.svc0.testns.svc.cluster.local.", Qtype: dns.TypeA, + Rcode: dns.RcodeNameError, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, +} + +func TestServeDNS(t *testing.T) { + + k := New([]string{"cluster.local."}) + k.APIConn = &APIConnServeTest{} + k.Next = test.NextHandler(dns.RcodeSuccess, nil) + ctx := context.TODO() + + for i, tc := range dnsTestCases { + r := tc.Msg() + + w := dnsrecorder.New(&test.ResponseWriter{}) + + _, err := k.ServeDNS(ctx, w, r) + if err != tc.Error { + t.Errorf("Test %d expected no error, got %v", i, err) + return + } + if tc.Error != nil { + continue + } + + resp := w.Msg + if resp == nil { + t.Fatalf("Test %d, got nil message and no error for %q", i, r.Question[0].Name) + } + + // Before sorting, make sure that CNAMES do not appear after their target records + test.CNAMEOrder(t, resp) + + test.SortAndCheck(t, resp, tc) + } +} + +type APIConnServeTest struct{} + +func (APIConnServeTest) Run() { return } +func (APIConnServeTest) Stop() error { return nil } + +func (APIConnServeTest) PodIndex(string) []interface{} { + a := make([]interface{}, 1) + a[0] = &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Namespace: "podns", + }, + Status: api.PodStatus{ + PodIP: "10.240.0.1", // Remote IP set in test.ResponseWriter + }, + } + return a +} + +func (APIConnServeTest) ServiceList() []*api.Service { + svcs := []*api.Service{ + { + ObjectMeta: api.ObjectMeta{ + Name: "svc1", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []api.ServicePort{{ + Name: "http", + Protocol: "tcp", + Port: 80, + }}, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "hdls1", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ + ClusterIP: api.ClusterIPNone, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "external", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ + ExternalName: "ext.interwebs.test", + Ports: []api.ServicePort{{ + Name: "http", + Protocol: "tcp", + Port: 80, + }}, + }, + }, + } + return svcs + +} + +func (APIConnServeTest) EndpointsList() api.EndpointsList { + n := "test.node.foo.bar" + + return api.EndpointsList{ + Items: []api.Endpoints{ + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "172.0.0.1", + Hostname: "ep1a", + }, + }, + Ports: []api.EndpointPort{ + { + Port: 80, + Protocol: "tcp", + Name: "http", + }, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: "svc1", + Namespace: "testns", + }, + }, + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "172.0.0.2", + }, + }, + Ports: []api.EndpointPort{ + { + Port: 80, + Protocol: "tcp", + Name: "http", + }, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: "hdls1", + Namespace: "testns", + }, + }, + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "172.0.0.3", + }, + }, + Ports: []api.EndpointPort{ + { + Port: 80, + Protocol: "tcp", + Name: "http", + }, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: "hdls1", + Namespace: "testns", + }, + }, + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "10.9.8.7", + NodeName: &n, + }, + }, + }, + }, + }, + }, + } +} + +func (APIConnServeTest) GetNodeByName(name string) (api.Node, error) { + return api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "test.node.foo.bar", + }, + }, nil +} diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go new file mode 100644 index 000000000..90fcd6182 --- /dev/null +++ b/plugin/kubernetes/kubernetes.go @@ -0,0 +1,457 @@ +// Package kubernetes provides the kubernetes backend. +package kubernetes + +import ( + "errors" + "fmt" + "net" + "strings" + "sync/atomic" + "time" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/etcd/msg" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/plugin/pkg/healthcheck" + "github.com/coredns/coredns/plugin/proxy" + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" + "k8s.io/client-go/1.5/kubernetes" + "k8s.io/client-go/1.5/pkg/api" + unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned" + "k8s.io/client-go/1.5/pkg/labels" + "k8s.io/client-go/1.5/rest" + "k8s.io/client-go/1.5/tools/clientcmd" + clientcmdapi "k8s.io/client-go/1.5/tools/clientcmd/api" +) + +// Kubernetes implements a plugin that connects to a Kubernetes cluster. +type Kubernetes struct { + Next plugin.Handler + Zones []string + Proxy proxy.Proxy // Proxy for looking up names during the resolution process + APIServerList []string + APIProxy *apiProxy + APICertAuth string + APIClientCert string + APIClientKey string + APIConn dnsController + Namespaces map[string]bool + podMode string + Fallthrough bool + ttl uint32 + + primaryZoneIndex int + interfaceAddrsFunc func() net.IP + autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath. +} + +// New returns a intialized Kubernetes. It default interfaceAddrFunc to return 127.0.0.1. All other +// values default to their zero value, primaryZoneIndex will thus point to the first zone. +func New(zones []string) *Kubernetes { + k := new(Kubernetes) + k.Zones = zones + k.Namespaces = make(map[string]bool) + k.interfaceAddrsFunc = func() net.IP { return net.ParseIP("127.0.0.1") } + k.podMode = podModeDisabled + k.Proxy = proxy.Proxy{} + k.ttl = defaultTTL + + return k +} + +const ( + // podModeDisabled is the default value where pod requests are ignored + podModeDisabled = "disabled" + // podModeVerified is where Pod requests are answered only if they exist + podModeVerified = "verified" + // podModeInsecure is where pod requests are answered without verfying they exist + podModeInsecure = "insecure" + // DNSSchemaVersion is the schema version: https://github.com/kubernetes/dns/blob/master/docs/specification.md + DNSSchemaVersion = "1.0.1" +) + +var ( + errNoItems = errors.New("no items found") + errNsNotExposed = errors.New("namespace is not exposed") + errInvalidRequest = errors.New("invalid query name") + errAPIBadPodType = errors.New("expected type *api.Pod") + errPodsDisabled = errors.New("pod records disabled") +) + +// Services implements the ServiceBackend interface. +func (k *Kubernetes) Services(state request.Request, exact bool, opt plugin.Options) (svcs []msg.Service, err error) { + + // We're looking again at types, which we've already done in ServeDNS, but there are some types k8s just can't answer. + switch state.QType() { + + case dns.TypeTXT: + // 1 label + zone, label must be "dns-version". + t, _ := dnsutil.TrimZone(state.Name(), state.Zone) + + segs := dns.SplitDomainName(t) + if len(segs) != 1 { + return nil, fmt.Errorf("kubernetes: TXT query can only be for dns-version: %s", state.QName()) + } + if segs[0] != "dns-version" { + return nil, nil + } + svc := msg.Service{Text: DNSSchemaVersion, TTL: 28800, Key: msg.Path(state.QName(), "coredns")} + return []msg.Service{svc}, nil + + case dns.TypeNS: + // We can only get here if the qname equal the zone, see ServeDNS in handler.go. + ns := k.nsAddr() + svc := msg.Service{Host: ns.A.String(), Key: msg.Path(state.QName(), "coredns")} + return []msg.Service{svc}, nil + } + + if state.QType() == dns.TypeA && isDefaultNS(state.Name(), state.Zone) { + // If this is an A request for "ns.dns", respond with a "fake" record for coredns. + // SOA records always use this hardcoded name + ns := k.nsAddr() + svc := msg.Service{Host: ns.A.String(), Key: msg.Path(state.QName(), "coredns")} + return []msg.Service{svc}, nil + } + + s, e := k.Records(state, false) + + // SRV for external services is not yet implemented, so remove those records. + + if state.QType() != dns.TypeSRV { + return s, e + } + + internal := []msg.Service{} + for _, svc := range s { + if t, _ := svc.HostType(); t != dns.TypeCNAME { + internal = append(internal, svc) + } + } + + return internal, e +} + +// primaryZone will return the first non-reverse zone being handled by this plugin +func (k *Kubernetes) primaryZone() string { return k.Zones[k.primaryZoneIndex] } + +// Lookup implements the ServiceBackend interface. +func (k *Kubernetes) Lookup(state request.Request, name string, typ uint16) (*dns.Msg, error) { + return k.Proxy.Lookup(state, name, typ) +} + +// IsNameError implements the ServiceBackend interface. +func (k *Kubernetes) IsNameError(err error) bool { + return err == errNoItems || err == errNsNotExposed || err == errInvalidRequest +} + +func (k *Kubernetes) getClientConfig() (*rest.Config, error) { + loadingRules := &clientcmd.ClientConfigLoadingRules{} + overrides := &clientcmd.ConfigOverrides{} + clusterinfo := clientcmdapi.Cluster{} + authinfo := clientcmdapi.AuthInfo{} + + if len(k.APIServerList) == 0 { + cc, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + return cc, err + } + + endpoint := k.APIServerList[0] + if len(k.APIServerList) > 1 { + // Use a random port for api proxy, will get the value later through listener.Addr() + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes api proxy: %v", err) + } + k.APIProxy = &apiProxy{ + listener: listener, + handler: proxyHandler{ + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 3 * time.Second, + MaxFails: 1, + Future: 10 * time.Second, + Path: "/", + Interval: 5 * time.Second, + }, + }, + } + k.APIProxy.handler.Hosts = make([]*healthcheck.UpstreamHost, len(k.APIServerList)) + for i, entry := range k.APIServerList { + + uh := &healthcheck.UpstreamHost{ + Name: strings.TrimPrefix(entry, "http://"), + + CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { + + down := false + + uh.CheckMu.Lock() + until := uh.OkUntil + uh.CheckMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true + } + + fails := atomic.LoadInt32(&uh.Fails) + if fails >= upstream.MaxFails && upstream.MaxFails != 0 { + down = true + } + return down + } + }(&k.APIProxy.handler), + } + + k.APIProxy.handler.Hosts[i] = uh + } + k.APIProxy.Handler = &k.APIProxy.handler + + // Find the random port used for api proxy + endpoint = fmt.Sprintf("http://%s", listener.Addr()) + } + clusterinfo.Server = endpoint + + if len(k.APICertAuth) > 0 { + clusterinfo.CertificateAuthority = k.APICertAuth + } + if len(k.APIClientCert) > 0 { + authinfo.ClientCertificate = k.APIClientCert + } + if len(k.APIClientKey) > 0 { + authinfo.ClientKey = k.APIClientKey + } + + overrides.ClusterInfo = clusterinfo + overrides.AuthInfo = authinfo + clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides) + + return clientConfig.ClientConfig() +} + +// initKubeCache initializes a new Kubernetes cache. +func (k *Kubernetes) initKubeCache(opts dnsControlOpts) (err error) { + + config, err := k.getClientConfig() + if err != nil { + return err + } + + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("failed to create kubernetes notification controller: %q", err) + } + + if opts.labelSelector != nil { + var selector labels.Selector + selector, err = unversionedapi.LabelSelectorAsSelector(opts.labelSelector) + if err != nil { + return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", opts.labelSelector, err) + } + opts.selector = &selector + } + + opts.initPodCache = k.podMode == podModeVerified + + k.APIConn = newdnsController(kubeClient, opts) + + return err +} + +// Records looks up services in kubernetes. +func (k *Kubernetes) Records(state request.Request, exact bool) ([]msg.Service, error) { + r, e := parseRequest(state) + if e != nil { + return nil, e + } + + if !wildcard(r.namespace) && !k.namespaceExposed(r.namespace) { + return nil, errNsNotExposed + } + + if r.podOrSvc == Pod { + pods, err := k.findPods(r, state.Zone) + return pods, err + } + + services, err := k.findServices(r, state.Zone) + return services, err +} + +func endpointHostname(addr api.EndpointAddress) string { + if addr.Hostname != "" { + return strings.ToLower(addr.Hostname) + } + if strings.Contains(addr.IP, ".") { + return strings.Replace(addr.IP, ".", "-", -1) + } + if strings.Contains(addr.IP, ":") { + return strings.ToLower(strings.Replace(addr.IP, ":", "-", -1)) + } + return "" +} + +func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, err error) { + if k.podMode == podModeDisabled { + return nil, errPodsDisabled + } + + namespace := r.namespace + podname := r.service + zonePath := msg.Path(zone, "coredns") + ip := "" + err = errNoItems + + if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") { + ip = strings.Replace(podname, "-", ".", -1) + } else { + ip = strings.Replace(podname, "-", ":", -1) + } + + if k.podMode == podModeInsecure { + return []msg.Service{{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip}}, nil + } + + // PodModeVerified + objList := k.APIConn.PodIndex(ip) + + for _, o := range objList { + p, ok := o.(*api.Pod) + if !ok { + return nil, errAPIBadPodType + } + // If namespace has a wildcard, filter results against Corefile namespace list. + if wildcard(namespace) && !k.namespaceExposed(p.Namespace) { + continue + } + // check for matching ip and namespace + if ip == p.Status.PodIP && match(namespace, p.Namespace) { + s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip} + pods = append(pods, s) + + err = nil + } + } + return pods, err +} + +// findServices returns the services matching r from the cache. +func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) { + serviceList := k.APIConn.ServiceList() + zonePath := msg.Path(zone, "coredns") + err = errNoItems // Set to errNoItems to signal really nothing found, gets reset when name is matched. + + for _, svc := range serviceList { + if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) { + continue + } + + // If namespace has a wildcard, filter results against Corefile namespace list. + // (Namespaces without a wildcard were filtered before the call to this function.) + if wildcard(r.namespace) && !k.namespaceExposed(svc.Namespace) { + continue + } + + // Endpoint query or headless service + if svc.Spec.ClusterIP == api.ClusterIPNone || r.endpoint != "" { + endpointsList := k.APIConn.EndpointsList() + for _, ep := range endpointsList.Items { + if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace { + continue + } + + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + + // See comments in parse.go parseRequest about the endpoint handling. + + if r.endpoint != "" { + if !match(r.endpoint, endpointHostname(addr)) { + continue + } + } + + for _, p := range eps.Ports { + if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) { + continue + } + s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, endpointHostname(addr)}, "/") + + err = nil + + services = append(services, s) + } + } + } + } + continue + } + + // External service + if svc.Spec.ExternalName != "" { + s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.Spec.ExternalName, TTL: k.ttl} + if t, _ := s.HostType(); t == dns.TypeCNAME { + s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/") + services = append(services, s) + + err = nil + + continue + } + } + + // ClusterIP service + for _, p := range svc.Spec.Ports { + if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) { + continue + } + + err = nil + + s := msg.Service{Host: svc.Spec.ClusterIP, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/") + + services = append(services, s) + } + } + return services, err +} + +// match checks if a and b are equal taking wildcards into account. +func match(a, b string) bool { + if wildcard(a) { + return true + } + if wildcard(b) { + return true + } + return strings.EqualFold(a, b) +} + +// wildcard checks whether s contains a wildcard value defined as "*" or "any". +func wildcard(s string) bool { + return s == "*" || s == "any" +} + +// namespaceExposed returns true when the namespace is exposed. +func (k *Kubernetes) namespaceExposed(namespace string) bool { + _, ok := k.Namespaces[namespace] + if len(k.Namespaces) > 0 && !ok { + return false + } + return true +} + +const ( + // Svc is the DNS schema for kubernetes services + Svc = "svc" + // Pod is the DNS schema for kubernetes pods + Pod = "pod" + // defaultTTL to apply to all answers. + defaultTTL = 5 +) diff --git a/plugin/kubernetes/kubernetes_apex_test.go b/plugin/kubernetes/kubernetes_apex_test.go new file mode 100644 index 000000000..41b70b883 --- /dev/null +++ b/plugin/kubernetes/kubernetes_apex_test.go @@ -0,0 +1,68 @@ +package kubernetes + +import ( + "testing" + + "github.com/coredns/coredns/plugin/pkg/dnsrecorder" + "github.com/coredns/coredns/plugin/test" + + "github.com/miekg/dns" + "golang.org/x/net/context" +) + +var kubeApexCases = [](test.Case){ + { + Qname: "cluster.local.", Qtype: dns.TypeSOA, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.SOA("cluster.local. 303 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, + { + Qname: "cluster.local.", Qtype: dns.TypeHINFO, + Rcode: dns.RcodeSuccess, + Ns: []dns.RR{ + test.SOA("cluster.local. 303 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1499347823 7200 1800 86400 60"), + }, + }, + { + Qname: "cluster.local.", Qtype: dns.TypeNS, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.NS("cluster.local. 303 IN NS ns.dns.cluster.local."), + }, + Extra: []dns.RR{ + test.A("ns.dns.cluster.local. 303 IN A 127.0.0.1"), + }, + }, +} + +func TestServeDNSApex(t *testing.T) { + + k := New([]string{"cluster.local."}) + k.APIConn = &APIConnServeTest{} + k.Next = test.NextHandler(dns.RcodeSuccess, nil) + ctx := context.TODO() + + for i, tc := range kubeApexCases { + r := tc.Msg() + + w := dnsrecorder.New(&test.ResponseWriter{}) + + _, err := k.ServeDNS(ctx, w, r) + if err != tc.Error { + t.Errorf("Test %d, expected no error, got %v\n", i, err) + return + } + if tc.Error != nil { + continue + } + + resp := w.Msg + if resp == nil { + t.Fatalf("Test %d, got nil message and no error ford", i) + } + + test.SortAndCheck(t, resp, tc) + } +} diff --git a/plugin/kubernetes/kubernetes_test.go b/plugin/kubernetes/kubernetes_test.go new file mode 100644 index 000000000..f347f10fc --- /dev/null +++ b/plugin/kubernetes/kubernetes_test.go @@ -0,0 +1,242 @@ +package kubernetes + +import ( + "testing" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" + "k8s.io/client-go/1.5/pkg/api" +) + +func TestWildcard(t *testing.T) { + var tests = []struct { + s string + expected bool + }{ + {"mynamespace", false}, + {"*", true}, + {"any", true}, + {"my*space", false}, + {"*space", false}, + {"myname*", false}, + } + + for _, te := range tests { + got := wildcard(te.s) + if got != te.expected { + t.Errorf("Expected Wildcard result '%v' for example '%v', got '%v'.", te.expected, te.s, got) + } + } +} + +func TestEndpointHostname(t *testing.T) { + var tests = []struct { + ip string + hostname string + expected string + }{ + {"10.11.12.13", "", "10-11-12-13"}, + {"10.11.12.13", "epname", "epname"}, + } + for _, test := range tests { + result := endpointHostname(api.EndpointAddress{IP: test.ip, Hostname: test.hostname}) + if result != test.expected { + t.Errorf("Expected endpoint name for (ip:%v hostname:%v) to be '%v', but got '%v'", test.ip, test.hostname, test.expected, result) + } + } +} + +type APIConnServiceTest struct{} + +func (APIConnServiceTest) Run() { return } +func (APIConnServiceTest) Stop() error { return nil } +func (APIConnServiceTest) PodIndex(string) []interface{} { return nil } + +func (APIConnServiceTest) ServiceList() []*api.Service { + svcs := []*api.Service{ + { + ObjectMeta: api.ObjectMeta{ + Name: "svc1", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ + ClusterIP: "10.0.0.1", + Ports: []api.ServicePort{{ + Name: "http", + Protocol: "tcp", + Port: 80, + }}, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "hdls1", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ + ClusterIP: api.ClusterIPNone, + }, + }, + { + ObjectMeta: api.ObjectMeta{ + Name: "external", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ + ExternalName: "coredns.io", + Ports: []api.ServicePort{{ + Name: "http", + Protocol: "tcp", + Port: 80, + }}, + }, + }, + } + return svcs +} + +func (APIConnServiceTest) EndpointsList() api.EndpointsList { + n := "test.node.foo.bar" + + return api.EndpointsList{ + Items: []api.Endpoints{ + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "172.0.0.1", + Hostname: "ep1a", + }, + }, + Ports: []api.EndpointPort{ + { + Port: 80, + Protocol: "tcp", + Name: "http", + }, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: "svc1", + Namespace: "testns", + }, + }, + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "172.0.0.2", + }, + }, + Ports: []api.EndpointPort{ + { + Port: 80, + Protocol: "tcp", + Name: "http", + }, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: "hdls1", + Namespace: "testns", + }, + }, + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "172.0.0.3", + }, + }, + Ports: []api.EndpointPort{ + { + Port: 80, + Protocol: "tcp", + Name: "http", + }, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: "hdls1", + Namespace: "testns", + }, + }, + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "10.9.8.7", + NodeName: &n, + }, + }, + }, + }, + }, + }, + } +} + +func (APIConnServiceTest) GetNodeByName(name string) (api.Node, error) { + return api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "test.node.foo.bar", + }, + }, nil +} + +func TestServices(t *testing.T) { + + k := New([]string{"interwebs.test."}) + k.APIConn = &APIConnServiceTest{} + + type svcAns struct { + host string + key string + } + type svcTest struct { + qname string + qtype uint16 + answer svcAns + } + tests := []svcTest{ + // Cluster IP Services + {qname: "svc1.testns.svc.interwebs.test.", qtype: dns.TypeA, answer: svcAns{host: "10.0.0.1", key: "/coredns/test/interwebs/svc/testns/svc1"}}, + {qname: "_http._tcp.svc1.testns.svc.interwebs.test.", qtype: dns.TypeSRV, answer: svcAns{host: "10.0.0.1", key: "/coredns/test/interwebs/svc/testns/svc1"}}, + {qname: "ep1a.svc1.testns.svc.interwebs.test.", qtype: dns.TypeA, answer: svcAns{host: "172.0.0.1", key: "/coredns/test/interwebs/svc/testns/svc1/ep1a"}}, + + // External Services + {qname: "external.testns.svc.interwebs.test.", qtype: dns.TypeCNAME, answer: svcAns{host: "coredns.io", key: "/coredns/test/interwebs/svc/testns/external"}}, + } + + for i, test := range tests { + state := request.Request{ + Req: &dns.Msg{Question: []dns.Question{{Name: test.qname, Qtype: test.qtype}}}, + Zone: "interwebs.test.", // must match from k.Zones[0] + } + svcs, e := k.Services(state, false, plugin.Options{}) + if e != nil { + t.Errorf("Test %d: got error '%v'", i, e) + continue + } + if len(svcs) != 1 { + t.Errorf("Test %d, expected expected 1 answer, got %v", i, len(svcs)) + continue + } + + if test.answer.host != svcs[0].Host { + t.Errorf("Test %d, expected host '%v', got '%v'", i, test.answer.host, svcs[0].Host) + } + if test.answer.key != svcs[0].Key { + t.Errorf("Test %d, expected key '%v', got '%v'", i, test.answer.key, svcs[0].Key) + } + } +} diff --git a/plugin/kubernetes/local.go b/plugin/kubernetes/local.go new file mode 100644 index 000000000..e5b7f1e0f --- /dev/null +++ b/plugin/kubernetes/local.go @@ -0,0 +1,40 @@ +package kubernetes + +import "net" + +func localPodIP() net.IP { + addrs, err := net.InterfaceAddrs() + if err != nil { + return nil + } + + for _, addr := range addrs { + ip, _, _ := net.ParseCIDR(addr.String()) + ip = ip.To4() + if ip == nil || ip.IsLoopback() { + continue + } + return ip + } + return nil +} + +func (k *Kubernetes) localNodeName() string { + localIP := k.interfaceAddrsFunc() + if localIP == nil { + return "" + } + + // Find endpoint matching localIP + endpointsList := k.APIConn.EndpointsList() + for _, ep := range endpointsList.Items { + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + if localIP.Equal(net.ParseIP(addr.IP)) { + return *addr.NodeName + } + } + } + } + return "" +} diff --git a/plugin/kubernetes/ns.go b/plugin/kubernetes/ns.go new file mode 100644 index 000000000..4cacc382f --- /dev/null +++ b/plugin/kubernetes/ns.go @@ -0,0 +1,65 @@ +package kubernetes + +import ( + "net" + "strings" + + "github.com/miekg/dns" + "k8s.io/client-go/1.5/pkg/api" +) + +func isDefaultNS(name, zone string) bool { + return strings.Index(name, defaultNSName) == 0 && strings.Index(name, zone) == len(defaultNSName) +} + +func (k *Kubernetes) nsAddr() *dns.A { + var ( + svcName string + svcNamespace string + ) + + rr := new(dns.A) + localIP := k.interfaceAddrsFunc() + endpointsList := k.APIConn.EndpointsList() + + rr.A = localIP + +FindEndpoint: + for _, ep := range endpointsList.Items { + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + if localIP.Equal(net.ParseIP(addr.IP)) { + svcNamespace = ep.ObjectMeta.Namespace + svcName = ep.ObjectMeta.Name + break FindEndpoint + } + } + } + } + + if len(svcName) == 0 { + rr.Hdr.Name = defaultNSName + rr.A = localIP + return rr + } + // Find service to get ClusterIP + serviceList := k.APIConn.ServiceList() + +FindService: + for _, svc := range serviceList { + if svcName == svc.Name && svcNamespace == svc.Namespace { + if svc.Spec.ClusterIP == api.ClusterIPNone { + rr.A = localIP + } else { + rr.A = net.ParseIP(svc.Spec.ClusterIP) + } + break FindService + } + } + + rr.Hdr.Name = strings.Join([]string{svcName, svcNamespace, "svc."}, ".") + + return rr +} + +const defaultNSName = "ns.dns." diff --git a/plugin/kubernetes/ns_test.go b/plugin/kubernetes/ns_test.go new file mode 100644 index 000000000..8e9e80c71 --- /dev/null +++ b/plugin/kubernetes/ns_test.go @@ -0,0 +1,69 @@ +package kubernetes + +import ( + "testing" + + "k8s.io/client-go/1.5/pkg/api" +) + +type APIConnTest struct{} + +func (APIConnTest) Run() { return } +func (APIConnTest) Stop() error { return nil } +func (APIConnTest) PodIndex(string) []interface{} { return nil } + +func (APIConnTest) ServiceList() []*api.Service { + svc := api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "dns-service", + Namespace: "kube-system", + }, + Spec: api.ServiceSpec{ + ClusterIP: "10.0.0.111", + }, + } + + return []*api.Service{&svc} + +} + +func (APIConnTest) EndpointsList() api.EndpointsList { + return api.EndpointsList{ + Items: []api.Endpoints{ + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "127.0.0.1", + }, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: "dns-service", + Namespace: "kube-system", + }, + }, + }, + } +} + +func (APIConnTest) GetNodeByName(name string) (api.Node, error) { return api.Node{}, nil } + +func TestNsAddr(t *testing.T) { + + k := New([]string{"inter.webs.test."}) + k.APIConn = &APIConnTest{} + + cdr := k.nsAddr() + expected := "10.0.0.111" + + if cdr.A.String() != expected { + t.Errorf("Expected A to be %q, got %q", expected, cdr.A.String()) + } + expected = "dns-service.kube-system.svc." + if cdr.Hdr.Name != expected { + t.Errorf("Expected Hdr.Name to be %q, got %q", expected, cdr.Hdr.Name) + } +} diff --git a/plugin/kubernetes/parse.go b/plugin/kubernetes/parse.go new file mode 100644 index 000000000..a66e77699 --- /dev/null +++ b/plugin/kubernetes/parse.go @@ -0,0 +1,112 @@ +package kubernetes + +import ( + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" +) + +type recordRequest struct { + // The named port from the kubernetes DNS spec, this is the service part (think _https) from a well formed + // SRV record. + port string + // The protocol is usually _udp or _tcp (if set), and comes from the protocol part of a well formed + // SRV record. + protocol string + endpoint string + // The servicename used in Kubernetes. + service string + // The namespace used in Kubernetes. + namespace string + // A each name can be for a pod or a service, here we track what we've seen, either "pod" or "service". + podOrSvc string +} + +// parseRequest parses the qname to find all the elements we need for querying k8s. Anything +// that is not parsed will have the wildcard "*" value (except r.endpoint). +// Potential underscores are stripped from _port and _protocol. +func parseRequest(state request.Request) (r recordRequest, err error) { + // 3 Possible cases: + // 1. _port._protocol.service.namespace.pod|svc.zone + // 2. (endpoint): endpoint.service.namespace.pod|svc.zone + // 3. (service): service.namespace.pod|svc.zone + // + // Federations are handled in the federation plugin. And aren't parsed here. + + base, _ := dnsutil.TrimZone(state.Name(), state.Zone) + segs := dns.SplitDomainName(base) + + r.port = "*" + r.protocol = "*" + r.service = "*" + r.namespace = "*" + // r.endpoint is the odd one out, we need to know if it has been set or not. If it is + // empty we should skip the endpoint check in k.get(). Hence we cannot set if to "*". + + // start at the right and fill out recordRequest with the bits we find, so we look for + // pod|svc.namespace.service and then either + // * endpoint + // *_protocol._port + + last := len(segs) - 1 + if last < 0 { + return r, nil + } + r.podOrSvc = segs[last] + if r.podOrSvc != Pod && r.podOrSvc != Svc { + return r, errInvalidRequest + } + last-- + if last < 0 { + return r, nil + } + + r.namespace = segs[last] + last-- + if last < 0 { + return r, nil + } + + r.service = segs[last] + last-- + if last < 0 { + return r, nil + } + + // Because of ambiquity we check the labels left: 1: an endpoint. 2: port and protocol. + // Anything else is a query that is too long to answer and can safely be delegated to return an nxdomain. + switch last { + + case 0: // endpoint only + r.endpoint = segs[last] + case 1: // service and port + r.protocol = stripUnderscore(segs[last]) + r.port = stripUnderscore(segs[last-1]) + + default: // too long + return r, errInvalidRequest + } + + return r, nil +} + +// stripUnderscore removes a prefixed underscore from s. +func stripUnderscore(s string) string { + if s[0] != '_' { + return s + } + return s[1:] +} + +// String return a string representation of r, it just returns all fields concatenated with dots. +// This is mostly used in tests. +func (r recordRequest) String() string { + s := r.port + s += "." + r.protocol + s += "." + r.endpoint + s += "." + r.service + s += "." + r.namespace + s += "." + r.podOrSvc + return s +} diff --git a/plugin/kubernetes/parse_test.go b/plugin/kubernetes/parse_test.go new file mode 100644 index 000000000..06d5a2aaa --- /dev/null +++ b/plugin/kubernetes/parse_test.go @@ -0,0 +1,56 @@ +package kubernetes + +import ( + "testing" + + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" +) + +func TestParseRequest(t *testing.T) { + tests := []struct { + query string + expected string // output from r.String() + }{ + // valid SRV request + {"_http._tcp.webs.mynamespace.svc.inter.webs.test.", "http.tcp..webs.mynamespace.svc"}, + // wildcard acceptance + {"*.any.*.any.svc.inter.webs.test.", "*.any..*.any.svc"}, + // A request of endpoint + {"1-2-3-4.webs.mynamespace.svc.inter.webs.test.", "*.*.1-2-3-4.webs.mynamespace.svc"}, + } + for i, tc := range tests { + m := new(dns.Msg) + m.SetQuestion(tc.query, dns.TypeA) + state := request.Request{Zone: zone, Req: m} + + r, e := parseRequest(state) + if e != nil { + t.Errorf("Test %d, expected no error, got '%v'.", i, e) + } + rs := r.String() + if rs != tc.expected { + t.Errorf("Test %d, expected (stringyfied) recordRequest: %s, got %s", i, tc.expected, rs) + } + } +} + +func TestParseInvalidRequest(t *testing.T) { + invalid := []string{ + "webs.mynamespace.pood.inter.webs.test.", // Request must be for pod or svc subdomain. + "too.long.for.what.I.am.trying.to.pod.inter.webs.tests.", // Too long. + } + + for i, query := range invalid { + m := new(dns.Msg) + m.SetQuestion(query, dns.TypeA) + state := request.Request{Zone: zone, Req: m} + + if _, e := parseRequest(state); e == nil { + t.Errorf("Test %d: expected error from %s, got none", i, query) + } + } +} + +const zone = "intern.webs.tests." diff --git a/plugin/kubernetes/reverse.go b/plugin/kubernetes/reverse.go new file mode 100644 index 000000000..0143b721a --- /dev/null +++ b/plugin/kubernetes/reverse.go @@ -0,0 +1,55 @@ +package kubernetes + +import ( + "strings" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/etcd/msg" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/request" +) + +// Reverse implements the ServiceBackend interface. +func (k *Kubernetes) Reverse(state request.Request, exact bool, opt plugin.Options) ([]msg.Service, error) { + + ip := dnsutil.ExtractAddressFromReverse(state.Name()) + if ip == "" { + return nil, nil + } + + records := k.serviceRecordForIP(ip, state.Name()) + return records, nil +} + +// serviceRecordForIP gets a service record with a cluster ip matching the ip argument +// If a service cluster ip does not match, it checks all endpoints +func (k *Kubernetes) serviceRecordForIP(ip, name string) []msg.Service { + // First check services with cluster ips + svcList := k.APIConn.ServiceList() + + for _, service := range svcList { + if (len(k.Namespaces) > 0) && !k.namespaceExposed(service.Namespace) { + continue + } + if service.Spec.ClusterIP == ip { + domain := strings.Join([]string{service.Name, service.Namespace, Svc, k.primaryZone()}, ".") + return []msg.Service{{Host: domain}} + } + } + // If no cluster ips match, search endpoints + epList := k.APIConn.EndpointsList() + for _, ep := range epList.Items { + if (len(k.Namespaces) > 0) && !k.namespaceExposed(ep.ObjectMeta.Namespace) { + continue + } + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + if addr.IP == ip { + domain := strings.Join([]string{endpointHostname(addr), ep.ObjectMeta.Name, ep.ObjectMeta.Namespace, Svc, k.primaryZone()}, ".") + return []msg.Service{{Host: domain}} + } + } + } + } + return nil +} diff --git a/plugin/kubernetes/reverse_test.go b/plugin/kubernetes/reverse_test.go new file mode 100644 index 000000000..aa9d09585 --- /dev/null +++ b/plugin/kubernetes/reverse_test.go @@ -0,0 +1,125 @@ +package kubernetes + +import ( + "testing" + + "github.com/coredns/coredns/plugin/pkg/dnsrecorder" + "github.com/coredns/coredns/plugin/test" + + "github.com/miekg/dns" + "golang.org/x/net/context" + "k8s.io/client-go/1.5/pkg/api" +) + +type APIConnReverseTest struct{} + +func (APIConnReverseTest) Run() { return } +func (APIConnReverseTest) Stop() error { return nil } +func (APIConnReverseTest) PodIndex(string) []interface{} { return nil } + +func (APIConnReverseTest) ServiceList() []*api.Service { + svcs := []*api.Service{ + { + ObjectMeta: api.ObjectMeta{ + Name: "svc1", + Namespace: "testns", + }, + Spec: api.ServiceSpec{ + ClusterIP: "192.168.1.100", + Ports: []api.ServicePort{{ + Name: "http", + Protocol: "tcp", + Port: 80, + }}, + }, + }, + } + return svcs +} + +func (APIConnReverseTest) EndpointsList() api.EndpointsList { + return api.EndpointsList{ + Items: []api.Endpoints{ + { + Subsets: []api.EndpointSubset{ + { + Addresses: []api.EndpointAddress{ + { + IP: "10.0.0.100", + Hostname: "ep1a", + }, + }, + Ports: []api.EndpointPort{ + { + Port: 80, + Protocol: "tcp", + Name: "http", + }, + }, + }, + }, + ObjectMeta: api.ObjectMeta{ + Name: "svc1", + Namespace: "testns", + }, + }, + }, + } +} + +func (APIConnReverseTest) GetNodeByName(name string) (api.Node, error) { + return api.Node{ + ObjectMeta: api.ObjectMeta{ + Name: "test.node.foo.bar", + }, + }, nil +} + +func TestReverse(t *testing.T) { + + k := New([]string{"cluster.local.", "0.10.in-addr.arpa."}) + k.APIConn = &APIConnReverseTest{} + + tests := []test.Case{ + { + Qname: "100.0.0.10.in-addr.arpa.", Qtype: dns.TypePTR, + Rcode: dns.RcodeSuccess, + Answer: []dns.RR{ + test.PTR("100.0.0.10.in-addr.arpa. 303 IN PTR ep1a.svc1.testns.svc.cluster.local."), + }, + }, + { + Qname: "101.0.0.10.in-addr.arpa.", Qtype: dns.TypePTR, + Rcode: dns.RcodeSuccess, + Ns: []dns.RR{ + test.SOA("0.10.in-addr.arpa. 300 IN SOA ns.dns.0.10.in-addr.arpa. hostmaster.0.10.in-addr.arpa. 1502782828 7200 1800 86400 60"), + }, + }, + { + Qname: "example.org.cluster.local.", Qtype: dns.TypePTR, + Rcode: dns.RcodeSuccess, + Ns: []dns.RR{ + test.SOA("cluster.local. 300 IN SOA ns.dns.cluster.local. hostmaster.cluster.local. 1502989566 7200 1800 86400 60"), + }, + }, + } + + ctx := context.TODO() + for i, tc := range tests { + r := tc.Msg() + + w := dnsrecorder.New(&test.ResponseWriter{}) + + _, err := k.ServeDNS(ctx, w, r) + if err != tc.Error { + t.Errorf("Test %d: expected no error, got %v", i, err) + return + } + + resp := w.Msg + if resp == nil { + t.Fatalf("Test %d: got nil message and no error for: %s %d", i, r.Question[0].Name, r.Question[0].Qtype) + } + test.SortAndCheck(t, resp, tc) + } +} diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go new file mode 100644 index 000000000..e60239d42 --- /dev/null +++ b/plugin/kubernetes/setup.go @@ -0,0 +1,208 @@ +package kubernetes + +import ( + "errors" + "fmt" + "strconv" + "strings" + "time" + + "github.com/coredns/coredns/core/dnsserver" + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/plugin/proxy" + "github.com/miekg/dns" + + "github.com/mholt/caddy" + unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned" +) + +func init() { + caddy.RegisterPlugin("kubernetes", caddy.Plugin{ + ServerType: "dns", + Action: setup, + }) +} + +func setup(c *caddy.Controller) error { + kubernetes, initOpts, err := kubernetesParse(c) + if err != nil { + return plugin.Error("kubernetes", err) + } + + err = kubernetes.initKubeCache(initOpts) + if err != nil { + return plugin.Error("kubernetes", err) + } + + // Register KubeCache start and stop functions with Caddy + c.OnStartup(func() error { + go kubernetes.APIConn.Run() + if kubernetes.APIProxy != nil { + go kubernetes.APIProxy.Run() + } + return nil + }) + + c.OnShutdown(func() error { + if kubernetes.APIProxy != nil { + kubernetes.APIProxy.Stop() + } + return kubernetes.APIConn.Stop() + }) + + dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { + kubernetes.Next = next + return kubernetes + }) + + return nil +} + +func kubernetesParse(c *caddy.Controller) (*Kubernetes, dnsControlOpts, error) { + k8s := New([]string{""}) + k8s.interfaceAddrsFunc = localPodIP + k8s.autoPathSearch = searchFromResolvConf() + + opts := dnsControlOpts{ + resyncPeriod: defaultResyncPeriod, + } + + for c.Next() { + zones := c.RemainingArgs() + + if len(zones) != 0 { + k8s.Zones = zones + for i := 0; i < len(k8s.Zones); i++ { + k8s.Zones[i] = plugin.Host(k8s.Zones[i]).Normalize() + } + } else { + k8s.Zones = make([]string, len(c.ServerBlockKeys)) + for i := 0; i < len(c.ServerBlockKeys); i++ { + k8s.Zones[i] = plugin.Host(c.ServerBlockKeys[i]).Normalize() + } + } + + k8s.primaryZoneIndex = -1 + for i, z := range k8s.Zones { + if strings.HasSuffix(z, "in-addr.arpa.") || strings.HasSuffix(z, "ip6.arpa.") { + continue + } + k8s.primaryZoneIndex = i + break + } + + if k8s.primaryZoneIndex == -1 { + return nil, opts, errors.New("non-reverse zone name must be used") + } + + for c.NextBlock() { + switch c.Val() { + case "pods": + args := c.RemainingArgs() + if len(args) == 1 { + switch args[0] { + case podModeDisabled, podModeInsecure, podModeVerified: + k8s.podMode = args[0] + default: + return nil, opts, fmt.Errorf("wrong value for pods: %s, must be one of: disabled, verified, insecure", args[0]) + } + continue + } + return nil, opts, c.ArgErr() + case "namespaces": + args := c.RemainingArgs() + if len(args) > 0 { + for _, a := range args { + k8s.Namespaces[a] = true + } + continue + } + return nil, opts, c.ArgErr() + case "endpoint": + args := c.RemainingArgs() + if len(args) > 0 { + for _, endpoint := range strings.Split(args[0], ",") { + k8s.APIServerList = append(k8s.APIServerList, strings.TrimSpace(endpoint)) + } + continue + } + return nil, opts, c.ArgErr() + case "tls": // cert key cacertfile + args := c.RemainingArgs() + if len(args) == 3 { + k8s.APIClientCert, k8s.APIClientKey, k8s.APICertAuth = args[0], args[1], args[2] + continue + } + return nil, opts, c.ArgErr() + case "resyncperiod": + args := c.RemainingArgs() + if len(args) > 0 { + rp, err := time.ParseDuration(args[0]) + if err != nil { + return nil, opts, fmt.Errorf("unable to parse resync duration value: '%v': %v", args[0], err) + } + opts.resyncPeriod = rp + continue + } + return nil, opts, c.ArgErr() + case "labels": + args := c.RemainingArgs() + if len(args) > 0 { + labelSelectorString := strings.Join(args, " ") + ls, err := unversionedapi.ParseToLabelSelector(labelSelectorString) + if err != nil { + return nil, opts, fmt.Errorf("unable to parse label selector value: '%v': %v", labelSelectorString, err) + } + opts.labelSelector = ls + continue + } + return nil, opts, c.ArgErr() + case "fallthrough": + args := c.RemainingArgs() + if len(args) == 0 { + k8s.Fallthrough = true + continue + } + return nil, opts, c.ArgErr() + case "upstream": + args := c.RemainingArgs() + if len(args) == 0 { + return nil, opts, c.ArgErr() + } + ups, err := dnsutil.ParseHostPortOrFile(args...) + if err != nil { + return nil, opts, err + } + k8s.Proxy = proxy.NewLookup(ups) + case "ttl": + args := c.RemainingArgs() + if len(args) == 0 { + return nil, opts, c.ArgErr() + } + t, err := strconv.Atoi(args[0]) + if err != nil { + return nil, opts, err + } + if t < 5 || t > 3600 { + return nil, opts, c.Errf("ttl must be in range [5, 3600]: %d", t) + } + k8s.ttl = uint32(t) + default: + return nil, opts, c.Errf("unknown property '%s'", c.Val()) + } + } + } + return k8s, opts, nil +} + +func searchFromResolvConf() []string { + rc, err := dns.ClientConfigFromFile("/etc/resolv.conf") + if err != nil { + return nil + } + plugin.Zones(rc.Search).Normalize() + return rc.Search +} + +const defaultResyncPeriod = 5 * time.Minute diff --git a/plugin/kubernetes/setup_reverse_test.go b/plugin/kubernetes/setup_reverse_test.go new file mode 100644 index 000000000..ed51a7410 --- /dev/null +++ b/plugin/kubernetes/setup_reverse_test.go @@ -0,0 +1,35 @@ +package kubernetes + +import ( + "testing" + + "github.com/mholt/caddy" +) + +func TestKubernetesParseReverseZone(t *testing.T) { + tests := []struct { + input string // Corefile data as string + expectedZones []string // expected count of defined zones. + }{ + {`kubernetes coredns.local 10.0.0.0/16`, []string{"coredns.local.", "0.10.in-addr.arpa."}}, + {`kubernetes coredns.local 10.0.0.0/17`, []string{"coredns.local.", "10.0.0.0/17."}}, + } + + for i, tc := range tests { + c := caddy.NewTestController("dns", tc.input) + k, _, err := kubernetesParse(c) + if err != nil { + t.Fatalf("Test %d: Expected no error, got %q", i, err) + } + + zl := len(k.Zones) + if zl != len(tc.expectedZones) { + t.Errorf("Test %d: Expected kubernetes to be initialized with %d zones, found %d zones", i, len(tc.expectedZones), zl) + } + for i, z := range tc.expectedZones { + if k.Zones[i] != z { + t.Errorf("Test %d: Expected zones to be %q, got %q", i, z, k.Zones[i]) + } + } + } +} diff --git a/plugin/kubernetes/setup_test.go b/plugin/kubernetes/setup_test.go new file mode 100644 index 000000000..2fdc38a9c --- /dev/null +++ b/plugin/kubernetes/setup_test.go @@ -0,0 +1,473 @@ +package kubernetes + +import ( + "strings" + "testing" + "time" + + "github.com/mholt/caddy" + "k8s.io/client-go/1.5/pkg/api/unversioned" +) + +func TestKubernetesParse(t *testing.T) { + tests := []struct { + input string // Corefile data as string + shouldErr bool // true if test case is exected to produce an error. + expectedErrContent string // substring from the expected error. Empty for positive cases. + expectedZoneCount int // expected count of defined zones. + expectedNSCount int // expected count of namespaces. + expectedResyncPeriod time.Duration // expected resync period value + expectedLabelSelector string // expected label selector value + expectedPodMode string + expectedFallthrough bool + expectedUpstreams []string + }{ + // positive + { + `kubernetes coredns.local`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local test.local`, + false, + "", + 2, + 0, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + endpoint http://localhost:9090 +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + namespaces demo +}`, + false, + "", + 1, + 1, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + namespaces demo test +}`, + false, + "", + 1, + 2, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + resyncperiod 30s +}`, + false, + "", + 1, + 0, + 30 * time.Second, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + resyncperiod 15m +}`, + false, + "", + 1, + 0, + 15 * time.Minute, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + labels environment=prod +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "environment=prod", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + labels environment in (production, staging, qa),application=nginx +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "application=nginx,environment in (production,qa,staging)", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local test.local { + resyncperiod 15m + endpoint http://localhost:8080 + namespaces demo test + labels environment in (production, staging, qa),application=nginx + fallthrough +}`, + false, + "", + 2, + 2, + 15 * time.Minute, + "application=nginx,environment in (production,qa,staging)", + podModeDisabled, + true, + nil, + }, + // negative + { + `kubernetes coredns.local { + endpoint +}`, + true, + "rong argument count or unexpected line ending", + -1, + -1, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + namespaces +}`, + true, + "rong argument count or unexpected line ending", + -1, + -1, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + resyncperiod +}`, + true, + "rong argument count or unexpected line ending", + -1, + 0, + 0 * time.Minute, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + resyncperiod 15 +}`, + true, + "unable to parse resync duration value", + -1, + 0, + 0 * time.Second, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + resyncperiod abc +}`, + true, + "unable to parse resync duration value", + -1, + 0, + 0 * time.Second, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + labels +}`, + true, + "rong argument count or unexpected line ending", + -1, + 0, + 0 * time.Second, + "", + podModeDisabled, + false, + nil, + }, + { + `kubernetes coredns.local { + labels environment in (production, qa +}`, + true, + "unable to parse label selector", + -1, + 0, + 0 * time.Second, + "", + podModeDisabled, + false, + nil, + }, + // pods disabled + { + `kubernetes coredns.local { + pods disabled +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + // pods insecure + { + `kubernetes coredns.local { + pods insecure +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "", + podModeInsecure, + false, + nil, + }, + // pods verified + { + `kubernetes coredns.local { + pods verified +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "", + podModeVerified, + false, + nil, + }, + // pods invalid + { + `kubernetes coredns.local { + pods giant_seed +}`, + true, + "rong value for pods", + -1, + 0, + defaultResyncPeriod, + "", + podModeVerified, + false, + nil, + }, + // fallthrough invalid + { + `kubernetes coredns.local { + fallthrough junk +}`, + true, + "rong argument count", + -1, + 0, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + // Valid upstream + { + `kubernetes coredns.local { + upstream 13.14.15.16:53 +}`, + false, + "", + 1, + 0, + defaultResyncPeriod, + "", + podModeDisabled, + false, + []string{"13.14.15.16:53"}, + }, + // Invalid upstream + { + `kubernetes coredns.local { + upstream 13.14.15.16orange +}`, + true, + "not an IP address or file: \"13.14.15.16orange\"", + -1, + 0, + defaultResyncPeriod, + "", + podModeDisabled, + false, + nil, + }, + } + + for i, test := range tests { + c := caddy.NewTestController("dns", test.input) + k8sController, opts, err := kubernetesParse(c) + + if test.shouldErr && err == nil { + t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err) + } + + if err != nil { + if !test.shouldErr { + t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err) + continue + } + + if test.shouldErr && (len(test.expectedErrContent) < 1) { + t.Fatalf("Test %d: Test marked as expecting an error, but no expectedErrContent provided for input '%s'. Error was: '%v'", i, test.input, err) + } + + if test.shouldErr && (test.expectedZoneCount >= 0) { + t.Errorf("Test %d: Test marked as expecting an error, but provides value for expectedZoneCount!=-1 for input '%s'. Error was: '%v'", i, test.input, err) + } + + if !strings.Contains(err.Error(), test.expectedErrContent) { + t.Errorf("Test %d: Expected error to contain: %v, found error: %v, input: %s", i, test.expectedErrContent, err, test.input) + } + continue + } + + // No error was raised, so validate initialization of k8sController + // Zones + foundZoneCount := len(k8sController.Zones) + if foundZoneCount != test.expectedZoneCount { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d zones, instead found %d zones: '%v' for input '%s'", i, test.expectedZoneCount, foundZoneCount, k8sController.Zones, test.input) + } + + // Namespaces + foundNSCount := len(k8sController.Namespaces) + if foundNSCount != test.expectedNSCount { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d namespaces. Instead found %d namespaces: '%v' for input '%s'", i, test.expectedNSCount, foundNSCount, k8sController.Namespaces, test.input) + } + + // ResyncPeriod + foundResyncPeriod := opts.resyncPeriod + if foundResyncPeriod != test.expectedResyncPeriod { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with resync period '%s'. Instead found period '%s' for input '%s'", i, test.expectedResyncPeriod, foundResyncPeriod, test.input) + } + + // Labels + if opts.labelSelector != nil { + foundLabelSelectorString := unversioned.FormatLabelSelector(opts.labelSelector) + if foundLabelSelectorString != test.expectedLabelSelector { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with label selector '%s'. Instead found selector '%s' for input '%s'", i, test.expectedLabelSelector, foundLabelSelectorString, test.input) + } + } + // Pods + foundPodMode := k8sController.podMode + if foundPodMode != test.expectedPodMode { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with pod mode '%s'. Instead found pod mode '%s' for input '%s'", i, test.expectedPodMode, foundPodMode, test.input) + } + + // fallthrough + foundFallthrough := k8sController.Fallthrough + if foundFallthrough != test.expectedFallthrough { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with fallthrough '%v'. Instead found fallthrough '%v' for input '%s'", i, test.expectedFallthrough, foundFallthrough, test.input) + } + // upstream + foundUpstreams := k8sController.Proxy.Upstreams + if test.expectedUpstreams == nil { + if foundUpstreams != nil { + t.Errorf("Test %d: Expected kubernetes controller to not be initialized with upstreams for input '%s'", i, test.input) + } + } else { + if foundUpstreams == nil { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with upstreams for input '%s'", i, test.input) + } else { + if len(*foundUpstreams) != len(test.expectedUpstreams) { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d upstreams. Instead found %d upstreams for input '%s'", i, len(test.expectedUpstreams), len(*foundUpstreams), test.input) + } + for j, want := range test.expectedUpstreams { + got := (*foundUpstreams)[j].Select().Name + if got != want { + t.Errorf("Test %d: Expected kubernetes controller to be initialized with upstream '%s'. Instead found upstream '%s' for input '%s'", i, want, got, test.input) + } + } + + } + } + } +} diff --git a/plugin/kubernetes/setup_ttl_test.go b/plugin/kubernetes/setup_ttl_test.go new file mode 100644 index 000000000..d58f91576 --- /dev/null +++ b/plugin/kubernetes/setup_ttl_test.go @@ -0,0 +1,45 @@ +package kubernetes + +import ( + "testing" + + "github.com/mholt/caddy" +) + +func TestKubernetesParseTTL(t *testing.T) { + tests := []struct { + input string // Corefile data as string + expectedTTL uint32 // expected count of defined zones. + shouldErr bool + }{ + {`kubernetes cluster.local { + ttl 56 + }`, 56, false}, + {`kubernetes cluster.local`, defaultTTL, false}, + {`kubernetes cluster.local { + ttl -1 + }`, 0, true}, + {`kubernetes cluster.local { + ttl 3601 + }`, 0, true}, + } + + for i, tc := range tests { + c := caddy.NewTestController("dns", tc.input) + k, _, err := kubernetesParse(c) + if err != nil && !tc.shouldErr { + t.Fatalf("Test %d: Expected no error, got %q", i, err) + } + if err == nil && tc.shouldErr { + t.Fatalf("Test %d: Expected error, got none", i) + } + if err != nil && tc.shouldErr { + // input should error + continue + } + + if k.ttl != tc.expectedTTL { + t.Errorf("Test %d: Expected TTl to be %d, got %d", i, tc.expectedTTL, k.ttl) + } + } +} |