diff options
author | 2020-10-30 08:14:30 -0400 | |
---|---|---|
committer | 2020-10-30 08:14:30 -0400 | |
commit | 272ccb195d31cd1622d48f961f3a189ce3abb937 (patch) | |
tree | b5db771e2371b2e4ede772dff2c2c4217188115c /plugin/kubernetes/controller.go | |
parent | c840caf1ef77d8f86ee7d11f644e0d6ea42c469a (diff) | |
download | coredns-272ccb195d31cd1622d48f961f3a189ce3abb937.tar.gz coredns-272ccb195d31cd1622d48f961f3a189ce3abb937.tar.zst coredns-272ccb195d31cd1622d48f961f3a189ce3abb937.zip |
plugin/kubernetes: Watch EndpointSlices (#4209)
* initial commit
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* convert endpointslices to object.endpoints
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add opt hard coded for now
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* check that server supports endpointslice
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* fix import grouping
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* dont use endpoint slice in 1.17 or 1.18
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* bump kind/k8s in circle ci to latest
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* drop k8s to latest supported by kind
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* use endpointslice name as endoint Name; index by Service name
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* use index key comparison in nsAddrs()
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add Index to object.Endpoint fixtures; fix direct endpoint name compares
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add slice dup check and test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* todo
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add ep-slice skew dup test for reverse
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* nsaddrs: de-dup ep-slice skew dups; add test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* remove todo
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* address various feedback
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* consolidate endpoint/slice informer code
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* fix endpoint informer consolidation; use clearer func name
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* log info; use major/minor fields
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* fix nsAddr and unit test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add latency tracking for endpointslices
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* endpointslice latency unit test & fix
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* code shuffling
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* rename endpointslices in tests
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* remove de-dup from nsAddrs and test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* remove de-dup from findServices / test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r-- | plugin/kubernetes/controller.go | 113 |
1 files changed, 95 insertions, 18 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 90a005177..f9373eee9 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -11,16 +11,18 @@ import ( "github.com/coredns/coredns/plugin/kubernetes/object" api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" ) const ( podIPIndex = "PodIP" - svcNameNamespaceIndex = "NameNamespace" + svcNameNamespaceIndex = "ServiceNameNamespace" svcIPIndex = "ServiceIP" epNameNamespaceIndex = "EndpointNameNamespace" epIPIndex = "EndpointsIP" @@ -81,6 +83,7 @@ type dnsControl struct { type dnsControlOpts struct { initPodCache bool initEndpointsCache bool + useEndpointSlices bool ignoreEmptyService bool // Label handling. @@ -130,15 +133,31 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts } if opts.initEndpointsCache { + var ( + apiObj runtime.Object + listWatch cache.ListWatch + to func(bool) object.ToFunc + latency object.RecordLatencyFunc + ) + if opts.useEndpointSlices { + apiObj = &discovery.EndpointSlice{} + listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + to = object.EndpointSliceToEndpoints + latency = dns.recordEndpointSliceDNSProgrammingLatency + } else { + apiObj = &api.Endpoints{} + listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + to = object.ToEndpoints + latency = dns.recordEndpointDNSProgrammingLatency + } dns.epLister, dns.epController = object.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - }, - &api.Endpoints{}, + &listWatch, + apiObj, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(object.ToEndpoints(opts.skipAPIObjectsCleanup), dns.recordDNSProgrammingLatency), + object.DefaultProcessor(to(opts.skipAPIObjectsCleanup), latency), ) } @@ -154,8 +173,12 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts return &dns } -func (dns *dnsControl) recordDNSProgrammingLatency(obj meta.Object) { - recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj.(*api.Endpoints)) +func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) { + recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj) +} + +func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) { + recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj) } func podIPIndexFunc(obj interface{}) ([]string, error) { @@ -207,8 +230,7 @@ func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s l if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Services(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Services(ns).List(ctx, opts) } } @@ -221,8 +243,16 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label opts.FieldSelector = opts.FieldSelector + "," } opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" - listV1, err := c.CoreV1().Pods(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Pods(ns).List(ctx, opts) + } +} + +func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { + return func(opts meta.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = s.String() + } + return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts) } } @@ -231,8 +261,7 @@ func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Endpoints(ns).List(ctx, opts) } } @@ -241,8 +270,56 @@ func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Sel if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Namespaces().List(ctx, opts) - return listV1, err + return c.CoreV1().Namespaces().List(ctx, opts) + } +} + +func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Services(ns).Watch(ctx, options) + } +} + +func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + if len(options.FieldSelector) > 0 { + options.FieldSelector = options.FieldSelector + "," + } + options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" + return c.CoreV1().Pods(ns).Watch(ctx, options) + } +} + +func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options) + } +} + +func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Endpoints(ns).Watch(ctx, options) + } +} + +func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Namespaces().Watch(ctx, options) } } @@ -442,7 +519,7 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { } func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service { - return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace())) + return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace())) } // subsetsEquivalent checks if two endpoint subsets are significantly equivalent |