diff options
Diffstat (limited to 'plugin/kubernetes/watch.go')
-rw-r--r-- | plugin/kubernetes/watch.go | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/plugin/kubernetes/watch.go b/plugin/kubernetes/watch.go index 488540444..5c52cc4f9 100644 --- a/plugin/kubernetes/watch.go +++ b/plugin/kubernetes/watch.go @@ -1,7 +1,9 @@ package kubernetes import ( + "github.com/coredns/coredns/plugin/kubernetes/object" "github.com/coredns/coredns/plugin/pkg/watch" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" ) // SetWatchChan implements watch.Watchable @@ -18,3 +20,163 @@ func (k *Kubernetes) Watch(qname string) error { func (k *Kubernetes) StopWatching(qname string) { k.APIConn.StopWatching(qname) } + +var _ watch.Watchable = &Kubernetes{} + +func (dns *dnsControl) sendServiceUpdates(s *object.Service) { + for i := range dns.zones { + name := serviceFQDN(s, dns.zones[i]) + if _, ok := dns.watched[name]; ok { + dns.watchChan <- name + } + } +} + +func (dns *dnsControl) sendPodUpdates(p *object.Pod) { + for i := range dns.zones { + name := podFQDN(p, dns.zones[i]) + if _, ok := dns.watched[name]; ok { + dns.watchChan <- name + } + } +} + +func (dns *dnsControl) sendEndpointsUpdates(ep *object.Endpoints) { + for _, zone := range dns.zones { + for _, name := range endpointFQDN(ep, zone, dns.endpointNameMode) { + if _, ok := dns.watched[name]; ok { + dns.watchChan <- name + } + } + name := serviceFQDN(ep, zone) + if _, ok := dns.watched[name]; ok { + dns.watchChan <- name + } + } +} + +// endpointsSubsetDiffs returns an Endpoints struct containing the Subsets that have changed between a and b. +// When we notify clients of changed endpoints we only want to notify them of endpoints that have changed. +// The Endpoints API object holds more than one endpoint, held in a list of Subsets. Each Subset refers to +// an endpoint. So, here we create a new Endpoints struct, and populate it with only the endpoints that have changed. +// This new Endpoints object is later used to generate the list of endpoint FQDNs to send to the client. +// This function computes this literally by combining the sets (in a and not in b) union (in b and not in a). +func endpointsSubsetDiffs(a, b *object.Endpoints) *object.Endpoints { + c := b.CopyWithoutSubsets() + + // In the following loop, the first iteration computes (in a but not in b). + // The second iteration then adds (in b but not in a) + // The end result is an Endpoints that only contains the subsets (endpoints) that are different between a and b. + for _, abba := range [][]*object.Endpoints{{a, b}, {b, a}} { + a := abba[0] + b := abba[1] + left: + for _, as := range a.Subsets { + for _, bs := range b.Subsets { + if subsetsEquivalent(as, bs) { + continue left + } + } + c.Subsets = append(c.Subsets, as) + } + } + return c +} + +// sendUpdates sends a notification to the server if a watch is enabled for the qname. +func (dns *dnsControl) sendUpdates(oldObj, newObj interface{}) { + // If both objects have the same resource version, they are identical. + if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) { + return + } + obj := newObj + if obj == nil { + obj = oldObj + } + switch ob := obj.(type) { + case *object.Service: + dns.updateModifed() + dns.sendServiceUpdates(ob) + case *object.Endpoints: + if newObj == nil || oldObj == nil { + dns.updateModifed() + dns.sendEndpointsUpdates(ob) + return + } + p := oldObj.(*object.Endpoints) + // endpoint updates can come frequently, make sure it's a change we care about + if endpointsEquivalent(p, ob) { + return + } + dns.updateModifed() + dns.sendEndpointsUpdates(endpointsSubsetDiffs(p, ob)) + case *object.Pod: + dns.updateModifed() + dns.sendPodUpdates(ob) + default: + log.Warningf("Updates for %T not supported.", ob) + } +} + +func (dns *dnsControl) Add(obj interface{}) { dns.sendUpdates(nil, obj) } +func (dns *dnsControl) Delete(obj interface{}) { dns.sendUpdates(obj, nil) } +func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.sendUpdates(oldObj, newObj) } + +// subsetsEquivalent checks if two endpoint subsets are significantly equivalent +// I.e. that they have the same ready addresses, host names, ports (including protocol +// and service names for SRV) +func subsetsEquivalent(sa, sb object.EndpointSubset) bool { + if len(sa.Addresses) != len(sb.Addresses) { + return false + } + if len(sa.Ports) != len(sb.Ports) { + return false + } + + // in Addresses and Ports, we should be able to rely on + // these being sorted and able to be compared + // they are supposed to be in a canonical format + for addr, aaddr := range sa.Addresses { + baddr := sb.Addresses[addr] + if aaddr.IP != baddr.IP { + return false + } + if aaddr.Hostname != baddr.Hostname { + return false + } + } + + for port, aport := range sa.Ports { + bport := sb.Ports[port] + if aport.Name != bport.Name { + return false + } + if aport.Port != bport.Port { + return false + } + if aport.Protocol != bport.Protocol { + return false + } + } + return true +} + +// endpointsEquivalent checks if the update to an endpoint is something +// that matters to us or if they are effectively equivalent. +func endpointsEquivalent(a, b *object.Endpoints) bool { + + if len(a.Subsets) != len(b.Subsets) { + return false + } + + // we should be able to rely on + // these being sorted and able to be compared + // they are supposed to be in a canonical format + for i, sa := range a.Subsets { + sb := b.Subsets[i] + if !subsetsEquivalent(sa, sb) { + return false + } + } + return true +} |