diff options
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r-- | plugin/kubernetes/controller.go | 290 |
1 files changed, 57 insertions, 233 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index a4ec8bf07..1c41b6ddf 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -7,16 +7,16 @@ import ( "sync/atomic" "time" + "github.com/coredns/coredns/plugin/kubernetes/object" dnswatch "github.com/coredns/coredns/plugin/pkg/watch" api "k8s.io/api/core/v1" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" ) const ( @@ -28,13 +28,13 @@ const ( ) type dnsController interface { - ServiceList() []*api.Service - SvcIndex(string) []*api.Service - SvcIndexReverse(string) []*api.Service - PodIndex(string) []*api.Pod - EpIndex(string) []*api.Endpoints - EpIndexReverse(string) []*api.Endpoints - EndpointsList() []*api.Endpoints + ServiceList() []*object.Service + EndpointsList() []*object.Endpoints + SvcIndex(string) []*object.Service + SvcIndexReverse(string) []*object.Service + PodIndex(string) []*object.Pod + EpIndex(string) []*object.Endpoints + EpIndexReverse(string) []*object.Endpoints GetNodeByName(string) (*api.Node, error) GetNamespaceByName(string) (*api.Namespace, error) @@ -110,30 +110,34 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns endpointNameMode: opts.endpointNameMode, } - dns.svcLister, dns.svcController = cache.NewIndexerInformer( + dns.svcLister, dns.svcController = object.NewIndexerInformer( &cache.ListWatch{ ListFunc: serviceListFunc(dns.client, api.NamespaceAll, dns.selector), WatchFunc: serviceWatchFunc(dns.client, api.NamespaceAll, dns.selector), }, - &api.Service{}, + &object.Service{}, opts.resyncPeriod, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, - cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc}) + cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc}, + object.ToService, + ) if opts.initPodCache { - dns.podLister, dns.podController = cache.NewIndexerInformer( + dns.podLister, dns.podController = object.NewIndexerInformer( &cache.ListWatch{ ListFunc: podListFunc(dns.client, api.NamespaceAll, dns.selector), WatchFunc: podWatchFunc(dns.client, api.NamespaceAll, dns.selector), }, - &api.Pod{}, + &object.Pod{}, opts.resyncPeriod, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, - cache.Indexers{podIPIndex: podIPIndexFunc}) + cache.Indexers{podIPIndex: podIPIndexFunc}, + object.ToPod, + ) } if opts.initEndpointsCache { - dns.epLister, dns.epController = cache.NewIndexerInformer( + dns.epLister, dns.epController = object.NewIndexerInformer( &cache.ListWatch{ ListFunc: endpointsListFunc(dns.client, api.NamespaceAll, dns.selector), WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector), @@ -141,7 +145,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns &api.Endpoints{}, opts.resyncPeriod, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, - cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}) + cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, + object.ToEndpoints) } dns.nsLister, dns.nsController = cache.NewInformer( @@ -155,49 +160,43 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns } func podIPIndexFunc(obj interface{}) ([]string, error) { - p, ok := obj.(*api.Pod) + p, ok := obj.(*object.Pod) if !ok { return nil, errObj } - return []string{p.Status.PodIP}, nil + return []string{p.PodIP}, nil } func svcIPIndexFunc(obj interface{}) ([]string, error) { - svc, ok := obj.(*api.Service) + svc, ok := obj.(*object.Service) if !ok { return nil, errObj } - return []string{svc.Spec.ClusterIP}, nil + return []string{svc.ClusterIP}, nil } func svcNameNamespaceIndexFunc(obj interface{}) ([]string, error) { - s, ok := obj.(*api.Service) + s, ok := obj.(*object.Service) if !ok { return nil, errObj } - return []string{s.ObjectMeta.Name + "." + s.ObjectMeta.Namespace}, nil + return []string{s.Index}, nil } func epNameNamespaceIndexFunc(obj interface{}) ([]string, error) { - s, ok := obj.(*api.Endpoints) + s, ok := obj.(*object.Endpoints) if !ok { return nil, errObj } - return []string{s.ObjectMeta.Name + "." + s.ObjectMeta.Namespace}, nil + return []string{s.Index}, nil } func epIPIndexFunc(obj interface{}) ([]string, error) { - ep, ok := obj.(*api.Endpoints) + ep, ok := obj.(*object.Endpoints) if !ok { return nil, errObj } - var idx []string - for _, eps := range ep.Subsets { - for _, addr := range eps.Addresses { - idx = append(idx, addr.IP) - } - } - return idx, nil + return ep.IndexIP, nil } func serviceListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { @@ -335,10 +334,10 @@ func (dns *dnsControl) HasSynced() bool { return a && b && c && d } -func (dns *dnsControl) ServiceList() (svcs []*api.Service) { +func (dns *dnsControl) ServiceList() (svcs []*object.Service) { os := dns.svcLister.List() for _, o := range os { - s, ok := o.(*api.Service) + s, ok := o.(*object.Service) if !ok { continue } @@ -347,16 +346,25 @@ func (dns *dnsControl) ServiceList() (svcs []*api.Service) { return svcs } -func (dns *dnsControl) PodIndex(ip string) (pods []*api.Pod) { - if dns.podLister == nil { - return nil +func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) { + os := dns.epLister.List() + for _, o := range os { + ep, ok := o.(*object.Endpoints) + if !ok { + continue + } + eps = append(eps, ep) } + return eps +} + +func (dns *dnsControl) PodIndex(ip string) (pods []*object.Pod) { os, err := dns.podLister.ByIndex(podIPIndex, ip) if err != nil { return nil } for _, o := range os { - p, ok := o.(*api.Pod) + p, ok := o.(*object.Pod) if !ok { continue } @@ -365,16 +373,13 @@ func (dns *dnsControl) PodIndex(ip string) (pods []*api.Pod) { return pods } -func (dns *dnsControl) SvcIndex(idx string) (svcs []*api.Service) { - if dns.svcLister == nil { - return nil - } +func (dns *dnsControl) SvcIndex(idx string) (svcs []*object.Service) { os, err := dns.svcLister.ByIndex(svcNameNamespaceIndex, idx) if err != nil { return nil } for _, o := range os { - s, ok := o.(*api.Service) + s, ok := o.(*object.Service) if !ok { continue } @@ -383,17 +388,14 @@ func (dns *dnsControl) SvcIndex(idx string) (svcs []*api.Service) { return svcs } -func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*api.Service) { - if dns.svcLister == nil { - return nil - } +func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) { os, err := dns.svcLister.ByIndex(svcIPIndex, ip) if err != nil { return nil } for _, o := range os { - s, ok := o.(*api.Service) + s, ok := o.(*object.Service) if !ok { continue } @@ -402,16 +404,13 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*api.Service) { return svcs } -func (dns *dnsControl) EpIndex(idx string) (ep []*api.Endpoints) { - if dns.epLister == nil { - return nil - } +func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) { os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx) if err != nil { return nil } for _, o := range os { - e, ok := o.(*api.Endpoints) + e, ok := o.(*object.Endpoints) if !ok { continue } @@ -420,16 +419,13 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*api.Endpoints) { return ep } -func (dns *dnsControl) EpIndexReverse(ip string) (ep []*api.Endpoints) { - if dns.svcLister == nil { - return nil - } +func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) { os, err := dns.epLister.ByIndex(epIPIndex, ip) if err != nil { return nil } for _, o := range os { - e, ok := o.(*api.Endpoints) + e, ok := o.(*object.Endpoints) if !ok { continue } @@ -438,21 +434,6 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*api.Endpoints) { return ep } -func (dns *dnsControl) EndpointsList() (eps []*api.Endpoints) { - if dns.epLister == nil { - return nil - } - os := dns.epLister.List() - for _, o := range os { - ep, ok := o.(*api.Endpoints) - if !ok { - continue - } - eps = append(eps, ep) - } - return eps -} - // GetNodeByName return the node by name. If nothing is found an error is // returned. This query causes a roundtrip to the k8s API server, so use // sparingly. Currently this is only used for Federation. @@ -461,8 +442,7 @@ func (dns *dnsControl) GetNodeByName(name string) (*api.Node, error) { return v1node, err } -// GetNamespaceByName returns the namespace by name. If nothing is found an -// error is returned. +// GetNamespaceByName returns the namespace by name. If nothing is found an error is returned. func (dns *dnsControl) GetNamespaceByName(name string) (*api.Namespace, error) { os := dns.nsLister.List() for _, o := range os { @@ -488,160 +468,4 @@ func (dns *dnsControl) updateModifed() { atomic.StoreInt64(&dns.modified, unix) } -func (dns *dnsControl) sendServiceUpdates(s *api.Service) { - for i := range dns.zones { - name := serviceFQDN(s, dns.zones[i]) - if _, ok := dns.watched[name]; ok { - dns.watchChan <- name - } - } -} - -func (dns *dnsControl) sendPodUpdates(p *api.Pod) { - for i := range dns.zones { - name := podFQDN(p, dns.zones[i]) - if _, ok := dns.watched[name]; ok { - dns.watchChan <- name - } - } -} - -func (dns *dnsControl) sendEndpointsUpdates(ep *api.Endpoints) { - for _, zone := range dns.zones { - names := append(endpointFQDN(ep, zone, dns.endpointNameMode), serviceFQDN(ep, zone)) - for _, name := range names { - if _, ok := dns.watched[name]; ok { - dns.watchChan <- name - } - } - } -} - -// endpointsSubsetDiffs returns an Endpoints struct containing the Subsets that have changed between a and b. -// When we notify clients of changed endpoints we only want to notify them of endpoints that have changed. -// The Endpoints API object holds more than one endpoint, held in a list of Subsets. Each Subset refers to -// an endpoint. So, here we create a new Endpoints struct, and populate it with only the endpoints that have changed. -// This new Endpoints object is later used to generate the list of endpoint FQDNs to send to the client. -// This function computes this literally by combining the sets (in a and not in b) union (in b and not in a). -func endpointsSubsetDiffs(a, b *api.Endpoints) *api.Endpoints { - c := b.DeepCopy() - c.Subsets = []api.EndpointSubset{} - - // In the following loop, the first iteration computes (in a but not in b). - // The second iteration then adds (in b but not in a) - // The end result is an Endpoints that only contains the subsets (endpoints) that are different between a and b. - for _, abba := range [][]*api.Endpoints{{a, b}, {b, a}} { - a := abba[0] - b := abba[1] - left: - for _, as := range a.Subsets { - for _, bs := range b.Subsets { - if subsetsEquivalent(as, bs) { - continue left - } - } - c.Subsets = append(c.Subsets, as) - } - } - return c -} - -// sendUpdates sends a notification to the server if a watch is enabled for the qname. -func (dns *dnsControl) sendUpdates(oldObj, newObj interface{}) { - // If both objects have the same resource version, they are identical. - if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) { - return - } - obj := newObj - if obj == nil { - obj = oldObj - } - switch ob := obj.(type) { - case *api.Service: - dns.updateModifed() - dns.sendServiceUpdates(ob) - case *api.Endpoints: - if newObj == nil || oldObj == nil { - dns.updateModifed() - dns.sendEndpointsUpdates(ob) - return - } - p := oldObj.(*api.Endpoints) - // endpoint updates can come frequently, make sure it's a change we care about - if endpointsEquivalent(p, ob) { - return - } - dns.updateModifed() - dns.sendEndpointsUpdates(endpointsSubsetDiffs(p, ob)) - case *api.Pod: - dns.updateModifed() - dns.sendPodUpdates(ob) - default: - log.Warningf("Updates for %T not supported.", ob) - } -} - -func (dns *dnsControl) Add(obj interface{}) { dns.sendUpdates(nil, obj) } -func (dns *dnsControl) Delete(obj interface{}) { dns.sendUpdates(obj, nil) } -func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.sendUpdates(oldObj, newObj) } - -// subsetsEquivalent checks if two endpoint subsets are significantly equivalent -// I.e. that they have the same ready addresses, host names, ports (including protocol -// and service names for SRV) -func subsetsEquivalent(sa, sb api.EndpointSubset) bool { - if len(sa.Addresses) != len(sb.Addresses) { - return false - } - if len(sa.Ports) != len(sb.Ports) { - return false - } - - // in Addresses and Ports, we should be able to rely on - // these being sorted and able to be compared - // they are supposed to be in a canonical format - for addr, aaddr := range sa.Addresses { - baddr := sb.Addresses[addr] - if aaddr.IP != baddr.IP { - return false - } - if aaddr.Hostname != baddr.Hostname { - return false - } - } - - for port, aport := range sa.Ports { - bport := sb.Ports[port] - if aport.Name != bport.Name { - return false - } - if aport.Port != bport.Port { - return false - } - if aport.Protocol != bport.Protocol { - return false - } - } - return true -} - -// endpointsEquivalent checks if the update to an endpoint is something -// that matters to us or if they are effectively equivalent. -func endpointsEquivalent(a, b *api.Endpoints) bool { - - if len(a.Subsets) != len(b.Subsets) { - return false - } - - // we should be able to rely on - // these being sorted and able to be compared - // they are supposed to be in a canonical format - for i, sa := range a.Subsets { - sb := b.Subsets[i] - if !subsetsEquivalent(sa, sb) { - return false - } - } - return true -} - var errObj = errors.New("obj was not of the correct type") |