aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes/controller.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r--plugin/kubernetes/controller.go68
1 files changed, 44 insertions, 24 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index d10d9f313..890785d71 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -59,6 +59,10 @@ type dnsControl struct {
selector labels.Selector
namespaceSelector labels.Selector
+ // epLock is used to lock reads of epLister and epController while they are being replaced
+ // with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices
+ epLock sync.RWMutex
+
svcController cache.Controller
podController cache.Controller
epController cache.Controller
@@ -83,7 +87,6 @@ type dnsControl struct {
type dnsControlOpts struct {
initPodCache bool
initEndpointsCache bool
- useEndpointSlices bool
ignoreEmptyService bool
// Label handling.
@@ -132,32 +135,18 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
}
if opts.initEndpointsCache {
- var (
- apiObj runtime.Object
- listWatch cache.ListWatch
- to object.ToFunc
- latency *object.EndpointLatencyRecorder
- )
- if opts.useEndpointSlices {
- apiObj = &discovery.EndpointSlice{}
- listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
- listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
- to = object.EndpointSliceToEndpoints
- latency = dns.EndpointSliceLatencyRecorder()
- } else {
- apiObj = &api.Endpoints{}
- listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
- listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
- to = object.ToEndpoints
- latency = dns.EndpointsLatencyRecorder()
- }
+ dns.epLock.Lock()
dns.epLister, dns.epController = object.NewIndexerInformer(
- &listWatch,
- apiObj,
+ &cache.ListWatch{
+ ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ },
+ &discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
- object.DefaultProcessor(to, latency),
+ object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
)
+ dns.epLock.Unlock()
}
dns.nsLister, dns.nsController = cache.NewInformer(
@@ -172,6 +161,25 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
return &dns
}
+// WatchEndpoints will set the endpoint Lister and Controller to watch object.Endpoints
+// instead of the default discovery.EndpointSlice. This is used in older k8s clusters where
+// discovery.EndpointSlice is not fully supported.
+// This can be removed when all supported k8s versions fully support EndpointSlice.
+func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
+ dns.epLock.Lock()
+ dns.epLister, dns.epController = object.NewIndexerInformer(
+ &cache.ListWatch{
+ ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ },
+ &api.Endpoints{},
+ cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
+ cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
+ object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()),
+ )
+ dns.epLock.Unlock()
+}
+
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
@@ -351,7 +359,11 @@ func (dns *dnsControl) Stop() error {
func (dns *dnsControl) Run() {
go dns.svcController.Run(dns.stopCh)
if dns.epController != nil {
- go dns.epController.Run(dns.stopCh)
+ go func() {
+ dns.epLock.RLock()
+ dns.epController.Run(dns.stopCh)
+ dns.epLock.RUnlock()
+ }()
}
if dns.podController != nil {
go dns.podController.Run(dns.stopCh)
@@ -365,7 +377,9 @@ func (dns *dnsControl) HasSynced() bool {
a := dns.svcController.HasSynced()
b := true
if dns.epController != nil {
+ dns.epLock.RLock()
b = dns.epController.HasSynced()
+ dns.epLock.RUnlock()
}
c := true
if dns.podController != nil {
@@ -388,6 +402,8 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
}
func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
+ dns.epLock.RLock()
+ defer dns.epLock.RUnlock()
os := dns.epLister.List()
for _, o := range os {
ep, ok := o.(*object.Endpoints)
@@ -446,6 +462,8 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
}
func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
+ dns.epLock.RLock()
+ defer dns.epLock.RUnlock()
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
return nil
@@ -461,6 +479,8 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
}
func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
+ dns.epLock.RLock()
+ defer dns.epLock.RUnlock()
os, err := dns.epLister.ByIndex(epIPIndex, ip)
if err != nil {
return nil