aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugin/federation/kubernetes_api_test.go179
-rw-r--r--plugin/kubernetes/autopath.go5
-rw-r--r--plugin/kubernetes/controller.go290
-rw-r--r--plugin/kubernetes/controller_test.go46
-rw-r--r--plugin/kubernetes/handler_test.go284
-rw-r--r--plugin/kubernetes/kubernetes.go56
-rw-r--r--plugin/kubernetes/kubernetes_test.go327
-rw-r--r--plugin/kubernetes/local.go2
-rw-r--r--plugin/kubernetes/ns.go8
-rw-r--r--plugin/kubernetes/ns_test.go54
-rw-r--r--plugin/kubernetes/object/endpoint.go162
-rw-r--r--plugin/kubernetes/object/informer.go51
-rw-r--r--plugin/kubernetes/object/object.go94
-rw-r--r--plugin/kubernetes/object/pod.go72
-rw-r--r--plugin/kubernetes/object/service.go89
-rw-r--r--plugin/kubernetes/reverse.go4
-rw-r--r--plugin/kubernetes/reverse_test.go107
-rw-r--r--plugin/kubernetes/watch.go162
-rw-r--r--plugin/kubernetes/watch_test.go50
-rw-r--r--plugin/kubernetes/xfr.go12
-rw-r--r--plugin/kubernetes/xfr_test.go84
21 files changed, 1155 insertions, 983 deletions
diff --git a/plugin/federation/kubernetes_api_test.go b/plugin/federation/kubernetes_api_test.go
index f15b0fa23..4b62605d1 100644
--- a/plugin/federation/kubernetes_api_test.go
+++ b/plugin/federation/kubernetes_api_test.go
@@ -2,6 +2,7 @@ package federation
import (
"github.com/coredns/coredns/plugin/kubernetes"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1"
@@ -12,167 +13,111 @@ type APIConnFederationTest struct {
zone, region string
}
-func (APIConnFederationTest) HasSynced() bool { return true }
-func (APIConnFederationTest) Run() { return }
-func (APIConnFederationTest) Stop() error { return nil }
-func (APIConnFederationTest) SvcIndexReverse(string) []*api.Service { return nil }
-func (APIConnFederationTest) EpIndexReverse(string) []*api.Endpoints { return nil }
-func (APIConnFederationTest) Modified() int64 { return 0 }
-func (APIConnFederationTest) SetWatchChan(watch.Chan) {}
-func (APIConnFederationTest) Watch(string) error { return nil }
-func (APIConnFederationTest) StopWatching(string) {}
+func (APIConnFederationTest) HasSynced() bool { return true }
+func (APIConnFederationTest) Run() { return }
+func (APIConnFederationTest) Stop() error { return nil }
+func (APIConnFederationTest) SvcIndexReverse(string) []*object.Service { return nil }
+func (APIConnFederationTest) EpIndexReverse(string) []*object.Endpoints { return nil }
+func (APIConnFederationTest) Modified() int64 { return 0 }
+func (APIConnFederationTest) SetWatchChan(watch.Chan) {}
+func (APIConnFederationTest) Watch(string) error { return nil }
+func (APIConnFederationTest) StopWatching(string) {}
-func (APIConnFederationTest) PodIndex(string) []*api.Pod {
- a := []*api.Pod{{
- ObjectMeta: meta.ObjectMeta{
- Namespace: "podns",
- },
- Status: api.PodStatus{
- PodIP: "10.240.0.1", // Remote IP set in test.ResponseWriter
- },
- }}
- return a
+func (APIConnFederationTest) PodIndex(string) []*object.Pod {
+ return []*object.Pod{
+ {Namespace: "podns", PodIP: "10.240.0.1"}, // Remote IP set in test.ResponseWriter
+ }
}
-func (APIConnFederationTest) SvcIndex(string) []*api.Service {
- svcs := []*api.Service{
+func (APIConnFederationTest) SvcIndex(string) []*object.Service {
+ svcs := []*object.Service{
{
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: "10.0.0.1",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Name: "svc1",
+ Namespace: "testns",
+ ClusterIP: "10.0.0.1",
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
},
},
{
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: api.ClusterIPNone,
- },
+ Name: "hdls1",
+ Namespace: "testns",
+ ClusterIP: api.ClusterIPNone,
},
{
- ObjectMeta: meta.ObjectMeta{
- Name: "external",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ExternalName: "ext.interwebs.test",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Name: "external",
+ Namespace: "testns",
+ ExternalName: "ext.interwebs.test",
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
},
},
}
return svcs
}
-func (APIConnFederationTest) ServiceList() []*api.Service {
- svcs := []*api.Service{
+func (APIConnFederationTest) ServiceList() []*object.Service {
+ svcs := []*object.Service{
{
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: "10.0.0.1",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Name: "svc1",
+ Namespace: "testns",
+ ClusterIP: "10.0.0.1",
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
},
},
{
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: api.ClusterIPNone,
- },
+ Name: "hdls1",
+ Namespace: "testns",
+ ClusterIP: api.ClusterIPNone,
},
{
- ObjectMeta: meta.ObjectMeta{
- Name: "external",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ExternalName: "ext.interwebs.test",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Name: "external",
+ Namespace: "testns",
+ ExternalName: "ext.interwebs.test",
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
},
},
}
return svcs
}
-func (APIConnFederationTest) EpIndex(string) []*api.Endpoints {
- eps := []*api.Endpoints{
+func (APIConnFederationTest) EpIndex(string) []*object.Endpoints {
+ eps := []*object.Endpoints{
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.1",
- Hostname: "ep1a",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.1", Hostname: "ep1a"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
+ Name: "svc1",
+ Namespace: "testns",
},
}
return eps
}
-func (APIConnFederationTest) EndpointsList() []*api.Endpoints {
- eps := []*api.Endpoints{
+func (APIConnFederationTest) EndpointsList() []*object.Endpoints {
+ eps := []*object.Endpoints{
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.1",
- Hostname: "ep1a",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.1", Hostname: "ep1a"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
+ Name: "svc1",
+ Namespace: "testns",
},
}
return eps
diff --git a/plugin/kubernetes/autopath.go b/plugin/kubernetes/autopath.go
index 4d991a38f..71506ee3d 100644
--- a/plugin/kubernetes/autopath.go
+++ b/plugin/kubernetes/autopath.go
@@ -2,9 +2,8 @@ package kubernetes
import (
"github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/request"
-
- api "k8s.io/api/core/v1"
)
// AutoPath implements the AutoPathFunc call from the autopath plugin.
@@ -40,7 +39,7 @@ func (k *Kubernetes) AutoPath(state request.Request) []string {
}
// podWithIP return the api.Pod for source IP ip. It returns nil if nothing can be found.
-func (k *Kubernetes) podWithIP(ip string) *api.Pod {
+func (k *Kubernetes) podWithIP(ip string) *object.Pod {
ps := k.APIConn.PodIndex(ip)
if len(ps) == 0 {
return nil
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index a4ec8bf07..1c41b6ddf 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -7,16 +7,16 @@ import (
"sync/atomic"
"time"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
dnswatch "github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
-
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
)
const (
@@ -28,13 +28,13 @@ const (
)
type dnsController interface {
- ServiceList() []*api.Service
- SvcIndex(string) []*api.Service
- SvcIndexReverse(string) []*api.Service
- PodIndex(string) []*api.Pod
- EpIndex(string) []*api.Endpoints
- EpIndexReverse(string) []*api.Endpoints
- EndpointsList() []*api.Endpoints
+ ServiceList() []*object.Service
+ EndpointsList() []*object.Endpoints
+ SvcIndex(string) []*object.Service
+ SvcIndexReverse(string) []*object.Service
+ PodIndex(string) []*object.Pod
+ EpIndex(string) []*object.Endpoints
+ EpIndexReverse(string) []*object.Endpoints
GetNodeByName(string) (*api.Node, error)
GetNamespaceByName(string) (*api.Namespace, error)
@@ -110,30 +110,34 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
endpointNameMode: opts.endpointNameMode,
}
- dns.svcLister, dns.svcController = cache.NewIndexerInformer(
+ dns.svcLister, dns.svcController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: serviceListFunc(dns.client, api.NamespaceAll, dns.selector),
WatchFunc: serviceWatchFunc(dns.client, api.NamespaceAll, dns.selector),
},
- &api.Service{},
+ &object.Service{},
opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
- cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc})
+ cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
+ object.ToService,
+ )
if opts.initPodCache {
- dns.podLister, dns.podController = cache.NewIndexerInformer(
+ dns.podLister, dns.podController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: podListFunc(dns.client, api.NamespaceAll, dns.selector),
WatchFunc: podWatchFunc(dns.client, api.NamespaceAll, dns.selector),
},
- &api.Pod{},
+ &object.Pod{},
opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
- cache.Indexers{podIPIndex: podIPIndexFunc})
+ cache.Indexers{podIPIndex: podIPIndexFunc},
+ object.ToPod,
+ )
}
if opts.initEndpointsCache {
- dns.epLister, dns.epController = cache.NewIndexerInformer(
+ dns.epLister, dns.epController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector),
@@ -141,7 +145,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
&api.Endpoints{},
opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
- cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc})
+ cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
+ object.ToEndpoints)
}
dns.nsLister, dns.nsController = cache.NewInformer(
@@ -155,49 +160,43 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
}
func podIPIndexFunc(obj interface{}) ([]string, error) {
- p, ok := obj.(*api.Pod)
+ p, ok := obj.(*object.Pod)
if !ok {
return nil, errObj
}
- return []string{p.Status.PodIP}, nil
+ return []string{p.PodIP}, nil
}
func svcIPIndexFunc(obj interface{}) ([]string, error) {
- svc, ok := obj.(*api.Service)
+ svc, ok := obj.(*object.Service)
if !ok {
return nil, errObj
}
- return []string{svc.Spec.ClusterIP}, nil
+ return []string{svc.ClusterIP}, nil
}
func svcNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
- s, ok := obj.(*api.Service)
+ s, ok := obj.(*object.Service)
if !ok {
return nil, errObj
}
- return []string{s.ObjectMeta.Name + "." + s.ObjectMeta.Namespace}, nil
+ return []string{s.Index}, nil
}
func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
- s, ok := obj.(*api.Endpoints)
+ s, ok := obj.(*object.Endpoints)
if !ok {
return nil, errObj
}
- return []string{s.ObjectMeta.Name + "." + s.ObjectMeta.Namespace}, nil
+ return []string{s.Index}, nil
}
func epIPIndexFunc(obj interface{}) ([]string, error) {
- ep, ok := obj.(*api.Endpoints)
+ ep, ok := obj.(*object.Endpoints)
if !ok {
return nil, errObj
}
- var idx []string
- for _, eps := range ep.Subsets {
- for _, addr := range eps.Addresses {
- idx = append(idx, addr.IP)
- }
- }
- return idx, nil
+ return ep.IndexIP, nil
}
func serviceListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
@@ -335,10 +334,10 @@ func (dns *dnsControl) HasSynced() bool {
return a && b && c && d
}
-func (dns *dnsControl) ServiceList() (svcs []*api.Service) {
+func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
os := dns.svcLister.List()
for _, o := range os {
- s, ok := o.(*api.Service)
+ s, ok := o.(*object.Service)
if !ok {
continue
}
@@ -347,16 +346,25 @@ func (dns *dnsControl) ServiceList() (svcs []*api.Service) {
return svcs
}
-func (dns *dnsControl) PodIndex(ip string) (pods []*api.Pod) {
- if dns.podLister == nil {
- return nil
+func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
+ os := dns.epLister.List()
+ for _, o := range os {
+ ep, ok := o.(*object.Endpoints)
+ if !ok {
+ continue
+ }
+ eps = append(eps, ep)
}
+ return eps
+}
+
+func (dns *dnsControl) PodIndex(ip string) (pods []*object.Pod) {
os, err := dns.podLister.ByIndex(podIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
- p, ok := o.(*api.Pod)
+ p, ok := o.(*object.Pod)
if !ok {
continue
}
@@ -365,16 +373,13 @@ func (dns *dnsControl) PodIndex(ip string) (pods []*api.Pod) {
return pods
}
-func (dns *dnsControl) SvcIndex(idx string) (svcs []*api.Service) {
- if dns.svcLister == nil {
- return nil
- }
+func (dns *dnsControl) SvcIndex(idx string) (svcs []*object.Service) {
os, err := dns.svcLister.ByIndex(svcNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
- s, ok := o.(*api.Service)
+ s, ok := o.(*object.Service)
if !ok {
continue
}
@@ -383,17 +388,14 @@ func (dns *dnsControl) SvcIndex(idx string) (svcs []*api.Service) {
return svcs
}
-func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*api.Service) {
- if dns.svcLister == nil {
- return nil
- }
+func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
os, err := dns.svcLister.ByIndex(svcIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
- s, ok := o.(*api.Service)
+ s, ok := o.(*object.Service)
if !ok {
continue
}
@@ -402,16 +404,13 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*api.Service) {
return svcs
}
-func (dns *dnsControl) EpIndex(idx string) (ep []*api.Endpoints) {
- if dns.epLister == nil {
- return nil
- }
+func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
- e, ok := o.(*api.Endpoints)
+ e, ok := o.(*object.Endpoints)
if !ok {
continue
}
@@ -420,16 +419,13 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*api.Endpoints) {
return ep
}
-func (dns *dnsControl) EpIndexReverse(ip string) (ep []*api.Endpoints) {
- if dns.svcLister == nil {
- return nil
- }
+func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
os, err := dns.epLister.ByIndex(epIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
- e, ok := o.(*api.Endpoints)
+ e, ok := o.(*object.Endpoints)
if !ok {
continue
}
@@ -438,21 +434,6 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*api.Endpoints) {
return ep
}
-func (dns *dnsControl) EndpointsList() (eps []*api.Endpoints) {
- if dns.epLister == nil {
- return nil
- }
- os := dns.epLister.List()
- for _, o := range os {
- ep, ok := o.(*api.Endpoints)
- if !ok {
- continue
- }
- eps = append(eps, ep)
- }
- return eps
-}
-
// GetNodeByName return the node by name. If nothing is found an error is
// returned. This query causes a roundtrip to the k8s API server, so use
// sparingly. Currently this is only used for Federation.
@@ -461,8 +442,7 @@ func (dns *dnsControl) GetNodeByName(name string) (*api.Node, error) {
return v1node, err
}
-// GetNamespaceByName returns the namespace by name. If nothing is found an
-// error is returned.
+// GetNamespaceByName returns the namespace by name. If nothing is found an error is returned.
func (dns *dnsControl) GetNamespaceByName(name string) (*api.Namespace, error) {
os := dns.nsLister.List()
for _, o := range os {
@@ -488,160 +468,4 @@ func (dns *dnsControl) updateModifed() {
atomic.StoreInt64(&dns.modified, unix)
}
-func (dns *dnsControl) sendServiceUpdates(s *api.Service) {
- for i := range dns.zones {
- name := serviceFQDN(s, dns.zones[i])
- if _, ok := dns.watched[name]; ok {
- dns.watchChan <- name
- }
- }
-}
-
-func (dns *dnsControl) sendPodUpdates(p *api.Pod) {
- for i := range dns.zones {
- name := podFQDN(p, dns.zones[i])
- if _, ok := dns.watched[name]; ok {
- dns.watchChan <- name
- }
- }
-}
-
-func (dns *dnsControl) sendEndpointsUpdates(ep *api.Endpoints) {
- for _, zone := range dns.zones {
- names := append(endpointFQDN(ep, zone, dns.endpointNameMode), serviceFQDN(ep, zone))
- for _, name := range names {
- if _, ok := dns.watched[name]; ok {
- dns.watchChan <- name
- }
- }
- }
-}
-
-// endpointsSubsetDiffs returns an Endpoints struct containing the Subsets that have changed between a and b.
-// When we notify clients of changed endpoints we only want to notify them of endpoints that have changed.
-// The Endpoints API object holds more than one endpoint, held in a list of Subsets. Each Subset refers to
-// an endpoint. So, here we create a new Endpoints struct, and populate it with only the endpoints that have changed.
-// This new Endpoints object is later used to generate the list of endpoint FQDNs to send to the client.
-// This function computes this literally by combining the sets (in a and not in b) union (in b and not in a).
-func endpointsSubsetDiffs(a, b *api.Endpoints) *api.Endpoints {
- c := b.DeepCopy()
- c.Subsets = []api.EndpointSubset{}
-
- // In the following loop, the first iteration computes (in a but not in b).
- // The second iteration then adds (in b but not in a)
- // The end result is an Endpoints that only contains the subsets (endpoints) that are different between a and b.
- for _, abba := range [][]*api.Endpoints{{a, b}, {b, a}} {
- a := abba[0]
- b := abba[1]
- left:
- for _, as := range a.Subsets {
- for _, bs := range b.Subsets {
- if subsetsEquivalent(as, bs) {
- continue left
- }
- }
- c.Subsets = append(c.Subsets, as)
- }
- }
- return c
-}
-
-// sendUpdates sends a notification to the server if a watch is enabled for the qname.
-func (dns *dnsControl) sendUpdates(oldObj, newObj interface{}) {
- // If both objects have the same resource version, they are identical.
- if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) {
- return
- }
- obj := newObj
- if obj == nil {
- obj = oldObj
- }
- switch ob := obj.(type) {
- case *api.Service:
- dns.updateModifed()
- dns.sendServiceUpdates(ob)
- case *api.Endpoints:
- if newObj == nil || oldObj == nil {
- dns.updateModifed()
- dns.sendEndpointsUpdates(ob)
- return
- }
- p := oldObj.(*api.Endpoints)
- // endpoint updates can come frequently, make sure it's a change we care about
- if endpointsEquivalent(p, ob) {
- return
- }
- dns.updateModifed()
- dns.sendEndpointsUpdates(endpointsSubsetDiffs(p, ob))
- case *api.Pod:
- dns.updateModifed()
- dns.sendPodUpdates(ob)
- default:
- log.Warningf("Updates for %T not supported.", ob)
- }
-}
-
-func (dns *dnsControl) Add(obj interface{}) { dns.sendUpdates(nil, obj) }
-func (dns *dnsControl) Delete(obj interface{}) { dns.sendUpdates(obj, nil) }
-func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.sendUpdates(oldObj, newObj) }
-
-// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
-// I.e. that they have the same ready addresses, host names, ports (including protocol
-// and service names for SRV)
-func subsetsEquivalent(sa, sb api.EndpointSubset) bool {
- if len(sa.Addresses) != len(sb.Addresses) {
- return false
- }
- if len(sa.Ports) != len(sb.Ports) {
- return false
- }
-
- // in Addresses and Ports, we should be able to rely on
- // these being sorted and able to be compared
- // they are supposed to be in a canonical format
- for addr, aaddr := range sa.Addresses {
- baddr := sb.Addresses[addr]
- if aaddr.IP != baddr.IP {
- return false
- }
- if aaddr.Hostname != baddr.Hostname {
- return false
- }
- }
-
- for port, aport := range sa.Ports {
- bport := sb.Ports[port]
- if aport.Name != bport.Name {
- return false
- }
- if aport.Port != bport.Port {
- return false
- }
- if aport.Protocol != bport.Protocol {
- return false
- }
- }
- return true
-}
-
-// endpointsEquivalent checks if the update to an endpoint is something
-// that matters to us or if they are effectively equivalent.
-func endpointsEquivalent(a, b *api.Endpoints) bool {
-
- if len(a.Subsets) != len(b.Subsets) {
- return false
- }
-
- // we should be able to rely on
- // these being sorted and able to be compared
- // they are supposed to be in a canonical format
- for i, sa := range a.Subsets {
- sb := b.Subsets[i]
- if !subsetsEquivalent(sa, sb) {
- return false
- }
- }
- return true
-}
-
var errObj = errors.New("obj was not of the correct type")
diff --git a/plugin/kubernetes/controller_test.go b/plugin/kubernetes/controller_test.go
index 1663e4c6a..99d7e92f7 100644
--- a/plugin/kubernetes/controller_test.go
+++ b/plugin/kubernetes/controller_test.go
@@ -4,10 +4,10 @@ import (
"context"
"net"
"strconv"
- "strings"
"testing"
"github.com/coredns/coredns/plugin/test"
+
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -15,50 +15,6 @@ import (
"k8s.io/client-go/kubernetes/fake"
)
-func endpointSubsets(addrs ...string) (eps []api.EndpointSubset) {
- for _, ap := range addrs {
- apa := strings.Split(ap, ":")
- address := apa[0]
- port, _ := strconv.Atoi(apa[1])
- eps = append(eps, api.EndpointSubset{Addresses: []api.EndpointAddress{{IP: address}}, Ports: []api.EndpointPort{{Port: int32(port)}}})
- }
- return eps
-}
-
-func TestEndpointsSubsetDiffs(t *testing.T) {
- var tests = []struct {
- a, b, expected api.Endpoints
- }{
- { // From a->b: Nothing changes
- api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
- api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
- api.Endpoints{},
- },
- { // From a->b: Everything goes away
- api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
- api.Endpoints{},
- api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
- },
- { // From a->b: Everything is new
- api.Endpoints{},
- api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
- api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
- },
- { // From a->b: One goes away, one is new
- api.Endpoints{Subsets: endpointSubsets("10.0.0.2:8080")},
- api.Endpoints{Subsets: endpointSubsets("10.0.0.1:80")},
- api.Endpoints{Subsets: endpointSubsets("10.0.0.2:8080", "10.0.0.1:80")},
- },
- }
-
- for i, te := range tests {
- got := endpointsSubsetDiffs(&te.a, &te.b)
- if !endpointsEquivalent(got, &te.expected) {
- t.Errorf("Expected '%v' for test %v, got '%v'.", te.expected, i, got)
- }
- }
-}
-
func inc(ip net.IP) {
for j := len(ip) - 1; j >= 0; j-- {
ip[j]++
diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go
index fa4a698ad..604f00fab 100644
--- a/plugin/kubernetes/handler_test.go
+++ b/plugin/kubernetes/handler_test.go
@@ -5,6 +5,7 @@ import (
"testing"
"time"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/plugin/test"
@@ -185,6 +186,16 @@ var dnsTestCases = []test.Case{
test.A("dup-name.hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"),
},
},
+ { // An A record query for an existing headless service should return a record for each of its ipv4 endpoints
+ Qname: "hdls1.testns.svc.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeSuccess,
+ Answer: []dns.RR{
+ test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.2"),
+ test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.3"),
+ test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.4"),
+ test.A("hdls1.testns.svc.cluster.local. 5 IN A 172.0.0.5"),
+ },
+ },
// SRV Service (Headless and portless)
{
Qname: "*.*.hdlsprtls.testns.svc.cluster.local.", Qtype: dns.TypeSRV,
@@ -368,263 +379,188 @@ func TestServeDNS(t *testing.T) {
type APIConnServeTest struct{}
-func (APIConnServeTest) HasSynced() bool { return true }
-func (APIConnServeTest) Run() { return }
-func (APIConnServeTest) Stop() error { return nil }
-func (APIConnServeTest) EpIndexReverse(string) []*api.Endpoints { return nil }
-func (APIConnServeTest) SvcIndexReverse(string) []*api.Service { return nil }
-func (APIConnServeTest) Modified() int64 { return time.Now().Unix() }
-func (APIConnServeTest) SetWatchChan(watch.Chan) {}
-func (APIConnServeTest) Watch(string) error { return nil }
-func (APIConnServeTest) StopWatching(string) {}
+func (APIConnServeTest) HasSynced() bool { return true }
+func (APIConnServeTest) Run() { return }
+func (APIConnServeTest) Stop() error { return nil }
+func (APIConnServeTest) EpIndexReverse(string) []*object.Endpoints { return nil }
+func (APIConnServeTest) SvcIndexReverse(string) []*object.Service { return nil }
+func (APIConnServeTest) Modified() int64 { return time.Now().Unix() }
+func (APIConnServeTest) SetWatchChan(watch.Chan) {}
+func (APIConnServeTest) Watch(string) error { return nil }
+func (APIConnServeTest) StopWatching(string) {}
-func (APIConnServeTest) PodIndex(string) []*api.Pod {
- a := []*api.Pod{{
- ObjectMeta: meta.ObjectMeta{
- Namespace: "podns",
- },
- Status: api.PodStatus{
- PodIP: "10.240.0.1", // Remote IP set in test.ResponseWriter
- },
- }}
+func (APIConnServeTest) PodIndex(string) []*object.Pod {
+ a := []*object.Pod{
+ {Namespace: "podns", PodIP: "10.240.0.1"}, // Remote IP set in test.ResponseWriter
+ }
return a
}
-var svcIndex = map[string][]*api.Service{
- "svc1.testns": {{
- ObjectMeta: meta.ObjectMeta{
+var svcIndex = map[string][]*object.Service{
+ "svc1.testns": {
+ {
Name: "svc1",
Namespace: "testns",
- },
- Spec: api.ServiceSpec{
Type: api.ServiceTypeClusterIP,
ClusterIP: "10.0.0.1",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
+ },
},
- }},
- "svcempty.testns": {{
- ObjectMeta: meta.ObjectMeta{
+ },
+ "svcempty.testns": {
+ {
Name: "svcempty",
Namespace: "testns",
- },
- Spec: api.ServiceSpec{
Type: api.ServiceTypeClusterIP,
ClusterIP: "10.0.0.1",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
+ },
},
- }},
- "svc6.testns": {{
- ObjectMeta: meta.ObjectMeta{
+ },
+ "svc6.testns": {
+ {
Name: "svc6",
Namespace: "testns",
- },
- Spec: api.ServiceSpec{
Type: api.ServiceTypeClusterIP,
ClusterIP: "1234:abcd::1",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
+ },
},
- }},
- "hdls1.testns": {{
- ObjectMeta: meta.ObjectMeta{
+ },
+ "hdls1.testns": {
+ {
Name: "hdls1",
Namespace: "testns",
- },
- Spec: api.ServiceSpec{
Type: api.ServiceTypeClusterIP,
ClusterIP: api.ClusterIPNone,
},
- }},
- "external.testns": {{
- ObjectMeta: meta.ObjectMeta{
- Name: "external",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
+ },
+ "external.testns": {
+ {
+ Name: "external",
+ Namespace: "testns",
ExternalName: "ext.interwebs.test",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
- Type: api.ServiceTypeExternalName,
- },
- }},
- "external-to-service.testns": {{
- ObjectMeta: meta.ObjectMeta{
- Name: "external-to-service",
- Namespace: "testns",
+ Type: api.ServiceTypeExternalName,
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
+ },
},
- Spec: api.ServiceSpec{
+ },
+ "external-to-service.testns": {
+ {
+ Name: "external-to-service",
+ Namespace: "testns",
ExternalName: "svc1.testns.svc.cluster.local.",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
- Type: api.ServiceTypeExternalName,
+ Type: api.ServiceTypeExternalName,
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
+ },
},
- }},
- "hdlsprtls.testns": {{
- ObjectMeta: meta.ObjectMeta{
+ },
+ "hdlsprtls.testns": {
+ {
Name: "hdlsprtls",
Namespace: "testns",
- },
- Spec: api.ServiceSpec{
Type: api.ServiceTypeClusterIP,
ClusterIP: api.ClusterIPNone,
},
- }},
- "svc1.unexposedns": {{
- ObjectMeta: meta.ObjectMeta{
+ },
+ "svc1.unexposedns": {
+ {
Name: "svc1",
Namespace: "unexposedns",
- },
- Spec: api.ServiceSpec{
Type: api.ServiceTypeClusterIP,
ClusterIP: "10.0.0.2",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
+ },
},
- }},
+ },
}
-func (APIConnServeTest) SvcIndex(s string) []*api.Service {
- return svcIndex[s]
-}
+func (APIConnServeTest) SvcIndex(s string) []*object.Service { return svcIndex[s] }
-func (APIConnServeTest) ServiceList() []*api.Service {
- var svcs []*api.Service
+func (APIConnServeTest) ServiceList() []*object.Service {
+ var svcs []*object.Service
for _, svc := range svcIndex {
svcs = append(svcs, svc...)
}
return svcs
}
-var epsIndex = map[string][]*api.Endpoints{
+var epsIndex = map[string][]*object.Endpoints{
"svc1.testns": {{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.1",
- Hostname: "ep1a",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.1", Hostname: "ep1a"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
+ Name: "svc1",
+ Namespace: "testns",
}},
"svcempty.testns": {{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
Addresses: nil,
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "svcempty",
- Namespace: "testns",
- },
+ Name: "svcempty",
+ Namespace: "testns",
}},
"hdls1.testns": {{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.2",
- },
- {
- IP: "172.0.0.3",
- },
- {
- IP: "172.0.0.4",
- Hostname: "dup-name",
- },
- {
- IP: "172.0.0.5",
- Hostname: "dup-name",
- },
- {
- IP: "5678:abcd::1",
- },
- {
- IP: "5678:abcd::2",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.2"},
+ {IP: "172.0.0.3"},
+ {IP: "172.0.0.4", Hostname: "dup-name"},
+ {IP: "172.0.0.5", Hostname: "dup-name"},
+ {IP: "5678:abcd::1"},
+ {IP: "5678:abcd::2"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
+ Name: "hdls1",
+ Namespace: "testns",
}},
"hdlsprtls.testns": {{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.20",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.20"},
},
- Ports: []api.EndpointPort{},
+ Ports: []object.EndpointPort{{Port: -1}},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "hdlsprtls",
- Namespace: "testns",
- },
+ Name: "hdlsprtls",
+ Namespace: "testns",
}},
}
-func (APIConnServeTest) EpIndex(s string) []*api.Endpoints {
+func (APIConnServeTest) EpIndex(s string) []*object.Endpoints {
return epsIndex[s]
}
-func (APIConnServeTest) EndpointsList() []*api.Endpoints {
- var eps []*api.Endpoints
+func (APIConnServeTest) EndpointsList() []*object.Endpoints {
+ var eps []*object.Endpoints
for _, ep := range epsIndex {
eps = append(eps, ep...)
}
return eps
-
}
func (APIConnServeTest) GetNodeByName(name string) (*api.Node, error) {
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index 5be8e558e..81cf19492 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -11,6 +11,7 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/etcd/msg"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/coredns/coredns/plugin/pkg/fall"
"github.com/coredns/coredns/plugin/pkg/healthcheck"
@@ -304,18 +305,18 @@ func serviceFQDN(obj meta.Object, zone string) string {
}
// podFQDN returns the k8s cluster dns spec FQDN for the pod.
-func podFQDN(p *api.Pod, zone string) string {
- if strings.Contains(p.Status.PodIP, ".") {
- name := strings.Replace(p.Status.PodIP, ".", "-", -1)
+func podFQDN(p *object.Pod, zone string) string {
+ if strings.Contains(p.PodIP, ".") {
+ name := strings.Replace(p.PodIP, ".", "-", -1)
return dnsutil.Join(name, p.GetNamespace(), Pod, zone)
}
- name := strings.Replace(p.Status.PodIP, ":", "-", -1)
+ name := strings.Replace(p.PodIP, ":", "-", -1)
return dnsutil.Join(name, p.GetNamespace(), Pod, zone)
}
// endpointFQDN returns a list of k8s cluster dns spec service FQDNs for each subset in the endpoint.
-func endpointFQDN(ep *api.Endpoints, zone string, endpointNameMode bool) []string {
+func endpointFQDN(ep *object.Endpoints, zone string, endpointNameMode bool) []string {
var names []string
for _, ss := range ep.Subsets {
for _, addr := range ss.Addresses {
@@ -325,12 +326,12 @@ func endpointFQDN(ep *api.Endpoints, zone string, endpointNameMode bool) []strin
return names
}
-func endpointHostname(addr api.EndpointAddress, endpointNameMode bool) string {
+func endpointHostname(addr object.EndpointAddress, endpointNameMode bool) string {
if addr.Hostname != "" {
return addr.Hostname
}
- if endpointNameMode && addr.TargetRef != nil && addr.TargetRef.Name != "" {
- return addr.TargetRef.Name
+ if endpointNameMode && addr.TargetRefName != "" {
+ return addr.TargetRefName
}
if strings.Contains(addr.IP, ".") {
return strings.Replace(addr.IP, ".", "-", -1)
@@ -396,12 +397,12 @@ func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service,
}
// exclude pods in the process of termination
- if !p.ObjectMeta.DeletionTimestamp.IsZero() {
+ if p.Deleting {
continue
}
// check for matching ip and namespace
- if ip == p.Status.PodIP && match(namespace, p.Namespace) {
+ if ip == p.PodIP && match(namespace, p.Namespace) {
s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip, TTL: k.ttl}
pods = append(pods, s)
@@ -424,9 +425,9 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
}
var (
- endpointsListFunc func() []*api.Endpoints
- endpointsList []*api.Endpoints
- serviceList []*api.Service
+ endpointsListFunc func() []*object.Endpoints
+ endpointsList []*object.Endpoints
+ serviceList []*object.Service
)
// handle empty service name
@@ -441,11 +442,11 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
if wildcard(r.service) || wildcard(r.namespace) {
serviceList = k.APIConn.ServiceList()
- endpointsListFunc = func() []*api.Endpoints { return k.APIConn.EndpointsList() }
+ endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EndpointsList() }
} else {
- idx := r.service + "." + r.namespace
+ idx := object.ServiceKey(r.service, r.namespace)
serviceList = k.APIConn.SvcIndex(idx)
- endpointsListFunc = func() []*api.Endpoints { return k.APIConn.EpIndex(idx) }
+ endpointsListFunc = func() []*object.Endpoints { return k.APIConn.EpIndex(idx) }
}
for _, svc := range serviceList {
@@ -459,7 +460,7 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
continue
}
- if k.opts.ignoreEmptyService && svc.Spec.ClusterIP != api.ClusterIPNone {
+ if k.opts.ignoreEmptyService && svc.ClusterIP != api.ClusterIPNone {
// serve NXDOMAIN if no endpoint is able to answer
podsCount := 0
for _, ep := range endpointsListFunc() {
@@ -474,12 +475,12 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
}
// Endpoint query or headless service
- if svc.Spec.ClusterIP == api.ClusterIPNone || r.endpoint != "" {
+ if svc.ClusterIP == api.ClusterIPNone || r.endpoint != "" {
if endpointsList == nil {
endpointsList = endpointsListFunc()
}
for _, ep := range endpointsList {
- if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace {
+ if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
continue
}
@@ -493,11 +494,6 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
}
}
- if len(eps.Ports) == 0 {
- // add a sentinel port (-1) entry so we create records for services without any declared ports
- eps.Ports = append(eps.Ports, api.EndpointPort{Port: -1})
- }
-
for _, p := range eps.Ports {
if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {
continue
@@ -516,8 +512,8 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
}
// External service
- if svc.Spec.Type == api.ServiceTypeExternalName {
- s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.Spec.ExternalName, TTL: k.ttl}
+ if svc.Type == api.ServiceTypeExternalName {
+ s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.ExternalName, TTL: k.ttl}
if t, _ := s.HostType(); t == dns.TypeCNAME {
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
services = append(services, s)
@@ -528,18 +524,14 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
}
// ClusterIP service
- if len(svc.Spec.Ports) == 0 {
- // add a sentinel port (-1) entry so we create records for services without any declared ports
- svc.Spec.Ports = append(svc.Spec.Ports, api.ServicePort{Port: -1})
- }
- for _, p := range svc.Spec.Ports {
+ for _, p := range svc.Ports {
if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) {
continue
}
err = nil
- s := msg.Service{Host: svc.Spec.ClusterIP, Port: int(p.Port), TTL: k.ttl}
+ s := msg.Service{Host: svc.ClusterIP, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/")
services = append(services, s)
diff --git a/plugin/kubernetes/kubernetes_test.go b/plugin/kubernetes/kubernetes_test.go
index 36d00a92f..f35c9cd2c 100644
--- a/plugin/kubernetes/kubernetes_test.go
+++ b/plugin/kubernetes/kubernetes_test.go
@@ -4,6 +4,7 @@ import (
"testing"
"github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/request"
@@ -49,7 +50,7 @@ func TestEndpointHostname(t *testing.T) {
{"10.11.12.13", "", "hello-abcde", "hello-abcde", true},
}
for _, test := range tests {
- result := endpointHostname(api.EndpointAddress{IP: test.ip, Hostname: test.hostname, TargetRef: &api.ObjectReference{Name: test.podName}}, test.endpointNameMode)
+ result := endpointHostname(object.EndpointAddress{IP: test.ip, Hostname: test.hostname, TargetRefName: test.podName}, test.endpointNameMode)
if result != test.expected {
t.Errorf("Expected endpoint name for (ip:%v hostname:%v) to be '%v', but got '%v'", test.ip, test.hostname, test.expected, result)
}
@@ -58,184 +59,122 @@ func TestEndpointHostname(t *testing.T) {
type APIConnServiceTest struct{}
-func (APIConnServiceTest) HasSynced() bool { return true }
-func (APIConnServiceTest) Run() { return }
-func (APIConnServiceTest) Stop() error { return nil }
-func (APIConnServiceTest) PodIndex(string) []*api.Pod { return nil }
-func (APIConnServiceTest) SvcIndexReverse(string) []*api.Service { return nil }
-func (APIConnServiceTest) EpIndexReverse(string) []*api.Endpoints { return nil }
-func (APIConnServiceTest) Modified() int64 { return 0 }
-func (APIConnServiceTest) SetWatchChan(watch.Chan) {}
-func (APIConnServiceTest) Watch(string) error { return nil }
-func (APIConnServiceTest) StopWatching(string) {}
+func (APIConnServiceTest) HasSynced() bool { return true }
+func (APIConnServiceTest) Run() { return }
+func (APIConnServiceTest) Stop() error { return nil }
+func (APIConnServiceTest) PodIndex(string) []*object.Pod { return nil }
+func (APIConnServiceTest) SvcIndexReverse(string) []*object.Service { return nil }
+func (APIConnServiceTest) EpIndexReverse(string) []*object.Endpoints { return nil }
+func (APIConnServiceTest) Modified() int64 { return 0 }
+func (APIConnServiceTest) SetWatchChan(watch.Chan) {}
+func (APIConnServiceTest) Watch(string) error { return nil }
+func (APIConnServiceTest) StopWatching(string) {}
-func (APIConnServiceTest) SvcIndex(string) []*api.Service {
- svcs := []*api.Service{
+func (APIConnServiceTest) SvcIndex(string) []*object.Service {
+ svcs := []*object.Service{
{
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: "10.0.0.1",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Name: "svc1",
+ Namespace: "testns",
+ ClusterIP: "10.0.0.1",
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
},
},
{
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: api.ClusterIPNone,
- },
+ Name: "hdls1",
+ Namespace: "testns",
+ ClusterIP: api.ClusterIPNone,
},
{
- ObjectMeta: meta.ObjectMeta{
- Name: "external",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ExternalName: "coredns.io",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
- Type: api.ServiceTypeExternalName,
+ Name: "external",
+ Namespace: "testns",
+ ExternalName: "coredns.io",
+ Type: api.ServiceTypeExternalName,
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
},
},
}
return svcs
}
-func (APIConnServiceTest) ServiceList() []*api.Service {
- svcs := []*api.Service{
+func (APIConnServiceTest) ServiceList() []*object.Service {
+ svcs := []*object.Service{
{
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: "10.0.0.1",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
+ Name: "svc1",
+ Namespace: "testns",
+ ClusterIP: "10.0.0.1",
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
},
},
{
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: api.ClusterIPNone,
- },
+ Name: "hdls1",
+ Namespace: "testns",
+ ClusterIP: api.ClusterIPNone,
},
{
- ObjectMeta: meta.ObjectMeta{
- Name: "external",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ExternalName: "coredns.io",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
- Type: api.ServiceTypeExternalName,
+ Name: "external",
+ Namespace: "testns",
+ ExternalName: "coredns.io",
+ Type: api.ServiceTypeExternalName,
+ Ports: []api.ServicePort{
+ {Name: "http", Protocol: "tcp", Port: 80},
},
},
}
return svcs
}
-func (APIConnServiceTest) EpIndex(string) []*api.Endpoints {
- n := "test.node.foo.bar"
-
- eps := []*api.Endpoints{
+func (APIConnServiceTest) EpIndex(string) []*object.Endpoints {
+ eps := []*object.Endpoints{
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.1",
- Hostname: "ep1a",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.1", Hostname: "ep1a"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
+ Name: "svc1",
+ Namespace: "testns",
},
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.2",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.2"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
+ Name: "hdls1",
+ Namespace: "testns",
},
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.3",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.3"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
+ Name: "hdls1",
+ Namespace: "testns",
},
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "10.9.8.7",
- NodeName: &n,
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "10.9.8.7", NodeName: "test.node.foo.bar"},
},
},
},
@@ -244,85 +183,55 @@ func (APIConnServiceTest) EpIndex(string) []*api.Endpoints {
return eps
}
-func (APIConnServiceTest) EndpointsList() []*api.Endpoints {
- n := "test.node.foo.bar"
-
- eps := []*api.Endpoints{
+func (APIConnServiceTest) EndpointsList() []*object.Endpoints {
+ eps := []*object.Endpoints{
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.1",
- Hostname: "ep1a",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.1", Hostname: "ep1a"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
+ Name: "svc1",
+ Namespace: "testns",
},
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.2",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.2"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
+ Name: "hdls1",
+ Namespace: "testns",
},
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "172.0.0.3",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "172.0.0.3"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "hdls1",
- Namespace: "testns",
- },
+ Name: "hdls1",
+ Namespace: "testns",
},
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "10.9.8.7",
- NodeName: &n,
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "10.9.8.7", NodeName: "test.node.foo.bar"},
},
},
},
@@ -397,11 +306,9 @@ func TestServices(t *testing.T) {
func TestServiceFQDN(t *testing.T) {
fqdn := serviceFQDN(
- &api.Service{
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
+ &object.Service{
+ Name: "svc1",
+ Namespace: "testns",
}, "cluster.local")
expected := "svc1.testns.svc.cluster.local."
@@ -412,14 +319,10 @@ func TestServiceFQDN(t *testing.T) {
func TestPodFQDN(t *testing.T) {
fqdn := podFQDN(
- &api.Pod{
- ObjectMeta: meta.ObjectMeta{
- Name: "pod1",
- Namespace: "testns",
- },
- Status: api.PodStatus{
- PodIP: "10.10.0.10",
- },
+ &object.Pod{
+ Name: "pod1",
+ Namespace: "testns",
+ PodIP: "10.10.0.10",
}, "cluster.local")
expected := "10-10-0-10.testns.pod.cluster.local."
@@ -427,14 +330,10 @@ func TestPodFQDN(t *testing.T) {
t.Errorf("Expected '%v', got '%v'.", expected, fqdn)
}
fqdn = podFQDN(
- &api.Pod{
- ObjectMeta: meta.ObjectMeta{
- Name: "pod1",
- Namespace: "testns",
- },
- Status: api.PodStatus{
- PodIP: "aaaa:bbbb:cccc::zzzz",
- },
+ &object.Pod{
+ Name: "pod1",
+ Namespace: "testns",
+ PodIP: "aaaa:bbbb:cccc::zzzz",
}, "cluster.local")
expected = "aaaa-bbbb-cccc--zzzz.testns.pod.cluster.local."
@@ -445,10 +344,10 @@ func TestPodFQDN(t *testing.T) {
func TestEndpointFQDN(t *testing.T) {
fqdns := endpointFQDN(
- &api.Endpoints{
- Subsets: []api.EndpointSubset{
+ &object.Endpoints{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
+ Addresses: []object.EndpointAddress{
{
IP: "172.0.0.1",
Hostname: "ep1a",
@@ -459,10 +358,8 @@ func TestEndpointFQDN(t *testing.T) {
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
+ Name: "svc1",
+ Namespace: "testns",
}, "cluster.local", false)
expected := []string{
diff --git a/plugin/kubernetes/local.go b/plugin/kubernetes/local.go
index 961eb9410..e15fec497 100644
--- a/plugin/kubernetes/local.go
+++ b/plugin/kubernetes/local.go
@@ -32,7 +32,7 @@ func (k *Kubernetes) localNodeName() string {
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
if localIP.Equal(net.ParseIP(addr.IP)) {
- return *addr.NodeName
+ return addr.NodeName
}
}
}
diff --git a/plugin/kubernetes/ns.go b/plugin/kubernetes/ns.go
index af5c86450..2ccb51ef3 100644
--- a/plugin/kubernetes/ns.go
+++ b/plugin/kubernetes/ns.go
@@ -27,8 +27,8 @@ FindEndpoint:
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
if localIP.Equal(net.ParseIP(addr.IP)) {
- svcNamespace = ep.ObjectMeta.Namespace
- svcName = ep.ObjectMeta.Name
+ svcNamespace = ep.Namespace
+ svcName = ep.Name
break FindEndpoint
}
}
@@ -44,10 +44,10 @@ FindEndpoint:
FindService:
for _, svc := range k.APIConn.ServiceList() {
if svcName == svc.Name && svcNamespace == svc.Namespace {
- if svc.Spec.ClusterIP == api.ClusterIPNone {
+ if svc.ClusterIP == api.ClusterIPNone {
rr.A = localIP
} else {
- rr.A = net.ParseIP(svc.Spec.ClusterIP)
+ rr.A = net.ParseIP(svc.ClusterIP)
}
break FindService
}
diff --git a/plugin/kubernetes/ns_test.go b/plugin/kubernetes/ns_test.go
index f331d3231..fd781bc14 100644
--- a/plugin/kubernetes/ns_test.go
+++ b/plugin/kubernetes/ns_test.go
@@ -3,58 +3,52 @@ package kubernetes
import (
"testing"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1"
- meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
type APIConnTest struct{}
-func (APIConnTest) HasSynced() bool { return true }
-func (APIConnTest) Run() { return }
-func (APIConnTest) Stop() error { return nil }
-func (APIConnTest) PodIndex(string) []*api.Pod { return nil }
-func (APIConnTest) SvcIndex(string) []*api.Service { return nil }
-func (APIConnTest) SvcIndexReverse(string) []*api.Service { return nil }
-func (APIConnTest) EpIndex(string) []*api.Endpoints { return nil }
-func (APIConnTest) EndpointsList() []*api.Endpoints { return nil }
-func (APIConnTest) Modified() int64 { return 0 }
-func (APIConnTest) SetWatchChan(watch.Chan) {}
-func (APIConnTest) Watch(string) error { return nil }
-func (APIConnTest) StopWatching(string) {}
+func (APIConnTest) HasSynced() bool { return true }
+func (APIConnTest) Run() { return }
+func (APIConnTest) Stop() error { return nil }
+func (APIConnTest) PodIndex(string) []*object.Pod { return nil }
+func (APIConnTest) SvcIndex(string) []*object.Service { return nil }
+func (APIConnTest) SvcIndexReverse(string) []*object.Service { return nil }
+func (APIConnTest) EpIndex(string) []*object.Endpoints { return nil }
+func (APIConnTest) EndpointsList() []*object.Endpoints { return nil }
+func (APIConnTest) Modified() int64 { return 0 }
+func (APIConnTest) SetWatchChan(watch.Chan) {}
+func (APIConnTest) Watch(string) error { return nil }
+func (APIConnTest) StopWatching(string) {}
-func (APIConnTest) ServiceList() []*api.Service {
- svcs := []*api.Service{
+func (APIConnTest) ServiceList() []*object.Service {
+ svcs := []*object.Service{
{
- ObjectMeta: meta.ObjectMeta{
- Name: "dns-service",
- Namespace: "kube-system",
- },
- Spec: api.ServiceSpec{
- ClusterIP: "10.0.0.111",
- },
+ Name: "dns-service",
+ Namespace: "kube-system",
+ ClusterIP: "10.0.0.111",
},
}
return svcs
}
-func (APIConnTest) EpIndexReverse(string) []*api.Endpoints {
- eps := []*api.Endpoints{
+func (APIConnTest) EpIndexReverse(string) []*object.Endpoints {
+ eps := []*object.Endpoints{
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
+ Addresses: []object.EndpointAddress{
{
IP: "127.0.0.1",
},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "dns-service",
- Namespace: "kube-system",
- },
+ Name: "dns-service",
+ Namespace: "kube-system",
},
}
return eps
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go
new file mode 100644
index 000000000..b8531f050
--- /dev/null
+++ b/plugin/kubernetes/object/endpoint.go
@@ -0,0 +1,162 @@
+package object
+
+import (
+ api "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// Endpoints is a stripped down api.Endpoints with only the items we need for CoreDNS.
+type Endpoints struct {
+ Version string
+ Name string
+ Namespace string
+ Index string
+ IndexIP []string
+ Subsets []EndpointSubset
+
+ *Empty
+}
+
+// EndpointSubset is a group of addresses with a common set of ports. The
+// expanded set of endpoints is the Cartesian product of Addresses x Ports.
+type EndpointSubset struct {
+ Addresses []EndpointAddress
+ Ports []EndpointPort
+}
+
+// EndpointAddress is a tuple that describes single IP address.
+type EndpointAddress struct {
+ IP string
+ Hostname string
+ NodeName string
+ TargetRefName string
+}
+
+// EndpointPort is a tuple that describes a single port.
+type EndpointPort struct {
+ Port int32
+ Name string
+ Protocol string
+}
+
+// EndpointsKey return a string using for the index.
+func EndpointsKey(name, namespace string) string { return name + "." + namespace }
+
+// ToEndpoints converts an api.Service to a *Service.
+func ToEndpoints(obj interface{}) interface{} {
+ end, ok := obj.(*api.Endpoints)
+ if !ok {
+ return nil
+ }
+
+ e := &Endpoints{
+ Version: end.GetResourceVersion(),
+ Name: end.GetName(),
+ Namespace: end.GetNamespace(),
+ Index: EndpointsKey(end.GetName(), end.GetNamespace()),
+ Subsets: make([]EndpointSubset, len(end.Subsets)),
+ }
+ for i, eps := range end.Subsets {
+ sub := EndpointSubset{
+ Addresses: make([]EndpointAddress, len(eps.Addresses)),
+ }
+ if len(eps.Ports) == 0 {
+ // Add sentinal if there are no ports.
+ sub.Ports = []EndpointPort{{Port: -1}}
+ } else {
+ sub.Ports = make([]EndpointPort, len(eps.Ports))
+ }
+
+ for j, a := range eps.Addresses {
+ ea := EndpointAddress{IP: a.IP, Hostname: a.Hostname}
+ if a.NodeName != nil {
+ ea.NodeName = *a.NodeName
+ }
+ if a.TargetRef != nil {
+ ea.TargetRefName = a.TargetRef.Name
+ }
+ sub.Addresses[j] = ea
+ }
+
+ for k, p := range eps.Ports {
+ ep := EndpointPort{Port: p.Port, Name: p.Name, Protocol: string(p.Protocol)}
+ sub.Ports[k] = ep
+ }
+
+ e.Subsets[i] = sub
+ }
+
+ for _, eps := range end.Subsets {
+ for _, a := range eps.Addresses {
+ e.IndexIP = append(e.IndexIP, a.IP)
+ }
+ }
+
+ *end = api.Endpoints{}
+
+ return e
+}
+
+// CopyWithoutSubsets copies e, without the subsets.
+func (e *Endpoints) CopyWithoutSubsets() *Endpoints {
+ e1 := &Endpoints{
+ Version: e.Version,
+ Name: e.Name,
+ Namespace: e.Namespace,
+ Index: e.Index,
+ IndexIP: make([]string, len(e.IndexIP)),
+ }
+ copy(e1.IndexIP, e.IndexIP)
+ return e1
+}
+
+var _ runtime.Object = &Endpoints{}
+
+// DeepCopyObject implements the ObjectKind interface.
+func (e *Endpoints) DeepCopyObject() runtime.Object {
+ e1 := &Endpoints{
+ Version: e.Version,
+ Name: e.Name,
+ Namespace: e.Namespace,
+ Index: e.Index,
+ IndexIP: make([]string, len(e.IndexIP)),
+ Subsets: make([]EndpointSubset, len(e.Subsets)),
+ }
+ copy(e1.IndexIP, e.IndexIP)
+
+ for i, eps := range e.Subsets {
+ sub := EndpointSubset{
+ Addresses: make([]EndpointAddress, len(eps.Addresses)),
+ Ports: make([]EndpointPort, len(eps.Ports)),
+ }
+ for j, a := range eps.Addresses {
+ ea := EndpointAddress{IP: a.IP, Hostname: a.Hostname, NodeName: a.NodeName, TargetRefName: a.TargetRefName}
+ sub.Addresses[j] = ea
+ }
+ for k, p := range eps.Ports {
+ ep := EndpointPort{Port: p.Port, Name: p.Name, Protocol: p.Protocol}
+ sub.Ports[k] = ep
+ }
+
+ e1.Subsets[i] = sub
+ }
+ return e1
+}
+
+// GetNamespace implements the metav1.Object interface.
+func (e *Endpoints) GetNamespace() string { return e.Namespace }
+
+// SetNamespace implements the metav1.Object interface.
+func (e *Endpoints) SetNamespace(namespace string) {}
+
+// GetName implements the metav1.Object interface.
+func (e *Endpoints) GetName() string { return e.Name }
+
+// SetName implements the metav1.Object interface.
+func (e *Endpoints) SetName(name string) {}
+
+// GetResourceVersion implements the metav1.Object interface.
+func (e *Endpoints) GetResourceVersion() string { return e.Version }
+
+// SetResourceVersion implements the metav1.Object interface.
+func (e *Endpoints) SetResourceVersion(version string) {}
diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go
new file mode 100644
index 000000000..9336571dc
--- /dev/null
+++ b/plugin/kubernetes/object/informer.go
@@ -0,0 +1,51 @@
+package object
+
+import (
+ "time"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/tools/cache"
+)
+
+// NewIndexerInformer is a copy of the cache.NewIndexInformer function, but allows Process to have a conversion function (ToFunc).
+func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h cache.ResourceEventHandler, indexers cache.Indexers, convert ToFunc) (cache.Indexer, cache.Controller) {
+ clientState := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, indexers)
+
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState)
+
+ cfg := &cache.Config{
+ Queue: fifo,
+ ListerWatcher: lw,
+ ObjectType: objType,
+ FullResyncPeriod: resyncPeriod,
+ RetryOnError: false,
+ Process: func(obj interface{}) error {
+ for _, d := range obj.(cache.Deltas) {
+
+ obj := convert(d.Object)
+
+ switch d.Type {
+ case cache.Sync, cache.Added, cache.Updated:
+ if old, exists, err := clientState.Get(obj); err == nil && exists {
+ if err := clientState.Update(obj); err != nil {
+ return err
+ }
+ h.OnUpdate(old, obj)
+ } else {
+ if err := clientState.Add(obj); err != nil {
+ return err
+ }
+ h.OnAdd(obj)
+ }
+ case cache.Deleted:
+ if err := clientState.Delete(obj); err != nil {
+ return err
+ }
+ h.OnDelete(obj)
+ }
+ }
+ return nil
+ },
+ }
+ return clientState, cache.New(cfg)
+}
diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go
new file mode 100644
index 000000000..fb944b7ad
--- /dev/null
+++ b/plugin/kubernetes/object/object.go
@@ -0,0 +1,94 @@
+package object
+
+import (
+ "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/runtime/schema"
+ "k8s.io/apimachinery/pkg/types"
+)
+
+// ToFunc converts one empty interface to another.
+type ToFunc func(interface{}) interface{}
+
+// Empty is an empty struct.
+type Empty struct{}
+
+// GetObjectKind implementss the ObjectKind interface as a noop.
+func (e *Empty) GetObjectKind() schema.ObjectKind { return schema.EmptyObjectKind }
+
+// GetGenerateName implements the metav1.Object interface.
+func (e *Empty) GetGenerateName() string { return "" }
+
+// SetGenerateName implements the metav1.Object interface.
+func (e *Empty) SetGenerateName(name string) {}
+
+// GetUID implements the metav1.Object interface.
+func (e *Empty) GetUID() types.UID { return "" }
+
+// SetUID implements the metav1.Object interface.
+func (e *Empty) SetUID(uid types.UID) {}
+
+// GetGeneration implements the metav1.Object interface.
+func (e *Empty) GetGeneration() int64 { return 0 }
+
+// SetGeneration implements the metav1.Object interface.
+func (e *Empty) SetGeneration(generation int64) {}
+
+// GetSelfLink implements the metav1.Object interface.
+func (e *Empty) GetSelfLink() string { return "" }
+
+// SetSelfLink implements the metav1.Object interface.
+func (e *Empty) SetSelfLink(selfLink string) {}
+
+// GetCreationTimestamp implements the metav1.Object interface.
+func (e *Empty) GetCreationTimestamp() v1.Time { return v1.Time{} }
+
+// SetCreationTimestamp implements the metav1.Object interface.
+func (e *Empty) SetCreationTimestamp(timestamp v1.Time) {}
+
+// GetDeletionTimestamp implements the metav1.Object interface.
+func (e *Empty) GetDeletionTimestamp() *v1.Time { return &v1.Time{} }
+
+// SetDeletionTimestamp implements the metav1.Object interface.
+func (e *Empty) SetDeletionTimestamp(timestamp *v1.Time) {}
+
+// GetDeletionGracePeriodSeconds implements the metav1.Object interface.
+func (e *Empty) GetDeletionGracePeriodSeconds() *int64 { return nil }
+
+// SetDeletionGracePeriodSeconds implements the metav1.Object interface.
+func (e *Empty) SetDeletionGracePeriodSeconds(*int64) {}
+
+// GetLabels implements the metav1.Object interface.
+func (e *Empty) GetLabels() map[string]string { return nil }
+
+// SetLabels implements the metav1.Object interface.
+func (e *Empty) SetLabels(labels map[string]string) {}
+
+// GetAnnotations implements the metav1.Object interface.
+func (e *Empty) GetAnnotations() map[string]string { return nil }
+
+// SetAnnotations implements the metav1.Object interface.
+func (e *Empty) SetAnnotations(annotations map[string]string) {}
+
+// GetInitializers implements the metav1.Object interface.
+func (e *Empty) GetInitializers() *v1.Initializers { return nil }
+
+// SetInitializers implements the metav1.Object interface.
+func (e *Empty) SetInitializers(initializers *v1.Initializers) {}
+
+// GetFinalizers implements the metav1.Object interface.
+func (e *Empty) GetFinalizers() []string { return nil }
+
+// SetFinalizers implements the metav1.Object interface.
+func (e *Empty) SetFinalizers(finalizers []string) {}
+
+// GetOwnerReferences implements the metav1.Object interface.
+func (e *Empty) GetOwnerReferences() []v1.OwnerReference { return nil }
+
+// SetOwnerReferences implements the metav1.Object interface.
+func (e *Empty) SetOwnerReferences([]v1.OwnerReference) {}
+
+// GetClusterName implements the metav1.Object interface.
+func (e *Empty) GetClusterName() string { return "" }
+
+// SetClusterName implements the metav1.Object interface.
+func (e *Empty) SetClusterName(clusterName string) {}
diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go
new file mode 100644
index 000000000..9f0ba48ad
--- /dev/null
+++ b/plugin/kubernetes/object/pod.go
@@ -0,0 +1,72 @@
+package object
+
+import (
+ api "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// Pod is a stripped down api.Pod with only the items we need for CoreDNS.
+type Pod struct {
+ Version string
+ PodIP string
+ Name string
+ Namespace string
+ Deleting bool
+
+ *Empty
+}
+
+// ToPod converts an api.Pod to a *Pod.
+func ToPod(obj interface{}) interface{} {
+ pod, ok := obj.(*api.Pod)
+ if !ok {
+ return nil
+ }
+
+ p := &Pod{
+ Version: pod.GetResourceVersion(),
+ PodIP: pod.Status.PodIP,
+ Namespace: pod.GetNamespace(),
+ Name: pod.GetName(),
+ }
+ t := pod.ObjectMeta.DeletionTimestamp
+ if t != nil {
+ p.Deleting = !(*t).Time.IsZero()
+ }
+
+ *pod = api.Pod{}
+
+ return p
+}
+
+var _ runtime.Object = &Pod{}
+
+// DeepCopyObject implements the ObjectKind interface.
+func (p *Pod) DeepCopyObject() runtime.Object {
+ p1 := &Pod{
+ Version: p.Version,
+ PodIP: p.PodIP,
+ Namespace: p.Namespace,
+ Name: p.Name,
+ Deleting: p.Deleting,
+ }
+ return p1
+}
+
+// GetNamespace implements the metav1.Object interface.
+func (p *Pod) GetNamespace() string { return p.Namespace }
+
+// SetNamespace implements the metav1.Object interface.
+func (p *Pod) SetNamespace(namespace string) {}
+
+// GetName implements the metav1.Object interface.
+func (p *Pod) GetName() string { return p.Name }
+
+// SetName implements the metav1.Object interface.
+func (p *Pod) SetName(name string) {}
+
+// GetResourceVersion implements the metav1.Object interface.
+func (p *Pod) GetResourceVersion() string { return p.Version }
+
+// SetResourceVersion implements the metav1.Object interface.
+func (p *Pod) SetResourceVersion(version string) {}
diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go
new file mode 100644
index 000000000..be010e96b
--- /dev/null
+++ b/plugin/kubernetes/object/service.go
@@ -0,0 +1,89 @@
+package object
+
+import (
+ api "k8s.io/api/core/v1"
+ "k8s.io/apimachinery/pkg/runtime"
+)
+
+// Service is a stripped down api.Service with only the items we need for CoreDNS.
+type Service struct {
+ Version string
+ Name string
+ Namespace string
+ Index string
+ ClusterIP string
+ Type api.ServiceType
+ ExternalName string
+ Ports []api.ServicePort
+
+ *Empty
+}
+
+// ServiceKey return a string using for the index.
+func ServiceKey(name, namespace string) string { return name + "." + namespace }
+
+// ToService converts an api.Service to a *Service.
+func ToService(obj interface{}) interface{} {
+ svc, ok := obj.(*api.Service)
+ if !ok {
+ return nil
+ }
+
+ s := &Service{
+ Version: svc.GetResourceVersion(),
+ Name: svc.GetName(),
+ Namespace: svc.GetNamespace(),
+ Index: ServiceKey(svc.GetName(), svc.GetNamespace()),
+ ClusterIP: svc.Spec.ClusterIP,
+ Type: svc.Spec.Type,
+ ExternalName: svc.Spec.ExternalName,
+ }
+
+ if len(svc.Spec.Ports) == 0 {
+ // Add sentinal if there are no ports.
+ s.Ports = []api.ServicePort{{Port: -1}}
+ } else {
+ s.Ports = make([]api.ServicePort, len(svc.Spec.Ports))
+ copy(s.Ports, svc.Spec.Ports)
+ }
+
+ *svc = api.Service{}
+
+ return s
+}
+
+var _ runtime.Object = &Service{}
+
+// DeepCopyObject implements the ObjectKind interface.
+func (s *Service) DeepCopyObject() runtime.Object {
+ s1 := &Service{
+ Version: s.Version,
+ Name: s.Name,
+ Namespace: s.Namespace,
+ Index: s.Index,
+ ClusterIP: s.ClusterIP,
+ Type: s.Type,
+ ExternalName: s.ExternalName,
+ Ports: make([]api.ServicePort, len(s.Ports)),
+ }
+ copy(s1.Ports, s.Ports)
+ return s1
+}
+
+// GetNamespace implements the metav1.Object interface.
+func (s *Service) GetNamespace() string { return s.Namespace }
+
+// SetNamespace implements the metav1.Object interface.
+func (s *Service) SetNamespace(namespace string) {}
+
+// GetName implements the metav1.Object interface.
+func (s *Service) GetName() string { return s.Name }
+
+// SetName implements the metav1.Object interface.
+func (s *Service) SetName(name string) {}
+
+// GetResourceVersion implements the metav1.Object interface.
+func (s *Service) GetResourceVersion() string { return s.Version }
+
+// SetResourceVersion implements the metav1.Object interface.
+func (s *Service) SetResourceVersion(version string) {}
diff --git a/plugin/kubernetes/reverse.go b/plugin/kubernetes/reverse.go
index fd783a22d..5873bcbc8 100644
--- a/plugin/kubernetes/reverse.go
+++ b/plugin/kubernetes/reverse.go
@@ -38,13 +38,13 @@ func (k *Kubernetes) serviceRecordForIP(ip, name string) []msg.Service {
}
// If no cluster ips match, search endpoints
for _, ep := range k.APIConn.EpIndexReverse(ip) {
- if len(k.Namespaces) > 0 && !k.namespaceExposed(ep.ObjectMeta.Namespace) {
+ if len(k.Namespaces) > 0 && !k.namespaceExposed(ep.Namespace) {
continue
}
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
if addr.IP == ip {
- domain := strings.Join([]string{endpointHostname(addr, k.endpointNameMode), ep.ObjectMeta.Name, ep.ObjectMeta.Namespace, Svc, k.primaryZone()}, ".")
+ domain := strings.Join([]string{endpointHostname(addr, k.endpointNameMode), ep.Name, ep.Namespace, Svc, k.primaryZone()}, ".")
return []msg.Service{{Host: domain, TTL: k.ttl}}
}
}
diff --git a/plugin/kubernetes/reverse_test.go b/plugin/kubernetes/reverse_test.go
index 681172021..a706b7585 100644
--- a/plugin/kubernetes/reverse_test.go
+++ b/plugin/kubernetes/reverse_test.go
@@ -4,6 +4,7 @@ import (
"context"
"testing"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/pkg/watch"
"github.com/coredns/coredns/plugin/test"
@@ -15,66 +16,50 @@ import (
type APIConnReverseTest struct{}
-func (APIConnReverseTest) HasSynced() bool { return true }
-func (APIConnReverseTest) Run() { return }
-func (APIConnReverseTest) Stop() error { return nil }
-func (APIConnReverseTest) PodIndex(string) []*api.Pod { return nil }
-func (APIConnReverseTest) EpIndex(string) []*api.Endpoints { return nil }
-func (APIConnReverseTest) EndpointsList() []*api.Endpoints { return nil }
-func (APIConnReverseTest) ServiceList() []*api.Service { return nil }
-func (APIConnReverseTest) Modified() int64 { return 0 }
-func (APIConnReverseTest) SetWatchChan(watch.Chan) {}
-func (APIConnReverseTest) Watch(string) error { return nil }
-func (APIConnReverseTest) StopWatching(string) {}
-
-func (APIConnReverseTest) SvcIndex(svc string) []*api.Service {
+func (APIConnReverseTest) HasSynced() bool { return true }
+func (APIConnReverseTest) Run() { return }
+func (APIConnReverseTest) Stop() error { return nil }
+func (APIConnReverseTest) PodIndex(string) []*object.Pod { return nil }
+func (APIConnReverseTest) EpIndex(string) []*object.Endpoints { return nil }
+func (APIConnReverseTest) EndpointsList() []*object.Endpoints { return nil }
+func (APIConnReverseTest) ServiceList() []*object.Service { return nil }
+func (APIConnReverseTest) Modified() int64 { return 0 }
+func (APIConnReverseTest) SetWatchChan(watch.Chan) {}
+func (APIConnReverseTest) Watch(string) error { return nil }
+func (APIConnReverseTest) StopWatching(string) {}
+
+func (APIConnReverseTest) SvcIndex(svc string) []*object.Service {
if svc != "svc1.testns" {
return nil
}
- svcs := []*api.Service{
+ svcs := []*object.Service{
{
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: "192.168.1.100",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
- },
+ Name: "svc1",
+ Namespace: "testns",
+ ClusterIP: "192.168.1.100",
+ Ports: []api.ServicePort{{Name: "http", Protocol: "tcp", Port: 80}},
},
}
return svcs
}
-func (APIConnReverseTest) SvcIndexReverse(ip string) []*api.Service {
+func (APIConnReverseTest) SvcIndexReverse(ip string) []*object.Service {
if ip != "192.168.1.100" {
return nil
}
- svcs := []*api.Service{
+ svcs := []*object.Service{
{
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
- Spec: api.ServiceSpec{
- ClusterIP: "192.168.1.100",
- Ports: []api.ServicePort{{
- Name: "http",
- Protocol: "tcp",
- Port: 80,
- }},
- },
+ Name: "svc1",
+ Namespace: "testns",
+ ClusterIP: "192.168.1.100",
+ Ports: []api.ServicePort{{Name: "http", Protocol: "tcp", Port: 80}},
},
}
return svcs
}
-func (APIConnReverseTest) EpIndexReverse(ip string) []*api.Endpoints {
+func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints {
switch ip {
case "10.0.0.100":
case "1234:abcd::1":
@@ -83,41 +68,23 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*api.Endpoints {
default:
return nil
}
- eps := []*api.Endpoints{
+ eps := []*object.Endpoints{
{
- Subsets: []api.EndpointSubset{
+ Subsets: []object.EndpointSubset{
{
- Addresses: []api.EndpointAddress{
- {
- IP: "10.0.0.100",
- Hostname: "ep1a",
- },
- {
- IP: "1234:abcd::1",
- Hostname: "ep1b",
- },
- {
- IP: "fd00:77:30::a",
- Hostname: "ip6svc1ex",
- },
- {
- IP: "fd00:77:30::2:9ba6",
- Hostname: "ip6svc1in",
- },
+ Addresses: []object.EndpointAddress{
+ {IP: "10.0.0.100", Hostname: "ep1a"},
+ {IP: "1234:abcd::1", Hostname: "ep1b"},
+ {IP: "fd00:77:30::a", Hostname: "ip6svc1ex"},
+ {IP: "fd00:77:30::2:9ba6", Hostname: "ip6svc1in"},
},
- Ports: []api.EndpointPort{
- {
- Port: 80,
- Protocol: "tcp",
- Name: "http",
- },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- ObjectMeta: meta.ObjectMeta{
- Name: "svc1",
- Namespace: "testns",
- },
+ Name: "svc1",
+ Namespace: "testns",
},
}
return eps
diff --git a/plugin/kubernetes/watch.go b/plugin/kubernetes/watch.go
index 488540444..5c52cc4f9 100644
--- a/plugin/kubernetes/watch.go
+++ b/plugin/kubernetes/watch.go
@@ -1,7 +1,9 @@
package kubernetes
import (
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/watch"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// SetWatchChan implements watch.Watchable
@@ -18,3 +20,163 @@ func (k *Kubernetes) Watch(qname string) error {
func (k *Kubernetes) StopWatching(qname string) {
k.APIConn.StopWatching(qname)
}
+
+var _ watch.Watchable = &Kubernetes{}
+
+func (dns *dnsControl) sendServiceUpdates(s *object.Service) {
+ for i := range dns.zones {
+ name := serviceFQDN(s, dns.zones[i])
+ if _, ok := dns.watched[name]; ok {
+ dns.watchChan <- name
+ }
+ }
+}
+
+func (dns *dnsControl) sendPodUpdates(p *object.Pod) {
+ for i := range dns.zones {
+ name := podFQDN(p, dns.zones[i])
+ if _, ok := dns.watched[name]; ok {
+ dns.watchChan <- name
+ }
+ }
+}
+
+func (dns *dnsControl) sendEndpointsUpdates(ep *object.Endpoints) {
+ for _, zone := range dns.zones {
+ for _, name := range endpointFQDN(ep, zone, dns.endpointNameMode) {
+ if _, ok := dns.watched[name]; ok {
+ dns.watchChan <- name
+ }
+ }
+ name := serviceFQDN(ep, zone)
+ if _, ok := dns.watched[name]; ok {
+ dns.watchChan <- name
+ }
+ }
+}
+
+// endpointsSubsetDiffs returns an Endpoints struct containing the Subsets that have changed between a and b.
+// When we notify clients of changed endpoints we only want to notify them of endpoints that have changed.
+// The Endpoints API object holds more than one endpoint, held in a list of Subsets. Each Subset refers to
+// an endpoint. So, here we create a new Endpoints struct, and populate it with only the endpoints that have changed.
+// This new Endpoints object is later used to generate the list of endpoint FQDNs to send to the client.
+// This function computes this literally by combining the sets (in a and not in b) union (in b and not in a).
+func endpointsSubsetDiffs(a, b *object.Endpoints) *object.Endpoints {
+ c := b.CopyWithoutSubsets()
+
+ // In the following loop, the first iteration computes (in a but not in b).
+ // The second iteration then adds (in b but not in a)
+ // The end result is an Endpoints that only contains the subsets (endpoints) that are different between a and b.
+ for _, abba := range [][]*object.Endpoints{{a, b}, {b, a}} {
+ a := abba[0]
+ b := abba[1]
+ left:
+ for _, as := range a.Subsets {
+ for _, bs := range b.Subsets {
+ if subsetsEquivalent(as, bs) {
+ continue left
+ }
+ }
+ c.Subsets = append(c.Subsets, as)
+ }
+ }
+ return c
+}
+
+// sendUpdates sends a notification to the server if a watch is enabled for the qname.
+func (dns *dnsControl) sendUpdates(oldObj, newObj interface{}) {
+ // If both objects have the same resource version, they are identical.
+ if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) {
+ return
+ }
+ obj := newObj
+ if obj == nil {
+ obj = oldObj
+ }
+ switch ob := obj.(type) {
+ case *object.Service:
+ dns.updateModifed()
+ dns.sendServiceUpdates(ob)
+ case *object.Endpoints:
+ if newObj == nil || oldObj == nil {
+ dns.updateModifed()
+ dns.sendEndpointsUpdates(ob)
+ return
+ }
+ p := oldObj.(*object.Endpoints)
+ // endpoint updates can come frequently, make sure it's a change we care about
+ if endpointsEquivalent(p, ob) {
+ return
+ }
+ dns.updateModifed()
+ dns.sendEndpointsUpdates(endpointsSubsetDiffs(p, ob))
+ case *object.Pod:
+ dns.updateModifed()
+ dns.sendPodUpdates(ob)
+ default:
+ log.Warningf("Updates for %T not supported.", ob)
+ }
+}
+
+func (dns *dnsControl) Add(obj interface{}) { dns.sendUpdates(nil, obj) }
+func (dns *dnsControl) Delete(obj interface{}) { dns.sendUpdates(obj, nil) }
+func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.sendUpdates(oldObj, newObj) }
+
+// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
+// I.e. that they have the same ready addresses, host names, ports (including protocol
+// and service names for SRV)
+func subsetsEquivalent(sa, sb object.EndpointSubset) bool {
+ if len(sa.Addresses) != len(sb.Addresses) {
+ return false
+ }
+ if len(sa.Ports) != len(sb.Ports) {
+ return false
+ }
+
+ // in Addresses and Ports, we should be able to rely on
+ // these being sorted and able to be compared
+ // they are supposed to be in a canonical format
+ for addr, aaddr := range sa.Addresses {
+ baddr := sb.Addresses[addr]
+ if aaddr.IP != baddr.IP {
+ return false
+ }
+ if aaddr.Hostname != baddr.Hostname {
+ return false
+ }
+ }
+
+ for port, aport := range sa.Ports {
+ bport := sb.Ports[port]
+ if aport.Name != bport.Name {
+ return false
+ }
+ if aport.Port != bport.Port {
+ return false
+ }
+ if aport.Protocol != bport.Protocol {
+ return false
+ }
+ }
+ return true
+}
+
+// endpointsEquivalent checks if the update to an endpoint is something
+// that matters to us or if they are effectively equivalent.
+func endpointsEquivalent(a, b *object.Endpoints) bool {
+
+ if len(a.Subsets) != len(b.Subsets) {
+ return false
+ }
+
+ // we should be able to rely on
+ // these being sorted and able to be compared
+ // they are supposed to be in a canonical format
+ for i, sa := range a.Subsets {
+ sb := b.Subsets[i]
+ if !subsetsEquivalent(sa, sb) {
+ return false
+ }
+ }
+ return true
+}
diff --git a/plugin/kubernetes/watch_test.go b/plugin/kubernetes/watch_test.go
index 46b2e5dc4..08eda894b 100644
--- a/plugin/kubernetes/watch_test.go
+++ b/plugin/kubernetes/watch_test.go
@@ -1,15 +1,53 @@
package kubernetes
import (
+ "strconv"
+ "strings"
"testing"
- "github.com/coredns/coredns/plugin/pkg/watch"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
)
-func TestIsWatchable(t *testing.T) {
- k := &Kubernetes{}
- var i interface{} = k
- if _, ok := i.(watch.Watchable); !ok {
- t.Error("Kubernetes should implement watch.Watchable and does not")
+func endpointSubsets(addrs ...string) (eps []object.EndpointSubset) {
+ for _, ap := range addrs {
+ apa := strings.Split(ap, ":")
+ address := apa[0]
+ port, _ := strconv.Atoi(apa[1])
+ eps = append(eps, object.EndpointSubset{Addresses: []object.EndpointAddress{{IP: address}}, Ports: []object.EndpointPort{{Port: int32(port)}}})
+ }
+ return eps
+}
+
+func TestEndpointsSubsetDiffs(t *testing.T) {
+ var tests = []struct {
+ a, b, expected object.Endpoints
+ }{
+ { // From a->b: Nothing changes
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
+ object.Endpoints{},
+ },
+ { // From a->b: Everything goes away
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
+ object.Endpoints{},
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
+ },
+ { // From a->b: Everything is new
+ object.Endpoints{},
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80", "10.0.0.2:8080")},
+ },
+ { // From a->b: One goes away, one is new
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.2:8080")},
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.1:80")},
+ object.Endpoints{Subsets: endpointSubsets("10.0.0.2:8080", "10.0.0.1:80")},
+ },
+ }
+
+ for i, te := range tests {
+ got := endpointsSubsetDiffs(&te.a, &te.b)
+ if !endpointsEquivalent(got, &te.expected) {
+ t.Errorf("Expected '%v' for test %v, got '%v'.", te.expected, i, got)
+ }
}
}
diff --git a/plugin/kubernetes/xfr.go b/plugin/kubernetes/xfr.go
index 1b958102b..eaf554c6a 100644
--- a/plugin/kubernetes/xfr.go
+++ b/plugin/kubernetes/xfr.go
@@ -82,13 +82,13 @@ func (k *Kubernetes) transfer(c chan dns.RR, zone string) {
continue
}
svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name}
- switch svc.Spec.Type {
+ switch svc.Type {
case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer:
- clusterIP := net.ParseIP(svc.Spec.ClusterIP)
+ clusterIP := net.ParseIP(svc.ClusterIP)
if clusterIP != nil {
- for _, p := range svc.Spec.Ports {
+ for _, p := range svc.Ports {
- s := msg.Service{Host: svc.Spec.ClusterIP, Port: int(p.Port), TTL: k.ttl}
+ s := msg.Service{Host: svc.ClusterIP, Port: int(p.Port), TTL: k.ttl}
s.Key = strings.Join(svcBase, "/")
// Change host from IP to Name for SRV records
@@ -117,7 +117,7 @@ func (k *Kubernetes) transfer(c chan dns.RR, zone string) {
endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace)
for _, ep := range endpointsList {
- if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace {
+ if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
continue
}
@@ -153,7 +153,7 @@ func (k *Kubernetes) transfer(c chan dns.RR, zone string) {
case api.ServiceTypeExternalName:
- s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.Spec.ExternalName, TTL: k.ttl}
+ s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl}
if t, _ := s.HostType(); t == dns.TypeCNAME {
c <- s.NewCNAME(msg.Domain(s.Key), s.Host)
}
diff --git a/plugin/kubernetes/xfr_test.go b/plugin/kubernetes/xfr_test.go
index 61bacf66e..0117ebf06 100644
--- a/plugin/kubernetes/xfr_test.go
+++ b/plugin/kubernetes/xfr_test.go
@@ -5,10 +5,9 @@ import (
"strings"
"testing"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/coredns/coredns/plugin/pkg/dnstest"
"github.com/coredns/coredns/plugin/test"
- api "k8s.io/api/core/v1"
- meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/miekg/dns"
)
@@ -89,12 +88,11 @@ func TestKubernetesXFR(t *testing.T) {
diff = difference(gotRRs, testRRs)
if len(diff) != 0 {
- t.Errorf("Found %d records we're missing tham test cases, should be 0:", len(diff))
+ t.Errorf("Found %d records we're missing, should be 0:", len(diff))
for _, rec := range diff {
t.Errorf("%+v", rec)
}
}
-
}
// difference shows what we're missing when comparing two RR slices
@@ -114,72 +112,68 @@ func difference(testRRs []dns.RR, gotRRs []dns.RR) []dns.RR {
}
func TestEndpointsEquivalent(t *testing.T) {
- epA := api.Endpoints{
- ObjectMeta: meta.ObjectMeta{ResourceVersion: "1230"},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
- NotReadyAddresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foobar"}},
+ epA := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
}},
}
- epB := api.Endpoints{
- ObjectMeta: meta.ObjectMeta{ResourceVersion: "1234"},
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
- NotReadyAddresses: []api.EndpointAddress{{IP: "1.1.1.1", Hostname: "foobar"}},
+ epB := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
}},
}
- epC := api.Endpoints{
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}},
+ epC := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}},
}},
}
- epD := api.Endpoints{
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}},
+ epD := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}},
},
{
- Addresses: []api.EndpointAddress{{IP: "1.2.2.2", Hostname: "foofoo"}},
+ Addresses: []object.EndpointAddress{{IP: "1.2.2.2", Hostname: "foofoo"}},
}},
}
- epE := api.Endpoints{
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}, {IP: "1.1.1.1"}},
+ epE := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}, {IP: "1.1.1.1"}},
}},
}
- epF := api.Endpoints{
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foofoo"}},
+ epF := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foofoo"}},
}},
}
- epG := api.Endpoints{
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
- Ports: []api.EndpointPort{{Name: "http", Port: 80, Protocol: "TCP"}},
+ epG := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
+ Ports: []object.EndpointPort{{Name: "http", Port: 80, Protocol: "TCP"}},
}},
}
- epH := api.Endpoints{
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
- Ports: []api.EndpointPort{{Name: "newportname", Port: 80, Protocol: "TCP"}},
+ epH := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
+ Ports: []object.EndpointPort{{Name: "newportname", Port: 80, Protocol: "TCP"}},
}},
}
- epI := api.Endpoints{
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
- Ports: []api.EndpointPort{{Name: "http", Port: 8080, Protocol: "TCP"}},
+ epI := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
+ Ports: []object.EndpointPort{{Name: "http", Port: 8080, Protocol: "TCP"}},
}},
}
- epJ := api.Endpoints{
- Subsets: []api.EndpointSubset{{
- Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
- Ports: []api.EndpointPort{{Name: "http", Port: 80, Protocol: "UDP"}},
+ epJ := object.Endpoints{
+ Subsets: []object.EndpointSubset{{
+ Addresses: []object.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}},
+ Ports: []object.EndpointPort{{Name: "http", Port: 80, Protocol: "UDP"}},
}},
}
tests := []struct {
equiv bool
- a *api.Endpoints
- b *api.Endpoints
+ a *object.Endpoints
+ b *object.Endpoints
}{
{true, &epA, &epB},
{false, &epA, &epC},