aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.travis/kubernetes/dns-test.yaml34
-rw-r--r--middleware/kubernetes/controller.go51
-rw-r--r--middleware/kubernetes/kubernetes.go36
-rw-r--r--test/kubernetes_test.go21
4 files changed, 136 insertions, 6 deletions
diff --git a/.travis/kubernetes/dns-test.yaml b/.travis/kubernetes/dns-test.yaml
index 91140830f..1f5587ca3 100644
--- a/.travis/kubernetes/dns-test.yaml
+++ b/.travis/kubernetes/dns-test.yaml
@@ -91,6 +91,26 @@ spec:
name: c-port
protocol: UDP
---
+apiVersion: extensions/v1beta1
+kind: Deployment
+metadata:
+ name: de-d1
+ namespace: test-1
+spec:
+ replicas: 2
+ template:
+ metadata:
+ labels:
+ app: app-d
+ spec:
+ containers:
+ - name: app-d-c
+ image: gcr.io/google_containers/pause-amd64:3.0
+ ports:
+ - containerPort: 1234
+ name: c-port
+ protocol: UDP
+---
apiVersion: v1
kind: Service
metadata:
@@ -149,3 +169,17 @@ spec:
- name: c-port
port: 1234
protocol: UDP
+---
+apiVersion: v1
+kind: Service
+metadata:
+ name: headless-svc
+ namespace: test-1
+spec:
+ selector:
+ app: app-d
+ clusterIP: None
+ ports:
+ - name: c-port
+ port: 1234
+ protocol: UDP
diff --git a/middleware/kubernetes/controller.go b/middleware/kubernetes/controller.go
index e387b17fd..150cc843f 100644
--- a/middleware/kubernetes/controller.go
+++ b/middleware/kubernetes/controller.go
@@ -39,9 +39,11 @@ type dnsController struct {
svcController *cache.Controller
nsController *cache.Controller
+ epController *cache.Controller
svcLister cache.StoreToServiceLister
nsLister storeToNamespaceLister
+ epLister cache.StoreToEndpointsLister
// stopLock is used to enforce only a single call to Stop is active.
// Needed because we allow stopping through an http endpoint and
@@ -76,6 +78,13 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
},
&api.Namespace{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
+ dns.epLister.Store, dns.epController = cache.NewInformer(
+ &cache.ListWatch{
+ ListFunc: endpointsListFunc(dns.client, namespace, dns.selector),
+ WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector),
+ },
+ &api.Endpoints{}, resyncPeriod, cache.ResourceEventHandlerFuncs{})
+
return &dns
}
@@ -85,6 +94,7 @@ func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fun
opts.LabelSelector = *s
}
listV1, err := c.Core().Services(ns).List(opts)
+
if err != nil {
return nil, err
}
@@ -119,6 +129,14 @@ func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) {
return in, true
}
return watch.Event{Type: in.Type, Object: &apiObj}, true
+ case *v1.Endpoints:
+ var apiObj api.Endpoints
+ err := v1.Convert_v1_Endpoints_To_api_Endpoints(v1Obj, &apiObj, nil)
+ if err != nil {
+ log.Printf("[ERROR] Could not convert v1.Endpoint: %s", err)
+ return in, true
+ }
+ return watch.Event{Type: in.Type, Object: &apiObj}, true
}
log.Printf("[WARN] Unhandled v1 type in event: %v", in)
@@ -169,6 +187,38 @@ func namespaceWatchFunc(c *kubernetes.Clientset, s *labels.Selector) func(option
}
}
+func endpointsListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = *s
+ }
+ listV1, err := c.Core().Endpoints(ns).List(opts)
+
+ if err != nil {
+ return nil, err
+ }
+ var listAPI api.EndpointsList
+ err = v1.Convert_v1_EndpointsList_To_api_EndpointsList(listV1, &listAPI, nil)
+ if err != nil {
+ return nil, err
+ }
+ return &listAPI, err
+ }
+}
+
+func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = *s
+ }
+ w, err := c.Core().Endpoints(ns).Watch(options)
+ if err != nil {
+ return nil, err
+ }
+ return watch.Filter(w, v1ToAPIFilter), nil
+ }
+}
+
func (dns *dnsController) controllersInSync() bool {
return dns.svcController.HasSynced()
}
@@ -193,6 +243,7 @@ func (dns *dnsController) Stop() error {
func (dns *dnsController) Run() {
go dns.svcController.Run(dns.stopCh)
go dns.nsController.Run(dns.stopCh)
+ go dns.epController.Run(dns.stopCh)
<-dns.stopCh
}
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go
index 78c14ee82..5ab11ca95 100644
--- a/middleware/kubernetes/kubernetes.go
+++ b/middleware/kubernetes/kubernetes.go
@@ -55,7 +55,7 @@ func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware.
}
// PrimaryZone will return the first non-reverse zone being handled by this middleware
-func (k *Kubernetes) PrimaryZone() (string) {
+func (k *Kubernetes) PrimaryZone() string {
return k.Zones[k.primaryZone]
}
@@ -228,15 +228,38 @@ func (k *Kubernetes) getRecordsForServiceItems(serviceItems []*api.Service, zone
var records []msg.Service
for _, item := range serviceItems {
- clusterIP := item.Spec.ClusterIP
- // Create records for each exposed port...
key := k.NameTemplate.RecordNameFromNameValues(nametemplate.NameValues{TypeName: "svc", ServiceName: item.ObjectMeta.Name, Namespace: item.ObjectMeta.Namespace, Zone: zone})
+
key = strings.Replace(key, ".", "/", -1)
- for i, p := range item.Spec.Ports {
- s := msg.Service{Key: msg.Path(strconv.Itoa(i)+"."+key, "coredns"), Host: clusterIP, Port: int(p.Port)}
- records = append(records, s)
+ clusterIP := item.Spec.ClusterIP
+ if clusterIP == api.ClusterIPNone {
+ // This is a headless service, create records for each pod
+ epList, _ := k.APIConn.epLister.List()
+ for _, ep := range epList.Items {
+ if ep.ObjectMeta.Name == item.ObjectMeta.Name && ep.ObjectMeta.Namespace == item.ObjectMeta.Namespace {
+ for _, eps := range ep.Subsets {
+ for i, port := range eps.Ports {
+ for j, addr := range eps.Addresses {
+ refid := strconv.Itoa(j*1024 + i)
+ s := msg.Service{
+ Key: msg.Path(strings.ToLower(refid+"._"+port.Name+"._"+string(port.Protocol)+"."+key), "coredns"),
+ Host: addr.IP, Port: int(port.Port),
+ }
+ records = append(records, s)
+ }
+ }
+ }
+ }
+ }
+ } else {
+ // Create records for each exposed port...
+
+ for _, p := range item.Spec.Ports {
+ s := msg.Service{Key: msg.Path(strings.ToLower("_"+p.Name+"._"+string(p.Protocol)+"."+key), "coredns"), Host: clusterIP, Port: int(p.Port)}
+ records = append(records, s)
+ }
}
}
@@ -265,6 +288,7 @@ func (k *Kubernetes) getServices(namespace string, nsWildcard bool, servicename
if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(item.Namespace, k.Namespaces)) {
continue
}
+
resultItems = append(resultItems, item)
}
}
diff --git a/test/kubernetes_test.go b/test/kubernetes_test.go
index 68ec319cf..215b8191c 100644
--- a/test/kubernetes_test.go
+++ b/test/kubernetes_test.go
@@ -59,6 +59,8 @@ var dnsTestCases = []test.Case{
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
+ test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"),
+ test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"),
},
},
{
@@ -68,6 +70,8 @@ var dnsTestCases = []test.Case{
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
+ test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"),
+ test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"),
},
},
{
@@ -87,6 +91,16 @@ var dnsTestCases = []test.Case{
test.A("svc-1-a.test-1.svc.cluster.local. 303 IN A 10.0.0.100"),
test.A("svc-1-b.test-1.svc.cluster.local. 303 IN A 10.0.0.110"),
test.A("svc-c.test-1.svc.cluster.local. 303 IN A 10.0.0.115"),
+ test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"),
+ test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"),
+ },
+ },
+ {
+ Qname: "headless-svc.test-1.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.5"),
+ test.A("headless-svc.test-1.svc.cluster.local. 303 IN A 172.17.0.6"),
},
},
//TODO: Fix below to all use test.SRV not test.A!
@@ -137,6 +151,8 @@ var dnsTestCases = []test.Case{
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
+ test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
+ test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
},
},
{
@@ -147,6 +163,8 @@ var dnsTestCases = []test.Case{
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
+ test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
+ test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
},
},
{
@@ -167,6 +185,8 @@ var dnsTestCases = []test.Case{
test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."),
test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."),
test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."),
+ test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
+ test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."),
},
},
{
@@ -223,6 +243,7 @@ func TestKubernetesIntegration(t *testing.T) {
time.Sleep(5 * time.Second)
for _, tc := range dnsTestCases {
+
dnsClient := new(dns.Client)
dnsMessage := new(dns.Msg)