aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Michael Richmond <mrichmon@users.noreply.github.com> 2016-08-05 18:19:51 -0700
committerGravatar GitHub <noreply@github.com> 2016-08-05 18:19:51 -0700
commit6d90b745e0faf5b73472e918cd72965dc0d93348 (patch)
tree9b86325e955a9fe5759c78f90dbe20eadbf5093d
parent604d2a3730fb0ce0939f19f8ec545e7719226f67 (diff)
downloadcoredns-6d90b745e0faf5b73472e918cd72965dc0d93348.tar.gz
coredns-6d90b745e0faf5b73472e918cd72965dc0d93348.tar.zst
coredns-6d90b745e0faf5b73472e918cd72965dc0d93348.zip
Switch over to k8s notification API (#202)
* Merge notification code by @aledbf and update for recent changes. * Fix travis environment to correctly build with k8s.io and forked repositories. * Refactored kubernetes Corefile parser * Added lots of Corefile parsing tests
-rw-r--r--.travis.yml16
-rw-r--r--Makefile18
-rw-r--r--core/setup/kubernetes.go93
-rw-r--r--core/setup/kubernetes_test.go122
-rw-r--r--kubernetes-rc.yaml70
-rw-r--r--middleware/kubernetes/README.md26
-rw-r--r--middleware/kubernetes/controller.go195
-rw-r--r--middleware/kubernetes/handler.go25
-rw-r--r--middleware/kubernetes/kubernetes.go173
-rw-r--r--middleware/kubernetes/lookup.go65
-rw-r--r--middleware/kubernetes/msg/service.go5
-rwxr-xr-xmiddleware/kubernetes/test/20_setup_k8s_services.sh2
-rw-r--r--middleware/kubernetes/util/util.go16
13 files changed, 599 insertions, 227 deletions
diff --git a/.travis.yml b/.travis.yml
index 30f7a61c0..f146d5f32 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -25,15 +25,25 @@ before_install:
- env
before_script:
+ # Fix repo pathname for golang imports.
+ # When building in a forked repo the import path will be incorrect.
+ # Fix is to detect this case and create a symbolic link for the real import name.
+ # Note: This assumes that both the upstream "real" repo and the fork are hosted
+ # at the same domain. (eg. github.com)
+ - ( export UPSTREAM="miekg/coredns" && export REPONAME=`pwd | rev | cut -d "/" -f 1-2 | rev` && test "$REPO" != "$UPSTREAM" && mkdir -p ../../`echo $UPSTREAM | cut -d "/" -f 1` && ln -s ../$REPONAME ../../$UPSTREAM )
+ # Download etcd, unpack and launch
- curl -L https://github.com/coreos/etcd/releases/download/v2.3.1/etcd-v2.3.1-linux-amd64.tar.gz -o etcd-v2.3.1-linux-amd64.tar.gz
- tar xzvf etcd-v2.3.1-linux-amd64.tar.gz
- ./etcd-v2.3.1-linux-amd64/etcd &
- - go get
- - go get github.com/coreos/go-etcd/etcd
+ # If docker is available, pull the kubernetes hyperkube image down and launch kubernetes.
- if which docker &>/dev/null ; then docker pull gcr.io/google_containers/hyperkube-amd64:v1.2.4 ; docker ps -a ; fi
- - pwd
- if which docker &>/dev/null ; then ./middleware/kubernetes/test/00_run_k8s.sh && ./middleware/kubernetes/test/10_setup_kubectl.sh && ./middleware/kubernetes/test/20_setup_k8s_services.sh ; docker ps -a ; fi
+ # Get golang dependencies, and build coredns binary
+ - go get -v -d
+ - go get github.com/coreos/go-etcd/etcd
+ - go build -v -ldflags="-s -w"
script:
- go test -tags etcd -race -bench=. ./...
+ # Run kubernetes integration tests only if kubectl is available. i.e. If kubernetes was launched
- ./middleware/kubernetes/test/kubectl version && go test -tags k8s -race -bench=. -run 'TestK8sIntegration' ./test
diff --git a/Makefile b/Makefile
index db2658695..d8cf42b45 100644
--- a/Makefile
+++ b/Makefile
@@ -4,13 +4,16 @@ BUILD_VERBOSE := -v
TEST_VERBOSE :=
TEST_VERBOSE := -v
+DOCKER_IMAGE_NAME := $$USER/coredns
+
+
all:
- go build $(BUILD_VERBOSE)
+ go build $(BUILD_VERBOSE) -ldflags="-s -w"
.PHONY: docker
-docker:
- GOOS=linux go build -a -tags netgo -installsuffix netgo
- docker build -t $$USER/coredns .
+docker: all
+ GOOS=linux go build -a -tags netgo -installsuffix netgo -ldflags="-s -w"
+ docker build -t $(DOCKER_IMAGE_NAME) .
.PHONY: deps
deps:
@@ -22,8 +25,15 @@ test:
.PHONY: testk8s
testk8s:
+ # With -args --v=100 the k8s API response data will be printed in the log:
+ #go test $(TEST_VERBOSE) -tags=k8s -run 'TestK8sIntegration' ./test -args --v=100
+ # Without the k8s API response data:
go test $(TEST_VERBOSE) -tags=k8s -run 'TestK8sIntegration' ./test
+.PHONY: testk8s-setup
+testk8s-setup:
+ go test -v ./core/setup -run TestKubernetes
+
.PHONY: clean
clean:
go clean
diff --git a/core/setup/kubernetes.go b/core/setup/kubernetes.go
index 2f8b5be44..adfbe01b0 100644
--- a/core/setup/kubernetes.go
+++ b/core/setup/kubernetes.go
@@ -1,31 +1,33 @@
package setup
import (
+ "errors"
"log"
"strings"
+ "time"
"github.com/miekg/coredns/middleware"
"github.com/miekg/coredns/middleware/kubernetes"
- k8sc "github.com/miekg/coredns/middleware/kubernetes/k8sclient"
"github.com/miekg/coredns/middleware/kubernetes/nametemplate"
- "github.com/miekg/coredns/middleware/proxy"
)
const (
- defaultK8sEndpoint = "http://localhost:8080"
defaultNameTemplate = "{service}.{namespace}.{zone}"
+ defaultResyncPeriod = 5 * time.Minute
)
// Kubernetes sets up the kubernetes middleware.
func Kubernetes(c *Controller) (middleware.Middleware, error) {
- log.Printf("[debug] controller %v\n", c)
- // TODO: Determine if subzone support required
-
kubernetes, err := kubernetesParse(c)
+ if err != nil {
+ return nil, err
+ }
+ err = kubernetes.StartKubeCache()
if err != nil {
return nil, err
}
+ log.Printf("[debug] after parse and start KubeCache, APIconn is: %v", kubernetes.APIConn)
return func(next middleware.Handler) middleware.Handler {
kubernetes.Next = next
@@ -34,78 +36,73 @@ func Kubernetes(c *Controller) (middleware.Middleware, error) {
}
func kubernetesParse(c *Controller) (kubernetes.Kubernetes, error) {
+ var err error
+ template := defaultNameTemplate
+
k8s := kubernetes.Kubernetes{
- Proxy: proxy.New([]string{}),
+ ResyncPeriod: defaultResyncPeriod,
}
- var (
- endpoints = []string{defaultK8sEndpoint}
- template = defaultNameTemplate
- namespaces = []string{}
- )
-
- k8s.APIConn = k8sc.NewK8sConnector(endpoints[0])
k8s.NameTemplate = new(nametemplate.NameTemplate)
k8s.NameTemplate.SetTemplate(template)
+ // TODO: expose resync period in Corefile
+
for c.Next() {
if c.Val() == "kubernetes" {
zones := c.RemainingArgs()
+ log.Printf("[debug] Zones: %v", zones)
if len(zones) == 0 {
k8s.Zones = c.ServerBlockHosts
+ log.Printf("[debug] Zones(from ServerBlockHosts): %v", zones)
} else {
// Normalize requested zones
k8s.Zones = kubernetes.NormalizeZoneList(zones)
}
- // TODO: clean this parsing up
-
middleware.Zones(k8s.Zones).FullyQualify()
+ if k8s.Zones == nil || len(k8s.Zones) < 1 {
+ err = errors.New("Zone name must be provided for kubernetes middleware.")
+ log.Printf("[debug] %v\n", err)
+ return kubernetes.Kubernetes{}, err
+ }
- log.Printf("[debug] c data: %v\n", c)
-
- if c.NextBlock() {
- // TODO(miek): 2 switches?
+ for c.NextBlock() {
switch c.Val() {
- case "endpoint":
+ case "template":
args := c.RemainingArgs()
- if len(args) == 0 {
+ if len(args) != 0 {
+ template := strings.Join(args, "")
+ err = k8s.NameTemplate.SetTemplate(template)
+ if err != nil {
+ return kubernetes.Kubernetes{}, err
+ }
+ } else {
+ log.Printf("[debug] 'template' keyword provided without any template value.")
return kubernetes.Kubernetes{}, c.ArgErr()
}
- endpoints = args
- k8s.APIConn = k8sc.NewK8sConnector(endpoints[0])
case "namespaces":
args := c.RemainingArgs()
- if len(args) == 0 {
+ if len(args) != 0 {
+ k8s.Namespaces = append(k8s.Namespaces, args...)
+ } else {
+ log.Printf("[debug] 'namespaces' keyword provided without any namespace values.")
return kubernetes.Kubernetes{}, c.ArgErr()
}
- namespaces = args
- k8s.Namespaces = append(k8s.Namespaces, namespaces...)
- }
- for c.Next() {
- switch c.Val() {
- case "template":
- args := c.RemainingArgs()
- if len(args) == 0 {
- return kubernetes.Kubernetes{}, c.ArgErr()
- }
- template = strings.Join(args, "")
- err := k8s.NameTemplate.SetTemplate(template)
- if err != nil {
- return kubernetes.Kubernetes{}, err
- }
- case "namespaces":
- args := c.RemainingArgs()
- if len(args) == 0 {
- return kubernetes.Kubernetes{}, c.ArgErr()
- }
- namespaces = args
- k8s.Namespaces = append(k8s.Namespaces, namespaces...)
+ case "endpoint":
+ args := c.RemainingArgs()
+ if len(args) != 0 {
+ k8s.APIEndpoint = args[0]
+ } else {
+ log.Printf("[debug] 'endpoint' keyword provided without any endpoint url value.")
+ return kubernetes.Kubernetes{}, c.ArgErr()
}
}
}
return k8s, nil
}
}
- return kubernetes.Kubernetes{}, nil
+ err = errors.New("Kubernetes setup called without keyword 'kubernetes' in Corefile")
+ log.Printf("[ERROR] %v\n", err)
+ return kubernetes.Kubernetes{}, err
}
diff --git a/core/setup/kubernetes_test.go b/core/setup/kubernetes_test.go
index b0141bab0..af7f216d9 100644
--- a/core/setup/kubernetes_test.go
+++ b/core/setup/kubernetes_test.go
@@ -5,55 +5,49 @@ import (
"testing"
)
-/*
-kubernetes coredns.local {
- # Use url for k8s API endpoint
- endpoint http://localhost:8080
- # Assemble k8s record names with the template
- template {service}.{namespace}.{zone}
- # Only expose the k8s namespace "demo"
- #namespaces demo
- }
-*/
-
func TestKubernetesParse(t *testing.T) {
tests := []struct {
+ description string
input string
shouldErr bool
expectedErrContent string // substring from the expected error. Empty for positive cases.
- expectedZoneCount int // expected count of defined zones. '-1' for negative cases.
+ expectedZoneCount int // expected count of defined zones.
expectedNTValid bool // NameTemplate to be initialized and valid
- expectedNSCount int // expected count of namespaces. '-1' for negative cases.
+ expectedNSCount int // expected count of namespaces.
}{
// positive
- // TODO: not specifiying a zone maybe should error out.
{
- `kubernetes`,
+ "kubernetes keyword with one zone",
+ `kubernetes coredns.local`,
false,
"",
- 0,
+ 1,
true,
0,
},
{
- `kubernetes coredns.local`,
+ "kubernetes keyword with multiple zones",
+ `kubernetes coredns.local test.local`,
false,
"",
- 1,
+ 2,
true,
0,
},
{
- `kubernetes coredns.local test.local`,
+ "kubernetes keyword with zone and empty braces",
+ `kubernetes coredns.local {
+}`,
false,
"",
- 2,
+ 1,
true,
0,
},
{
+ "endpoint keyword with url",
`kubernetes coredns.local {
- endpoint http://localhost:9090
+ endpoint http://localhost:9090
}`,
false,
"",
@@ -62,6 +56,7 @@ func TestKubernetesParse(t *testing.T) {
0,
},
{
+ "template keyword with valid template",
`kubernetes coredns.local {
template {service}.{namespace}.{zone}
}`,
@@ -72,6 +67,7 @@ func TestKubernetesParse(t *testing.T) {
0,
},
{
+ "namespaces keyword with one namespace",
`kubernetes coredns.local {
namespaces demo
}`,
@@ -82,6 +78,7 @@ func TestKubernetesParse(t *testing.T) {
1,
},
{
+ "namespaces keyword with multiple namespaces",
`kubernetes coredns.local {
namespaces demo test
}`,
@@ -91,9 +88,40 @@ func TestKubernetesParse(t *testing.T) {
true,
2,
},
-
+ {
+ "fully specified valid config",
+ `kubernetes coredns.local test.local {
+ endpoint http://localhost:8080
+ template {service}.{namespace}.{zone}
+ namespaces demo test
+}`,
+ false,
+ "",
+ 2,
+ true,
+ 2,
+ },
// negative
{
+ "no kubernetes keyword",
+ "",
+ true,
+ "Kubernetes setup called without keyword 'kubernetes' in Corefile",
+ -1,
+ false,
+ -1,
+ },
+ {
+ "kubernetes keyword without a zone",
+ `kubernetes`,
+ true,
+ "Zone name must be provided for kubernetes middleware",
+ -1,
+ true,
+ 0,
+ },
+ {
+ "endpoint keyword without an endpoint value",
`kubernetes coredns.local {
endpoint
}`,
@@ -103,56 +131,56 @@ func TestKubernetesParse(t *testing.T) {
true,
-1,
},
- // No template provided for template line.
{
+ "template keyword without a template value",
`kubernetes coredns.local {
template
}`,
true,
- "",
+ "Wrong argument count or unexpected line ending after 'template'",
-1,
false,
- -1,
+ 0,
},
- // Invalid template provided
{
+ "template keyword with an invalid template value",
`kubernetes coredns.local {
template {namespace}.{zone}
}`,
true,
- "",
+ "Record name template does not pass NameTemplate validation",
-1,
false,
+ 0,
+ },
+ {
+ "namespace keyword without a namespace value",
+ `kubernetes coredns.local {
+ namespaces
+}`,
+ true,
+ "Parse error: Wrong argument count or unexpected line ending after 'namespaces'",
+ -1,
+ true,
-1,
},
- /*
- // No valid provided for namespaces
- {
- `kubernetes coredns.local {
- namespaces
- }`,
- true,
- "",
- -1,
- true,
- -1,
- },
- */
}
+ t.Logf("Parser test cases count: %v", len(tests))
for i, test := range tests {
c := NewTestController(test.input)
k8sController, err := kubernetesParse(c)
- t.Logf("i: %v\n", i)
- t.Logf("controller: %v\n", k8sController)
+ t.Logf("setup test: %2v -- %v\n", i, test.description)
+ //t.Logf("controller: %v\n", k8sController)
if test.shouldErr && err == nil {
- t.Errorf("Test %d: Expected error, but found one for input '%s'. Error was: '%v'", i, test.input, err)
+ t.Errorf("Test %d: Expected error, but did not find error for input '%s'. Error was: '%v'", i, test.input, err)
}
if err != nil {
if !test.shouldErr {
t.Errorf("Test %d: Expected no error but found one for input %s. Error was: %v", i, test.input, err)
+ continue
}
if test.shouldErr && (len(test.expectedErrContent) < 1) {
@@ -160,14 +188,13 @@ func TestKubernetesParse(t *testing.T) {
}
if test.shouldErr && (test.expectedZoneCount >= 0) {
- t.Fatalf("Test %d: Test marked as expecting an error, but provides value for expectedZoneCount!=-1 for input '%s'. Error was: '%v'", i, test.input, err)
+ t.Errorf("Test %d: Test marked as expecting an error, but provides value for expectedZoneCount!=-1 for input '%s'. Error was: '%v'", i, test.input, err)
}
if !strings.Contains(err.Error(), test.expectedErrContent) {
t.Errorf("Test %d: Expected error to contain: %v, found error: %v, input: %s", i, test.expectedErrContent, err, test.input)
}
-
- return
+ continue
}
// No error was raised, so validate initialization of k8sController
@@ -191,7 +218,8 @@ func TestKubernetesParse(t *testing.T) {
foundNSCount := len(k8sController.Namespaces)
if foundNSCount != test.expectedNSCount {
t.Errorf("Test %d: Expected kubernetes controller to be initialized with %d namespaces. Instead found %d namespaces: '%v' for input '%s'", i, test.expectedNSCount, foundNSCount, k8sController.Namespaces, test.input)
+ t.Logf("k8sController is: %v", k8sController)
+ t.Logf("k8sController.Namespaces is: %v", k8sController.Namespaces)
}
-
}
}
diff --git a/kubernetes-rc.yaml b/kubernetes-rc.yaml
new file mode 100644
index 000000000..87d29418c
--- /dev/null
+++ b/kubernetes-rc.yaml
@@ -0,0 +1,70 @@
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: coredns-configmap
+ namespace: kube-system
+data:
+ corefile: |
+ .:53 {
+ kubernetes coredns.local {
+ }
+ #cache 160 coredns.local
+ errors stdout
+ log stdout
+ }
+---
+apiVersion: v1
+kind: ReplicationController
+metadata:
+ labels:
+ k8s-app: kube-dns
+ kubernetes.io/cluster-service: "true"
+ version: v20
+ name: kube-dns-v20
+ namespace: kube-system
+spec:
+ replicas: 1
+ selector:
+ k8s-app: kube-dns
+ version: v20
+ template:
+ metadata:
+ labels:
+ k8s-app: kube-dns
+ kubernetes.io/cluster-service: "true"
+ version: v20
+ spec:
+ containers:
+ - args:
+ - -conf=/cfg/corefile
+ image: aledbf/kube-coredns:0.6
+ imagePullPolicy: IfNotPresent
+ name: kube-dns
+ ports:
+ - containerPort: 53
+ name: dns
+ protocol: UDP
+ - containerPort: 53
+ name: dns-tcp
+ protocol: TCP
+ volumeMounts:
+ - name: config-volume
+ mountPath: /cfg
+ - args:
+ - -cmd=nslookup kubernetes.default.svc.cluster.local localhost >/dev/null
+ - -port=8080
+ image: gcr.io/google_containers/exechealthz:1.0
+ imagePullPolicy: IfNotPresent
+ name: healthz
+ ports:
+ - containerPort: 8080
+ protocol: TCP
+ resources:
+ limits:
+ cpu: 10m
+ memory: 20Mi
+ dnsPolicy: Default
+ volumes:
+ - name: config-volume
+ configMap:
+ name: coredns-configmap
diff --git a/middleware/kubernetes/README.md b/middleware/kubernetes/README.md
index 66cee605e..6d8006404 100644
--- a/middleware/kubernetes/README.md
+++ b/middleware/kubernetes/README.md
@@ -276,11 +276,13 @@ TBD:
* Update kubernetes middleware documentation to describe running CoreDNS as a
SkyDNS replacement. (Include descriptions of different ways to pass CoreFile
to coredns command.)
+ * Remove dependency on healthz for health checking in
+ `kubernetes-rc.yaml` file.
* Expose load-balancer IP addresses.
* Calculate SRV priority based on number of instances running.
(See SkyDNS README.md)
* Functional work
- * (done) ~~Implement wildcard-based lookup. Minimally support `*`, consider `?` as well.~~
+ * (done. '?' not supported yet) ~~Implement wildcard-based lookup. Minimally support `*`, consider `?` as well.~~
* (done) ~~Note from Miek on PR 181: "SkyDNS also supports the word `any`.~~
* Implement SkyDNS-style synthetic zones such as "svc" to group k8s objects. (This
should be optional behavior.) Also look at "pod" synthetic zones.
@@ -303,17 +305,14 @@ TBD:
* Performance
* Improve lookup to reduce size of query result obtained from k8s API.
(namespace-based?, other ideas?)
- * Caching of k8s API dataset.
+ * Caching/notification of k8s API dataset. (See aledbf fork for
+ implementation ideas.)
* DNS response caching is good, but we should also cache at the http query
level as well. (Take a look at https://github.com/patrickmn/go-cache as
a potential expiring cache implementation for the http API queries.)
- * Push notifications from k8s for data changes rather than pull via API?
* Additional features:
- * Implement namespace filtering to different zones. That is, zone "a.b"
- publishes services from namespace "foo", and zone "x.y" publishes services
- from namespaces "bar" and "baz". (Basic version implemented -- need test cases.)
* Reverse IN-ADDR entries for services. (Is there any value in supporting
- reverse lookup records?
+ reverse lookup records?)
* How to support label specification in Corefile to allow use of labels to
indicate zone? (Is this even useful?) For example, the following
configuration exposes all services labeled for the "staging" environment
@@ -334,11 +333,14 @@ TBD:
flattening to lower case and mapping of non-DNS characters to DNS characters
in a standard way.)
* Expose arbitrary kubernetes repository data as TXT records?
- * Support custom user-provided templates for k8s names. A string provided
+ * (done) ~~Support custom user-provided templates for k8s names. A string provided
in the middleware configuration like `{service}.{namespace}.{type}` defines
the template of how to construct record names for the zone. This example
would produce `myservice.mynamespace.svc.cluster.local`. (Basic template
- implemented. Need to slice zone out of current template implementation.)
+ implemented. Need to slice zone out of current template implementation.)~~
+ * (done) ~~Implement namespace filtering to different zones. That is, zone "a.b"
+ publishes services from namespace "foo", and zone "x.y" publishes services
+ from namespaces "bar" and "baz". (Basic version implemented -- need test cases.)~~
* DNS Correctness
* Do we need to generate synthetic zone records for namespaces?
* Do we need to generate synthetic zone records for the skydns synthetic zones?
@@ -347,10 +349,10 @@ TBD:
using the `cache` directive. Tested working using 20s cache timeout
and A-record queries. Automate testing with cache in place.
* Automate CoreDNS performance tests. Initially for zone files, and for
- pre-loaded k8s API cache.
+ pre-loaded k8s API cache. With and without CoreDNS response caching.
* Try to get rid of kubernetes launch scripts by moving operations into
.travis.yml file.
* ~~Implement test cases for http data parsing using dependency injection
for http get operations.~~
- * ~~Automate integration testing with kubernetes. (k8s launch and service start-up
- automation is in middleware/kubernetes/tests)~~
+ * ~~Automate integration testing with kubernetes. (k8s launch and service
+ start-up automation is in middleware/kubernetes/tests)~~
diff --git a/middleware/kubernetes/controller.go b/middleware/kubernetes/controller.go
new file mode 100644
index 000000000..6c94bdae1
--- /dev/null
+++ b/middleware/kubernetes/controller.go
@@ -0,0 +1,195 @@
+package kubernetes
+
+import (
+ "fmt"
+ "log"
+ "sync"
+ "time"
+
+ "github.com/miekg/coredns/middleware/kubernetes/util"
+
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/client/cache"
+ client "k8s.io/kubernetes/pkg/client/unversioned"
+ "k8s.io/kubernetes/pkg/controller/framework"
+ "k8s.io/kubernetes/pkg/runtime"
+ "k8s.io/kubernetes/pkg/watch"
+)
+
+var (
+ namespace = api.NamespaceAll
+)
+
+type dnsController struct {
+ client *client.Client
+
+ endpController *framework.Controller
+ svcController *framework.Controller
+ nsController *framework.Controller
+
+ svcLister cache.StoreToServiceLister
+ endpLister cache.StoreToEndpointsLister
+ nsLister util.StoreToNamespaceLister
+
+ // stopLock is used to enforce only a single call to Stop is active.
+ // Needed because we allow stopping through an http endpoint and
+ // allowing concurrent stoppers leads to stack traces.
+ stopLock sync.Mutex
+ shutdown bool
+ stopCh chan struct{}
+}
+
+// newDNSController creates a controller for coredns
+func newdnsController(kubeClient *client.Client, resyncPeriod time.Duration) *dnsController {
+ dns := dnsController{
+ client: kubeClient,
+ stopCh: make(chan struct{}),
+ }
+
+ dns.endpLister.Store, dns.endpController = framework.NewInformer(
+ &cache.ListWatch{
+ ListFunc: endpointsListFunc(dns.client, namespace),
+ WatchFunc: endpointsWatchFunc(dns.client, namespace),
+ },
+ &api.Endpoints{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
+
+ dns.svcLister.Store, dns.svcController = framework.NewInformer(
+ &cache.ListWatch{
+ ListFunc: serviceListFunc(dns.client, namespace),
+ WatchFunc: serviceWatchFunc(dns.client, namespace),
+ },
+ &api.Service{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
+
+ dns.nsLister.Store, dns.nsController = framework.NewInformer(
+ &cache.ListWatch{
+ ListFunc: namespaceListFunc(dns.client),
+ WatchFunc: namespaceWatchFunc(dns.client),
+ },
+ &api.Namespace{}, resyncPeriod, framework.ResourceEventHandlerFuncs{})
+
+ return &dns
+}
+
+func serviceListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ return c.Services(ns).List(opts)
+ }
+}
+
+func serviceWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ return c.Services(ns).Watch(options)
+ }
+}
+
+func endpointsListFunc(c *client.Client, ns string) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ return c.Endpoints(ns).List(opts)
+ }
+}
+
+func endpointsWatchFunc(c *client.Client, ns string) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ return c.Endpoints(ns).Watch(options)
+ }
+}
+
+func namespaceListFunc(c *client.Client) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ return c.Namespaces().List(opts)
+ }
+}
+
+func namespaceWatchFunc(c *client.Client) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ return c.Namespaces().Watch(options)
+ }
+}
+
+func (dns *dnsController) controllersInSync() bool {
+ return dns.svcController.HasSynced() && dns.endpController.HasSynced()
+}
+
+// Stop stops the controller.
+func (dns *dnsController) Stop() error {
+ dns.stopLock.Lock()
+ defer dns.stopLock.Unlock()
+
+ // Only try draining the workqueue if we haven't already.
+ if !dns.shutdown {
+ close(dns.stopCh)
+ log.Println("shutting down controller queues")
+ dns.shutdown = true
+
+ return nil
+ }
+
+ return fmt.Errorf("shutdown already in progress")
+}
+
+// Run starts the controller.
+func (dns *dnsController) Run() {
+ log.Println("[debug] starting coredns controller")
+
+ go dns.endpController.Run(dns.stopCh)
+ go dns.svcController.Run(dns.stopCh)
+ go dns.nsController.Run(dns.stopCh)
+
+ <-dns.stopCh
+ log.Println("[debug] shutting down coredns controller")
+}
+
+func (dns *dnsController) GetNamespaceList() *api.NamespaceList {
+ nsList, err := dns.nsLister.List()
+ if err != nil {
+ return &api.NamespaceList{}
+ }
+
+ return &nsList
+}
+
+func (dns *dnsController) GetServiceList() *api.ServiceList {
+ log.Printf("[debug] here in GetServiceList")
+ svcList, err := dns.svcLister.List()
+ if err != nil {
+ return &api.ServiceList{}
+ }
+
+ return &svcList
+}
+
+// GetServicesByNamespace returns a map of
+// namespacename :: [ kubernetesService ]
+func (dns *dnsController) GetServicesByNamespace() map[string][]api.Service {
+ k8sServiceList := dns.GetServiceList()
+ if k8sServiceList == nil {
+ return nil
+ }
+
+ items := make(map[string][]api.Service, len(k8sServiceList.Items))
+ for _, i := range k8sServiceList.Items {
+ namespace := i.Namespace
+ items[namespace] = append(items[namespace], i)
+ }
+
+ return items
+}
+
+// GetServiceInNamespace returns the Service that matches
+// servicename in the namespace
+func (dns *dnsController) GetServiceInNamespace(namespace string, servicename string) *api.Service {
+ svcKey := fmt.Sprintf("%v/%v", namespace, servicename)
+ svcObj, svcExists, err := dns.svcLister.Store.GetByKey(svcKey)
+
+ if err != nil {
+ log.Printf("error getting service %v from the cache: %v\n", svcKey, err)
+ return nil
+ }
+
+ if !svcExists {
+ log.Printf("service %v does not exists\n", svcKey)
+ return nil
+ }
+
+ return svcObj.(*api.Service)
+}
diff --git a/middleware/kubernetes/handler.go b/middleware/kubernetes/handler.go
index a78873426..05dfba934 100644
--- a/middleware/kubernetes/handler.go
+++ b/middleware/kubernetes/handler.go
@@ -3,6 +3,7 @@ package kubernetes
import (
"fmt"
"log"
+ "strings"
"github.com/miekg/coredns/middleware"
@@ -18,6 +19,26 @@ func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.M
return dns.RcodeServerFailure, fmt.Errorf("can only deal with ClassINET")
}
+ m := new(dns.Msg)
+ m.SetReply(r)
+ m.Authoritative, m.RecursionAvailable, m.Compress = true, true, true
+
+ // TODO: find an alternative to this block
+ if strings.HasSuffix(state.Name(), arpaSuffix) {
+ ip, _ := extractIP(state.Name())
+ records := k.getServiceRecordForIP(ip, state.Name())
+ if len(records) > 0 {
+ srvPTR := &records[0]
+ m.Answer = append(m.Answer, srvPTR.NewPTR(state.QName(), ip))
+
+ m = dedup(m)
+ state.SizeAndDo(m)
+ m, _ = state.Scrub(m)
+ w.WriteMsg(m)
+ return dns.RcodeSuccess, nil
+ }
+ }
+
// Check that query matches one of the zones served by this middleware,
// otherwise delegate to the next in the pipeline.
zone := middleware.Zones(k.Zones).Matches(state.Name())
@@ -28,10 +49,6 @@ func (k Kubernetes) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.M
return k.Next.ServeDNS(ctx, w, r)
}
- m := new(dns.Msg)
- m.SetReply(r)
- m.Authoritative, m.RecursionAvailable, m.Compress = true, true, true
-
var (
records, extra []dns.RR
err error
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go
index 6288cb394..a8d33bb7e 100644
--- a/middleware/kubernetes/kubernetes.go
+++ b/middleware/kubernetes/kubernetes.go
@@ -4,33 +4,70 @@ package kubernetes
import (
"errors"
"log"
+ "strings"
"time"
"github.com/miekg/coredns/middleware"
- k8sc "github.com/miekg/coredns/middleware/kubernetes/k8sclient"
"github.com/miekg/coredns/middleware/kubernetes/msg"
"github.com/miekg/coredns/middleware/kubernetes/nametemplate"
"github.com/miekg/coredns/middleware/kubernetes/util"
"github.com/miekg/coredns/middleware/proxy"
"github.com/miekg/dns"
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/client/unversioned"
+ "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
+ clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
+)
+
+const (
+ defaultResyncPeriod = 5 * time.Minute
)
type Kubernetes struct {
Next middleware.Handler
Zones []string
Proxy proxy.Proxy // Proxy for looking up names during the resolution process
- APIConn *k8sc.K8sConnector
+ APIEndpoint string
+ APIConn *dnsController
+ ResyncPeriod time.Duration
NameTemplate *nametemplate.NameTemplate
Namespaces []string
}
+func (g *Kubernetes) StartKubeCache() error {
+ // For a custom api server or running outside a k8s cluster
+ // set URL in env.KUBERNETES_MASTER
+ loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
+ overrides := &clientcmd.ConfigOverrides{}
+ if len(g.APIEndpoint) > 0 {
+ overrides.ClusterInfo = clientcmdapi.Cluster{Server: g.APIEndpoint}
+ }
+ clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
+ config, err := clientConfig.ClientConfig()
+ if err != nil {
+ log.Printf("[debug] error connecting to the client: %v", err)
+ return err
+ }
+ kubeClient, err := unversioned.New(config)
+
+ if err != nil {
+ log.Printf("[ERROR] Failed to create kubernetes notification controller: %v", err)
+ return err
+ }
+ g.APIConn = newdnsController(kubeClient, g.ResyncPeriod)
+
+ go g.APIConn.Run()
+
+ return err
+}
+
// getZoneForName returns the zone string that matches the name and a
// list of the DNS labels from name that are within the zone.
// For example, if "coredns.local" is a zone configured for the
// Kubernetes middleware, then getZoneForName("a.b.coredns.local")
// will return ("coredns.local", ["a", "b"]).
-func (g Kubernetes) getZoneForName(name string) (string, []string) {
+func (g *Kubernetes) getZoneForName(name string) (string, []string) {
var zone string
var serviceSegments []string
@@ -51,7 +88,14 @@ func (g Kubernetes) getZoneForName(name string) (string, []string) {
// If exact is true, it will lookup just
// this name. This is used when find matches when completing SRV lookups
// for instance.
-func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
+func (g *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
+ // TODO: refector this.
+ // Right now GetNamespaceFromSegmentArray do not supports PRE queries
+ if strings.HasSuffix(name, arpaSuffix) {
+ ip, _ := extractIP(name)
+ records := g.getServiceRecordForIP(ip, name)
+ return records, nil
+ }
var (
serviceName string
namespace string
@@ -99,6 +143,7 @@ func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
return nil, nil
}
+ log.Printf("before g.Get(namespace, nsWildcard, serviceName, serviceWildcard): %v %v %v %v", namespace, nsWildcard, serviceName, serviceWildcard)
k8sItems, err := g.Get(namespace, nsWildcard, serviceName, serviceWildcard)
log.Printf("[debug] k8s items: %v\n", k8sItems)
if err != nil {
@@ -115,7 +160,7 @@ func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
}
// TODO: assemble name from parts found in k8s data based on name template rather than reusing query string
-func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, values nametemplate.NameValues) []msg.Service {
+func (g *Kubernetes) getRecordsForServiceItems(serviceItems []api.Service, values nametemplate.NameValues) []msg.Service {
var records []msg.Service
for _, item := range serviceItems {
@@ -131,7 +176,7 @@ func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, v
// Create records for each exposed port...
for _, p := range item.Spec.Ports {
log.Printf("[debug] port: %v\n", p.Port)
- s := msg.Service{Host: clusterIP, Port: p.Port}
+ s := msg.Service{Host: clusterIP, Port: int(p.Port)}
records = append(records, s)
}
}
@@ -141,22 +186,24 @@ func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, v
}
// Get performs the call to the Kubernetes http API.
-func (g Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]k8sc.ServiceItem, error) {
- serviceList, err := g.APIConn.GetServiceList()
+func (g *Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]api.Service, error) {
+ serviceList := g.APIConn.GetServiceList()
+ /* TODO: Remove?
if err != nil {
log.Printf("[ERROR] Getting service list produced error: %v", err)
return nil, err
}
+ */
- var resultItems []k8sc.ServiceItem
+ var resultItems []api.Service
for _, item := range serviceList.Items {
- if symbolMatches(namespace, item.Metadata.Namespace, nsWildcard) && symbolMatches(servicename, item.Metadata.Name, serviceWildcard) {
+ if symbolMatches(namespace, item.Namespace, nsWildcard) && symbolMatches(servicename, item.Name, serviceWildcard) {
// If namespace has a wildcard, filter results against Corefile namespace list.
// (Namespaces without a wildcard were filtered before the call to this function.)
- if nsWildcard && (len(g.Namespaces) > 0) && (!util.StringInSlice(item.Metadata.Namespace, g.Namespaces)) {
- log.Printf("[debug] Namespace '%v' is not published by Corefile\n", item.Metadata.Namespace)
+ if nsWildcard && (len(g.Namespaces) > 0) && (!util.StringInSlice(item.Namespace, g.Namespaces)) {
+ log.Printf("[debug] Namespace '%v' is not published by Corefile\n", item.Namespace)
continue
}
resultItems = append(resultItems, item)
@@ -179,102 +226,24 @@ func symbolMatches(queryString string, candidateString string, wildcard bool) bo
return result
}
-// TODO: Remove these unused functions. One is related to Ttl calculation
-// Implement Ttl and priority calculation based on service count before
-// removing this code.
-/*
-// splitDNSName separates the name into DNS segments and reverses the segments.
-func (g Kubernetes) splitDNSName(name string) []string {
- l := dns.SplitDomainName(name)
-
- for i, j := 0, len(l)-1; i < j; i, j = i+1, j-1 {
- l[i], l[j] = l[j], l[i]
- }
-
- return l
+// kubernetesNameError checks if the error is ErrorCodeKeyNotFound from kubernetes.
+func isKubernetesNameError(err error) bool {
+ return false
}
-*/
-// skydns/local/skydns/east/staging/web
-// skydns/local/skydns/west/production/web
-//
-// skydns/local/skydns/*/*/web
-// skydns/local/skydns/*/web
-/*
-// loopNodes recursively loops through the nodes and returns all the values. The nodes' keyname
-// will be match against any wildcards when star is true.
-func (g Kubernetes) loopNodes(ns []*etcdc.Node, nameParts []string, star bool, bx map[msg.Service]bool) (sx []msg.Service, err error) {
- if bx == nil {
- bx = make(map[msg.Service]bool)
- }
-Nodes:
- for _, n := range ns {
- if n.Dir {
- nodes, err := g.loopNodes(n.Nodes, nameParts, star, bx)
- if err != nil {
- return nil, err
- }
- sx = append(sx, nodes...)
- continue
- }
- if star {
- keyParts := strings.Split(n.Key, "/")
- for i, n := range nameParts {
- if i > len(keyParts)-1 {
- // name is longer than key
- continue Nodes
- }
- if n == "*" || n == "any" {
- continue
- }
- if keyParts[i] != n {
- continue Nodes
- }
- }
- }
- serv := new(msg.Service)
- if err := json.Unmarshal([]byte(n.Value), serv); err != nil {
- return nil, err
- }
- b := msg.Service{Host: serv.Host, Port: serv.Port, Priority: serv.Priority, Weight: serv.Weight, Text: serv.Text, Key: n.Key}
- if _, ok := bx[b]; ok {
- continue
- }
- bx[b] = true
- serv.Key = n.Key
- serv.Ttl = g.Ttl(n, serv)
- if serv.Priority == 0 {
- serv.Priority = priority
- }
- sx = append(sx, *serv)
+func (g *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service {
+ svcList, err := g.APIConn.svcLister.List()
+ if err != nil {
+ return nil
}
- return sx, nil
-}
-// Ttl returns the smaller of the kubernetes TTL and the service's
-// TTL. If neither of these are set (have a zero value), a default is used.
-func (g Kubernetes) Ttl(node *etcdc.Node, serv *msg.Service) uint32 {
- kubernetesTtl := uint32(node.TTL)
-
- if kubernetesTtl == 0 && serv.Ttl == 0 {
- return ttl
- }
- if kubernetesTtl == 0 {
- return serv.Ttl
- }
- if serv.Ttl == 0 {
- return kubernetesTtl
- }
- if kubernetesTtl < serv.Ttl {
- return kubernetesTtl
+ for _, service := range svcList.Items {
+ if service.Spec.ClusterIP == ip {
+ return []msg.Service{msg.Service{Host: ip}}
+ }
}
- return serv.Ttl
-}
-*/
-// kubernetesNameError checks if the error is ErrorCodeKeyNotFound from kubernetes.
-func isKubernetesNameError(err error) bool {
- return false
+ return nil
}
const (
diff --git a/middleware/kubernetes/lookup.go b/middleware/kubernetes/lookup.go
index b490d6a4b..0096e1fdb 100644
--- a/middleware/kubernetes/lookup.go
+++ b/middleware/kubernetes/lookup.go
@@ -4,6 +4,7 @@ import (
"fmt"
"math"
"net"
+ "strings"
"time"
"github.com/miekg/coredns/middleware"
@@ -12,6 +13,11 @@ import (
"github.com/miekg/dns"
)
+const (
+ // arpaSuffix is the standard suffix for PTR IP reverse lookups.
+ arpaSuffix = ".in-addr.arpa."
+)
+
func (k Kubernetes) records(state middleware.State, exact bool) ([]msg.Service, error) {
services, err := k.Records(state.Name(), exact)
if err != nil {
@@ -64,13 +70,13 @@ func (k Kubernetes) A(zone string, state middleware.State, previousRecords []dns
// We should already have found it
continue
}
- m1, e1 := k.Proxy.Lookup(state, target, state.QType())
- if e1 != nil {
+ mes, err := k.Proxy.Lookup(state, target, state.QType())
+ if err != nil {
continue
}
- // Len(m1.Answer) > 0 here is well?
+ // Len(mes.Answer) > 0 here is well?
records = append(records, newRecord)
- records = append(records, m1.Answer...)
+ records = append(records, mes.Answer...)
continue
case ip.To4() != nil:
records = append(records, serv.NewA(state.QName(), ip.To4()))
@@ -285,7 +291,33 @@ func (k Kubernetes) SOA(zone string, state middleware.State) *dns.SOA {
}
}
-// TODO(miek): DNSKEY and friends... intercepted by the DNSSEC middleware?
+func (k Kubernetes) PTR(zone string, state middleware.State) ([]dns.RR, error) {
+ reverseIP, ok := extractIP(state.Name())
+ if !ok {
+ return nil, fmt.Errorf("does not support reverse lookup for %s", state.QName())
+ }
+
+ records := make([]dns.RR, 1)
+ services, err := k.records(state, false)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, serv := range services {
+ ip := net.ParseIP(serv.Host)
+ if reverseIP != serv.Host {
+ continue
+ }
+ switch {
+ case ip.To4() != nil:
+ records = append(records, serv.NewPTR(state.QName(), ip.To4().String()))
+ break
+ case ip.To4() == nil:
+ // nodata?
+ }
+ }
+ return records, nil
+}
func isDuplicateCNAME(r *dns.CNAME, records []dns.RR) bool {
for _, rec := range records {
@@ -300,6 +332,27 @@ func isDuplicateCNAME(r *dns.CNAME, records []dns.RR) bool {
func copyState(state middleware.State, target string, typ uint16) middleware.State {
state1 := middleware.State{W: state.W, Req: state.Req.Copy()}
- state1.Req.Question[0] = dns.Question{dns.Fqdn(target), dns.ClassINET, typ}
+ state1.Req.Question[0] = dns.Question{Name: dns.Fqdn(target), Qtype: dns.ClassINET, Qclass: typ}
return state1
}
+
+// extractIP turns a standard PTR reverse record lookup name
+// into an IP address
+func extractIP(reverseName string) (string, bool) {
+ if !strings.HasSuffix(reverseName, arpaSuffix) {
+ return "", false
+ }
+ search := strings.TrimSuffix(reverseName, arpaSuffix)
+
+ // reverse the segments and then combine them
+ segments := reverseArray(strings.Split(search, "."))
+ return strings.Join(segments, "."), true
+}
+
+func reverseArray(arr []string) []string {
+ for i := 0; i < len(arr)/2; i++ {
+ j := len(arr) - i - 1
+ arr[i], arr[j] = arr[j], arr[i]
+ }
+ return arr
+}
diff --git a/middleware/kubernetes/msg/service.go b/middleware/kubernetes/msg/service.go
index 588e7b33c..24af6b4fd 100644
--- a/middleware/kubernetes/msg/service.go
+++ b/middleware/kubernetes/msg/service.go
@@ -77,6 +77,11 @@ func (s *Service) NewNS(name string) *dns.NS {
return &dns.NS{Hdr: dns.RR_Header{Name: name, Rrtype: dns.TypeNS, Class: dns.ClassINET, Ttl: s.Ttl}, Ns: host}
}
+// NewPTR returns a new PTR record based on the Service.
+func (s *Service) NewPTR(name string, target string) *dns.PTR {
+ return &dns.PTR{Hdr: dns.RR_Header{Name: name, Rrtype: dns.TypePTR, Class: dns.ClassINET, Ttl: s.Ttl}, Ptr: dns.Fqdn(target)}
+}
+
// Group checks the services in sx, it looks for a Group attribute on the shortest
// keys. If there are multiple shortest keys *and* the group attribute disagrees (and
// is not empty), we don't consider it a group.
diff --git a/middleware/kubernetes/test/20_setup_k8s_services.sh b/middleware/kubernetes/test/20_setup_k8s_services.sh
index d5c221a84..1eb993543 100755
--- a/middleware/kubernetes/test/20_setup_k8s_services.sh
+++ b/middleware/kubernetes/test/20_setup_k8s_services.sh
@@ -69,7 +69,7 @@ run_and_expose_service() {
wait_until_k8s_ready
-NAMESPACES="demo test"
+NAMESPACES="demo poddemo test"
create_namespaces
echo ""
diff --git a/middleware/kubernetes/util/util.go b/middleware/kubernetes/util/util.go
index 259eaf596..89cc2b592 100644
--- a/middleware/kubernetes/util/util.go
+++ b/middleware/kubernetes/util/util.go
@@ -3,6 +3,9 @@ package util
import (
"strings"
+
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/client/cache"
)
// StringInSlice check whether string a is a member of slice.
@@ -24,3 +27,16 @@ const (
WildcardStar = "*"
WildcardAny = "any"
)
+
+// StoreToNamespaceLister makes a Store that lists Namespaces.
+type StoreToNamespaceLister struct {
+ cache.Store
+}
+
+// List lists all Namespaces in the store.
+func (s *StoreToNamespaceLister) List() (ns api.NamespaceList, err error) {
+ for _, m := range s.Store.List() {
+ ns.Items = append(ns.Items, *(m.(*api.Namespace)))
+ }
+ return ns, nil
+}