aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes/controller.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r--plugin/kubernetes/controller.go290
1 files changed, 57 insertions, 233 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index a4ec8bf07..1c41b6ddf 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -7,16 +7,16 @@ import (
"sync/atomic"
"time"
+ "github.com/coredns/coredns/plugin/kubernetes/object"
dnswatch "github.com/coredns/coredns/plugin/pkg/watch"
api "k8s.io/api/core/v1"
- "k8s.io/client-go/kubernetes"
- "k8s.io/client-go/tools/cache"
-
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/cache"
)
const (
@@ -28,13 +28,13 @@ const (
)
type dnsController interface {
- ServiceList() []*api.Service
- SvcIndex(string) []*api.Service
- SvcIndexReverse(string) []*api.Service
- PodIndex(string) []*api.Pod
- EpIndex(string) []*api.Endpoints
- EpIndexReverse(string) []*api.Endpoints
- EndpointsList() []*api.Endpoints
+ ServiceList() []*object.Service
+ EndpointsList() []*object.Endpoints
+ SvcIndex(string) []*object.Service
+ SvcIndexReverse(string) []*object.Service
+ PodIndex(string) []*object.Pod
+ EpIndex(string) []*object.Endpoints
+ EpIndexReverse(string) []*object.Endpoints
GetNodeByName(string) (*api.Node, error)
GetNamespaceByName(string) (*api.Namespace, error)
@@ -110,30 +110,34 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
endpointNameMode: opts.endpointNameMode,
}
- dns.svcLister, dns.svcController = cache.NewIndexerInformer(
+ dns.svcLister, dns.svcController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: serviceListFunc(dns.client, api.NamespaceAll, dns.selector),
WatchFunc: serviceWatchFunc(dns.client, api.NamespaceAll, dns.selector),
},
- &api.Service{},
+ &object.Service{},
opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
- cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc})
+ cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc},
+ object.ToService,
+ )
if opts.initPodCache {
- dns.podLister, dns.podController = cache.NewIndexerInformer(
+ dns.podLister, dns.podController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: podListFunc(dns.client, api.NamespaceAll, dns.selector),
WatchFunc: podWatchFunc(dns.client, api.NamespaceAll, dns.selector),
},
- &api.Pod{},
+ &object.Pod{},
opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
- cache.Indexers{podIPIndex: podIPIndexFunc})
+ cache.Indexers{podIPIndex: podIPIndexFunc},
+ object.ToPod,
+ )
}
if opts.initEndpointsCache {
- dns.epLister, dns.epController = cache.NewIndexerInformer(
+ dns.epLister, dns.epController = object.NewIndexerInformer(
&cache.ListWatch{
ListFunc: endpointsListFunc(dns.client, api.NamespaceAll, dns.selector),
WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector),
@@ -141,7 +145,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
&api.Endpoints{},
opts.resyncPeriod,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
- cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc})
+ cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
+ object.ToEndpoints)
}
dns.nsLister, dns.nsController = cache.NewInformer(
@@ -155,49 +160,43 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
}
func podIPIndexFunc(obj interface{}) ([]string, error) {
- p, ok := obj.(*api.Pod)
+ p, ok := obj.(*object.Pod)
if !ok {
return nil, errObj
}
- return []string{p.Status.PodIP}, nil
+ return []string{p.PodIP}, nil
}
func svcIPIndexFunc(obj interface{}) ([]string, error) {
- svc, ok := obj.(*api.Service)
+ svc, ok := obj.(*object.Service)
if !ok {
return nil, errObj
}
- return []string{svc.Spec.ClusterIP}, nil
+ return []string{svc.ClusterIP}, nil
}
func svcNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
- s, ok := obj.(*api.Service)
+ s, ok := obj.(*object.Service)
if !ok {
return nil, errObj
}
- return []string{s.ObjectMeta.Name + "." + s.ObjectMeta.Namespace}, nil
+ return []string{s.Index}, nil
}
func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) {
- s, ok := obj.(*api.Endpoints)
+ s, ok := obj.(*object.Endpoints)
if !ok {
return nil, errObj
}
- return []string{s.ObjectMeta.Name + "." + s.ObjectMeta.Namespace}, nil
+ return []string{s.Index}, nil
}
func epIPIndexFunc(obj interface{}) ([]string, error) {
- ep, ok := obj.(*api.Endpoints)
+ ep, ok := obj.(*object.Endpoints)
if !ok {
return nil, errObj
}
- var idx []string
- for _, eps := range ep.Subsets {
- for _, addr := range eps.Addresses {
- idx = append(idx, addr.IP)
- }
- }
- return idx, nil
+ return ep.IndexIP, nil
}
func serviceListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
@@ -335,10 +334,10 @@ func (dns *dnsControl) HasSynced() bool {
return a && b && c && d
}
-func (dns *dnsControl) ServiceList() (svcs []*api.Service) {
+func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
os := dns.svcLister.List()
for _, o := range os {
- s, ok := o.(*api.Service)
+ s, ok := o.(*object.Service)
if !ok {
continue
}
@@ -347,16 +346,25 @@ func (dns *dnsControl) ServiceList() (svcs []*api.Service) {
return svcs
}
-func (dns *dnsControl) PodIndex(ip string) (pods []*api.Pod) {
- if dns.podLister == nil {
- return nil
+func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
+ os := dns.epLister.List()
+ for _, o := range os {
+ ep, ok := o.(*object.Endpoints)
+ if !ok {
+ continue
+ }
+ eps = append(eps, ep)
}
+ return eps
+}
+
+func (dns *dnsControl) PodIndex(ip string) (pods []*object.Pod) {
os, err := dns.podLister.ByIndex(podIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
- p, ok := o.(*api.Pod)
+ p, ok := o.(*object.Pod)
if !ok {
continue
}
@@ -365,16 +373,13 @@ func (dns *dnsControl) PodIndex(ip string) (pods []*api.Pod) {
return pods
}
-func (dns *dnsControl) SvcIndex(idx string) (svcs []*api.Service) {
- if dns.svcLister == nil {
- return nil
- }
+func (dns *dnsControl) SvcIndex(idx string) (svcs []*object.Service) {
os, err := dns.svcLister.ByIndex(svcNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
- s, ok := o.(*api.Service)
+ s, ok := o.(*object.Service)
if !ok {
continue
}
@@ -383,17 +388,14 @@ func (dns *dnsControl) SvcIndex(idx string) (svcs []*api.Service) {
return svcs
}
-func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*api.Service) {
- if dns.svcLister == nil {
- return nil
- }
+func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
os, err := dns.svcLister.ByIndex(svcIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
- s, ok := o.(*api.Service)
+ s, ok := o.(*object.Service)
if !ok {
continue
}
@@ -402,16 +404,13 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*api.Service) {
return svcs
}
-func (dns *dnsControl) EpIndex(idx string) (ep []*api.Endpoints) {
- if dns.epLister == nil {
- return nil
- }
+func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
return nil
}
for _, o := range os {
- e, ok := o.(*api.Endpoints)
+ e, ok := o.(*object.Endpoints)
if !ok {
continue
}
@@ -420,16 +419,13 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*api.Endpoints) {
return ep
}
-func (dns *dnsControl) EpIndexReverse(ip string) (ep []*api.Endpoints) {
- if dns.svcLister == nil {
- return nil
- }
+func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
os, err := dns.epLister.ByIndex(epIPIndex, ip)
if err != nil {
return nil
}
for _, o := range os {
- e, ok := o.(*api.Endpoints)
+ e, ok := o.(*object.Endpoints)
if !ok {
continue
}
@@ -438,21 +434,6 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*api.Endpoints) {
return ep
}
-func (dns *dnsControl) EndpointsList() (eps []*api.Endpoints) {
- if dns.epLister == nil {
- return nil
- }
- os := dns.epLister.List()
- for _, o := range os {
- ep, ok := o.(*api.Endpoints)
- if !ok {
- continue
- }
- eps = append(eps, ep)
- }
- return eps
-}
-
// GetNodeByName return the node by name. If nothing is found an error is
// returned. This query causes a roundtrip to the k8s API server, so use
// sparingly. Currently this is only used for Federation.
@@ -461,8 +442,7 @@ func (dns *dnsControl) GetNodeByName(name string) (*api.Node, error) {
return v1node, err
}
-// GetNamespaceByName returns the namespace by name. If nothing is found an
-// error is returned.
+// GetNamespaceByName returns the namespace by name. If nothing is found an error is returned.
func (dns *dnsControl) GetNamespaceByName(name string) (*api.Namespace, error) {
os := dns.nsLister.List()
for _, o := range os {
@@ -488,160 +468,4 @@ func (dns *dnsControl) updateModifed() {
atomic.StoreInt64(&dns.modified, unix)
}
-func (dns *dnsControl) sendServiceUpdates(s *api.Service) {
- for i := range dns.zones {
- name := serviceFQDN(s, dns.zones[i])
- if _, ok := dns.watched[name]; ok {
- dns.watchChan <- name
- }
- }
-}
-
-func (dns *dnsControl) sendPodUpdates(p *api.Pod) {
- for i := range dns.zones {
- name := podFQDN(p, dns.zones[i])
- if _, ok := dns.watched[name]; ok {
- dns.watchChan <- name
- }
- }
-}
-
-func (dns *dnsControl) sendEndpointsUpdates(ep *api.Endpoints) {
- for _, zone := range dns.zones {
- names := append(endpointFQDN(ep, zone, dns.endpointNameMode), serviceFQDN(ep, zone))
- for _, name := range names {
- if _, ok := dns.watched[name]; ok {
- dns.watchChan <- name
- }
- }
- }
-}
-
-// endpointsSubsetDiffs returns an Endpoints struct containing the Subsets that have changed between a and b.
-// When we notify clients of changed endpoints we only want to notify them of endpoints that have changed.
-// The Endpoints API object holds more than one endpoint, held in a list of Subsets. Each Subset refers to
-// an endpoint. So, here we create a new Endpoints struct, and populate it with only the endpoints that have changed.
-// This new Endpoints object is later used to generate the list of endpoint FQDNs to send to the client.
-// This function computes this literally by combining the sets (in a and not in b) union (in b and not in a).
-func endpointsSubsetDiffs(a, b *api.Endpoints) *api.Endpoints {
- c := b.DeepCopy()
- c.Subsets = []api.EndpointSubset{}
-
- // In the following loop, the first iteration computes (in a but not in b).
- // The second iteration then adds (in b but not in a)
- // The end result is an Endpoints that only contains the subsets (endpoints) that are different between a and b.
- for _, abba := range [][]*api.Endpoints{{a, b}, {b, a}} {
- a := abba[0]
- b := abba[1]
- left:
- for _, as := range a.Subsets {
- for _, bs := range b.Subsets {
- if subsetsEquivalent(as, bs) {
- continue left
- }
- }
- c.Subsets = append(c.Subsets, as)
- }
- }
- return c
-}
-
-// sendUpdates sends a notification to the server if a watch is enabled for the qname.
-func (dns *dnsControl) sendUpdates(oldObj, newObj interface{}) {
- // If both objects have the same resource version, they are identical.
- if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) {
- return
- }
- obj := newObj
- if obj == nil {
- obj = oldObj
- }
- switch ob := obj.(type) {
- case *api.Service:
- dns.updateModifed()
- dns.sendServiceUpdates(ob)
- case *api.Endpoints:
- if newObj == nil || oldObj == nil {
- dns.updateModifed()
- dns.sendEndpointsUpdates(ob)
- return
- }
- p := oldObj.(*api.Endpoints)
- // endpoint updates can come frequently, make sure it's a change we care about
- if endpointsEquivalent(p, ob) {
- return
- }
- dns.updateModifed()
- dns.sendEndpointsUpdates(endpointsSubsetDiffs(p, ob))
- case *api.Pod:
- dns.updateModifed()
- dns.sendPodUpdates(ob)
- default:
- log.Warningf("Updates for %T not supported.", ob)
- }
-}
-
-func (dns *dnsControl) Add(obj interface{}) { dns.sendUpdates(nil, obj) }
-func (dns *dnsControl) Delete(obj interface{}) { dns.sendUpdates(obj, nil) }
-func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.sendUpdates(oldObj, newObj) }
-
-// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
-// I.e. that they have the same ready addresses, host names, ports (including protocol
-// and service names for SRV)
-func subsetsEquivalent(sa, sb api.EndpointSubset) bool {
- if len(sa.Addresses) != len(sb.Addresses) {
- return false
- }
- if len(sa.Ports) != len(sb.Ports) {
- return false
- }
-
- // in Addresses and Ports, we should be able to rely on
- // these being sorted and able to be compared
- // they are supposed to be in a canonical format
- for addr, aaddr := range sa.Addresses {
- baddr := sb.Addresses[addr]
- if aaddr.IP != baddr.IP {
- return false
- }
- if aaddr.Hostname != baddr.Hostname {
- return false
- }
- }
-
- for port, aport := range sa.Ports {
- bport := sb.Ports[port]
- if aport.Name != bport.Name {
- return false
- }
- if aport.Port != bport.Port {
- return false
- }
- if aport.Protocol != bport.Protocol {
- return false
- }
- }
- return true
-}
-
-// endpointsEquivalent checks if the update to an endpoint is something
-// that matters to us or if they are effectively equivalent.
-func endpointsEquivalent(a, b *api.Endpoints) bool {
-
- if len(a.Subsets) != len(b.Subsets) {
- return false
- }
-
- // we should be able to rely on
- // these being sorted and able to be compared
- // they are supposed to be in a canonical format
- for i, sa := range a.Subsets {
- sb := b.Subsets[i]
- if !subsetsEquivalent(sa, sb) {
- return false
- }
- }
- return true
-}
-
var errObj = errors.New("obj was not of the correct type")