diff options
author | 2018-06-27 07:45:32 -0700 | |
---|---|---|
committer | 2018-06-27 07:45:32 -0700 | |
commit | 99287d091c2db4028e54782fd4de43f63ca4b040 (patch) | |
tree | 5e9908031dbf3256a0ad3e2a54c0a90251e88757 /plugin/kubernetes/controller.go | |
parent | b7480d5d1216aa87d80d240a31de750079eba904 (diff) | |
download | coredns-99287d091c2db4028e54782fd4de43f63ca4b040.tar.gz coredns-99287d091c2db4028e54782fd4de43f63ca4b040.tar.zst coredns-99287d091c2db4028e54782fd4de43f63ca4b040.zip |
Watch feature (#1527)
* Add part 1 watch functionality. (squashed)
* add funcs for service/endpoint fqdns
* add endpoints watch
* document exposed funcs
* only send subset deltas
* locking for watch map
* tests and docs
* add pod watch
* remove debugs prints
* feedback part 1
* add error reporting to proto
* inform clients of server stop+errors
* add grpc options param
* use proper context
* Review feedback:
* Removed client (will move to another repo)
* Use new log functions
* Change watchChan to be for string not []string
* Rework how k8s plugin stores watch tracking info to simplify
* Normalize the qname on watch request
* Add blank line back
* Revert another spurious change
* Fix tests
* Add stop channel.
Fix tests.
Better docs for plugin interface.
* fmt.Printf -> log.Warningf
* Move from dnsserver to plugin/pkg/watch
* gofmt
* remove dead client watches
* sate linter
* linter omg
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r-- | plugin/kubernetes/controller.go | 222 |
1 files changed, 179 insertions, 43 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 0d7370a56..286f87d8e 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -7,6 +7,8 @@ import ( "sync/atomic" "time" + dnswatch "github.com/coredns/coredns/plugin/pkg/watch" + api "k8s.io/api/core/v1" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" @@ -45,6 +47,11 @@ type dnsController interface { // Modified returns the timestamp of the most recent changes Modified() int64 + + // Watch-related items + SetWatchChan(dnswatch.Chan) + Watch(string) error + StopWatching(string) } type dnsControl struct { @@ -73,6 +80,12 @@ type dnsControl struct { stopLock sync.Mutex shutdown bool stopCh chan struct{} + + // watch-related items channel + watchChan dnswatch.Chan + watched map[string]bool + zones []string + endpointNameMode bool } type dnsControlOpts struct { @@ -83,14 +96,20 @@ type dnsControlOpts struct { // Label handling. labelSelector *meta.LabelSelector selector labels.Selector + + zones []string + endpointNameMode bool } // newDNSController creates a controller for CoreDNS. func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dnsControl { dns := dnsControl{ - client: kubeClient, - selector: opts.selector, - stopCh: make(chan struct{}), + client: kubeClient, + selector: opts.selector, + stopCh: make(chan struct{}), + watched: make(map[string]bool), + zones: opts.zones, + endpointNameMode: opts.endpointNameMode, } dns.svcLister, dns.svcController = cache.NewIndexerInformer( @@ -292,6 +311,22 @@ func namespaceWatchFunc(c *kubernetes.Clientset, s labels.Selector) func(options } } +func (dns *dnsControl) SetWatchChan(c dnswatch.Chan) { + dns.watchChan = c +} + +func (dns *dnsControl) Watch(qname string) error { + if dns.watchChan == nil { + return fmt.Errorf("cannot start watch because the channel has not been set") + } + dns.watched[qname] = true + return nil +} + +func (dns *dnsControl) StopWatching(qname string) { + delete(dns.watched, qname) +} + // Stop stops the controller. func (dns *dnsControl) Stop() error { dns.stopLock.Lock() @@ -492,63 +527,164 @@ func (dns *dnsControl) updateModifed() { atomic.StoreInt64(&dns.modified, unix) } -func (dns *dnsControl) Add(obj interface{}) { dns.updateModifed() } -func (dns *dnsControl) Delete(obj interface{}) { dns.updateModifed() } +func (dns *dnsControl) sendServiceUpdates(s *api.Service) { + for i := range dns.zones { + name := serviceFQDN(s, dns.zones[i]) + if _, ok := dns.watched[name]; ok { + dns.watchChan <- name + } + } +} + +func (dns *dnsControl) sendPodUpdates(p *api.Pod) { + for i := range dns.zones { + name := podFQDN(p, dns.zones[i]) + if _, ok := dns.watched[name]; ok { + dns.watchChan <- name + } + } +} + +func (dns *dnsControl) sendEndpointsUpdates(ep *api.Endpoints) { + for _, zone := range dns.zones { + names := append(endpointFQDN(ep, zone, dns.endpointNameMode), serviceFQDN(ep, zone)) + for _, name := range names { + if _, ok := dns.watched[name]; ok { + dns.watchChan <- name + } + } + } +} + +// endpointsSubsetDiffs returns an Endpoints struct containing the Subsets that have changed between a and b. +// When we notify clients of changed endpoints we only want to notify them of endpoints that have changed. +// The Endpoints API object holds more than one endpoint, held in a list of Subsets. Each Subset refers to +// an endpoint. So, here we create a new Endpoints struct, and populate it with only the endpoints that have changed. +// This new Endpoints object is later used to generate the list of endpoint FQDNs to send to the client. +// This function computes this literally by combining the sets (in a and not in b) union (in b and not in a). +func endpointsSubsetDiffs(a, b *api.Endpoints) *api.Endpoints { + c := b.DeepCopy() + c.Subsets = []api.EndpointSubset{} + + // In the following loop, the first iteration computes (in a but not in b). + // The second iteration then adds (in b but not in a) + // The end result is an Endpoints that only contains the subsets (endpoints) that are different between a and b. + for _, abba := range [][]*api.Endpoints{{a, b}, {b, a}} { + a := abba[0] + b := abba[1] + left: + for _, as := range a.Subsets { + for _, bs := range b.Subsets { + if subsetsEquivalent(as, bs) { + continue left + } + } + c.Subsets = append(c.Subsets, as) + } + } + return c +} -func (dns *dnsControl) Update(objOld, newObj interface{}) { - // endpoint updates can come frequently, make sure - // it's a change we care about - if o, ok := objOld.(*api.Endpoints); ok { - n := newObj.(*api.Endpoints) - if endpointsEquivalent(o, n) { +// sendUpdates sends a notification to the server if a watch +// is enabled for the qname +func (dns *dnsControl) sendUpdates(oldObj, newObj interface{}) { + // If both objects have the same resource version, they are identical. + if newObj != nil && oldObj != nil && (oldObj.(meta.Object).GetResourceVersion() == newObj.(meta.Object).GetResourceVersion()) { + return + } + obj := newObj + if obj == nil { + obj = oldObj + } + switch ob := obj.(type) { + case *api.Service: + dns.updateModifed() + dns.sendServiceUpdates(ob) + case *api.Endpoints: + if newObj == nil || oldObj == nil { + dns.updateModifed() + dns.sendEndpointsUpdates(ob) + return + } + p := oldObj.(*api.Endpoints) + // endpoint updates can come frequently, make sure it's a change we care about + if endpointsEquivalent(p, ob) { return } + dns.updateModifed() + dns.sendEndpointsUpdates(endpointsSubsetDiffs(p, ob)) + case *api.Pod: + dns.updateModifed() + dns.sendPodUpdates(ob) + default: + log.Warningf("Updates for %T not supported.", ob) } - dns.updateModifed() } -// endpointsEquivalent checks if the update to an endpoint is something -// that matters to us: ready addresses, host names, ports (including names for SRV) -func endpointsEquivalent(a, b *api.Endpoints) bool { - // supposedly we should be able to rely on - // these being sorted and able to be compared - // they are supposed to be in a canonical format +func (dns *dnsControl) Add(obj interface{}) { + dns.sendUpdates(nil, obj) +} +func (dns *dnsControl) Delete(obj interface{}) { + dns.sendUpdates(obj, nil) +} +func (dns *dnsControl) Update(oldObj, newObj interface{}) { + dns.sendUpdates(oldObj, newObj) +} - if len(a.Subsets) != len(b.Subsets) { +// subsetsEquivalent checks if two endpoint subsets are significantly equivalent +// I.e. that they have the same ready addresses, host names, ports (including protocol +// and service names for SRV) +func subsetsEquivalent(sa, sb api.EndpointSubset) bool { + if len(sa.Addresses) != len(sb.Addresses) { + return false + } + if len(sa.Ports) != len(sb.Ports) { return false } - for i, sa := range a.Subsets { - // check the Addresses and Ports. Ignore unready addresses. - sb := b.Subsets[i] - if len(sa.Addresses) != len(sb.Addresses) { + // in Addresses and Ports, we should be able to rely on + // these being sorted and able to be compared + // they are supposed to be in a canonical format + for addr, aaddr := range sa.Addresses { + baddr := sb.Addresses[addr] + if aaddr.IP != baddr.IP { return false } - if len(sa.Ports) != len(sb.Ports) { + if aaddr.Hostname != baddr.Hostname { return false } + } - for addr, aaddr := range sa.Addresses { - baddr := sb.Addresses[addr] - if aaddr.IP != baddr.IP { - return false - } - if aaddr.Hostname != baddr.Hostname { - return false - } + for port, aport := range sa.Ports { + bport := sb.Ports[port] + if aport.Name != bport.Name { + return false + } + if aport.Port != bport.Port { + return false + } + if aport.Protocol != bport.Protocol { + return false } + } + return true +} - for port, aport := range sa.Ports { - bport := sb.Ports[port] - if aport.Name != bport.Name { - return false - } - if aport.Port != bport.Port { - return false - } - if aport.Protocol != bport.Protocol { - return false - } +// endpointsEquivalent checks if the update to an endpoint is something +// that matters to us or if they are effectively equivalent. +func endpointsEquivalent(a, b *api.Endpoints) bool { + + if len(a.Subsets) != len(b.Subsets) { + return false + } + + // we should be able to rely on + // these being sorted and able to be compared + // they are supposed to be in a canonical format + for i, sa := range a.Subsets { + sb := b.Subsets[i] + if !subsetsEquivalent(sa, sb) { + return false } } return true |