diff options
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r-- | plugin/kubernetes/controller.go | 68 |
1 files changed, 44 insertions, 24 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index d10d9f313..890785d71 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -59,6 +59,10 @@ type dnsControl struct { selector labels.Selector namespaceSelector labels.Selector + // epLock is used to lock reads of epLister and epController while they are being replaced + // with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices + epLock sync.RWMutex + svcController cache.Controller podController cache.Controller epController cache.Controller @@ -83,7 +87,6 @@ type dnsControl struct { type dnsControlOpts struct { initPodCache bool initEndpointsCache bool - useEndpointSlices bool ignoreEmptyService bool // Label handling. @@ -132,32 +135,18 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts } if opts.initEndpointsCache { - var ( - apiObj runtime.Object - listWatch cache.ListWatch - to object.ToFunc - latency *object.EndpointLatencyRecorder - ) - if opts.useEndpointSlices { - apiObj = &discovery.EndpointSlice{} - listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) - listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) - to = object.EndpointSliceToEndpoints - latency = dns.EndpointSliceLatencyRecorder() - } else { - apiObj = &api.Endpoints{} - listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) - listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) - to = object.ToEndpoints - latency = dns.EndpointsLatencyRecorder() - } + dns.epLock.Lock() dns.epLister, dns.epController = object.NewIndexerInformer( - &listWatch, - apiObj, + &cache.ListWatch{ + ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + }, + &discovery.EndpointSlice{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(to, latency), + object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()), ) + dns.epLock.Unlock() } dns.nsLister, dns.nsController = cache.NewInformer( @@ -172,6 +161,25 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts return &dns } +// WatchEndpoints will set the endpoint Lister and Controller to watch object.Endpoints +// instead of the default discovery.EndpointSlice. This is used in older k8s clusters where +// discovery.EndpointSlice is not fully supported. +// This can be removed when all supported k8s versions fully support EndpointSlice. +func (dns *dnsControl) WatchEndpoints(ctx context.Context) { + dns.epLock.Lock() + dns.epLister, dns.epController = object.NewIndexerInformer( + &cache.ListWatch{ + ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + }, + &api.Endpoints{}, + cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, + cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, + object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()), + ) + dns.epLock.Unlock() +} + func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder { return &object.EndpointLatencyRecorder{ ServiceFunc: func(o meta.Object) []*object.Service { @@ -351,7 +359,11 @@ func (dns *dnsControl) Stop() error { func (dns *dnsControl) Run() { go dns.svcController.Run(dns.stopCh) if dns.epController != nil { - go dns.epController.Run(dns.stopCh) + go func() { + dns.epLock.RLock() + dns.epController.Run(dns.stopCh) + dns.epLock.RUnlock() + }() } if dns.podController != nil { go dns.podController.Run(dns.stopCh) @@ -365,7 +377,9 @@ func (dns *dnsControl) HasSynced() bool { a := dns.svcController.HasSynced() b := true if dns.epController != nil { + dns.epLock.RLock() b = dns.epController.HasSynced() + dns.epLock.RUnlock() } c := true if dns.podController != nil { @@ -388,6 +402,8 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) { } func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) { + dns.epLock.RLock() + defer dns.epLock.RUnlock() os := dns.epLister.List() for _, o := range os { ep, ok := o.(*object.Endpoints) @@ -446,6 +462,8 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) { } func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) { + dns.epLock.RLock() + defer dns.epLock.RUnlock() os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx) if err != nil { return nil @@ -461,6 +479,8 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) { } func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) { + dns.epLock.RLock() + defer dns.epLock.RUnlock() os, err := dns.epLister.ByIndex(epIPIndex, ip) if err != nil { return nil |