diff options
author | 2020-06-15 10:15:41 -0400 | |
---|---|---|
committer | 2020-06-15 10:15:41 -0400 | |
commit | d902e859199e4085cd27453f30367fd1b0799bc5 (patch) | |
tree | 907bb1a8ae35452d4f82bab798a9c1d0f89d3f1b /plugin/kubernetes/controller.go | |
parent | d35c8e9eda7d6f2bb570165d4d1bf98ae5f813d9 (diff) | |
download | coredns-d902e859199e4085cd27453f30367fd1b0799bc5.tar.gz coredns-d902e859199e4085cd27453f30367fd1b0799bc5.tar.zst coredns-d902e859199e4085cd27453f30367fd1b0799bc5.zip |
plugin/kubernetes: fix tombstone unwrapping (#3924)
* fix tombstone unwrapping
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r-- | plugin/kubernetes/controller.go | 87 |
1 files changed, 16 insertions, 71 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 01cce28f2..90a005177 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -113,7 +113,7 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &api.Service{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc}, - object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup)), + object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup), nil), ) if opts.initPodCache { @@ -125,7 +125,7 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &api.Pod{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{podIPIndex: podIPIndexFunc}, - object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup)), + object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup), nil), ) } @@ -136,73 +136,10 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), }, &api.Endpoints{}, - cache.ResourceEventHandlerFuncs{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { - return func(obj interface{}) error { - for _, d := range obj.(cache.Deltas) { - switch d.Type { - case cache.Sync, cache.Added, cache.Updated: - apiEndpoints, ok := d.Object.(*api.Endpoints) - if !ok { - return errors.New("got non-endpoint add/update") - } - obj := object.ToEndpoints(apiEndpoints) - - if old, exists, err := clientState.Get(obj); err == nil && exists { - if err := clientState.Update(obj); err != nil { - return err - } - h.OnUpdate(old, obj) - // endpoint updates can come frequently, make sure it's a change we care about - if !endpointsEquivalent(old.(*object.Endpoints), obj) { - dns.updateModifed() - recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) - } - } else { - if err := clientState.Add(obj); err != nil { - return err - } - h.OnAdd(d.Object) - dns.updateModifed() - recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) - if !opts.skipAPIObjectsCleanup { - *apiEndpoints = api.Endpoints{} - } - } - case cache.Deleted: - apiEndpoints, ok := d.Object.(*api.Endpoints) - if !ok { - // Assume that the object must be a cache.DeletedFinalStateUnknown. - // This is essentially an indicator that the Endpoint was deleted, without a containing a - // up-to date copy of the Endpoints object. We need to use cache.DeletedFinalStateUnknown - // object so it can be properly deleted by store.Delete() below, which knows how to handle it. - tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown) - if !ok { - return errors.New("expected tombstone") - } - apiEndpoints, ok = tombstone.Obj.(*api.Endpoints) - if !ok { - return errors.New("got non-endpoint tombstone") - } - } - obj := object.ToEndpoints(apiEndpoints) - - if err := clientState.Delete(obj); err != nil { - return err - } - h.OnDelete(d.Object) - dns.updateModifed() - recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints) - if !opts.skipAPIObjectsCleanup { - *apiEndpoints = api.Endpoints{} - } - } - } - return nil - } - }) - + object.DefaultProcessor(object.ToEndpoints(opts.skipAPIObjectsCleanup), dns.recordDNSProgrammingLatency), + ) } dns.nsLister, dns.nsController = cache.NewInformer( @@ -217,6 +154,10 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts return &dns } +func (dns *dnsControl) recordDNSProgrammingLatency(obj meta.Object) { + recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj.(*api.Endpoints)) +} + func podIPIndexFunc(obj interface{}) ([]string, error) { p, ok := obj.(*object.Pod) if !ok { @@ -472,8 +413,8 @@ func (dns *dnsControl) GetNamespaceByName(name string) (*api.Namespace, error) { return nil, fmt.Errorf("namespace not found") } -func (dns *dnsControl) Add(obj interface{}) { dns.detectChanges(nil, obj) } -func (dns *dnsControl) Delete(obj interface{}) { dns.detectChanges(obj, nil) } +func (dns *dnsControl) Add(obj interface{}) { dns.updateModifed() } +func (dns *dnsControl) Delete(obj interface{}) { dns.updateModifed() } func (dns *dnsControl) Update(oldObj, newObj interface{}) { dns.detectChanges(oldObj, newObj) } // detectChanges detects changes in objects, and updates the modified timestamp @@ -491,12 +432,16 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { dns.updateModifed() case *object.Pod: dns.updateModifed() + case *object.Endpoints: + if !endpointsEquivalent(oldObj.(*object.Endpoints), newObj.(*object.Endpoints)) { + dns.updateModifed() + } default: log.Warningf("Updates for %T not supported.", ob) } } -func (dns *dnsControl) getServices(endpoints *object.Endpoints) []*object.Service { +func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service { return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace())) } |