From 272ccb195d31cd1622d48f961f3a189ce3abb937 Mon Sep 17 00:00:00 2001 From: Chris O'Haver Date: Fri, 30 Oct 2020 08:14:30 -0400 Subject: plugin/kubernetes: Watch EndpointSlices (#4209) * initial commit Signed-off-by: Chris O'Haver * convert endpointslices to object.endpoints Signed-off-by: Chris O'Haver * add opt hard coded for now Signed-off-by: Chris O'Haver * check that server supports endpointslice Signed-off-by: Chris O'Haver * fix import grouping Signed-off-by: Chris O'Haver * dont use endpoint slice in 1.17 or 1.18 Signed-off-by: Chris O'Haver * bump kind/k8s in circle ci to latest Signed-off-by: Chris O'Haver * drop k8s to latest supported by kind Signed-off-by: Chris O'Haver * use endpointslice name as endoint Name; index by Service name Signed-off-by: Chris O'Haver * use index key comparison in nsAddrs() Signed-off-by: Chris O'Haver * add Index to object.Endpoint fixtures; fix direct endpoint name compares Signed-off-by: Chris O'Haver * add slice dup check and test Signed-off-by: Chris O'Haver * todo Signed-off-by: Chris O'Haver * add ep-slice skew dup test for reverse Signed-off-by: Chris O'Haver * nsaddrs: de-dup ep-slice skew dups; add test Signed-off-by: Chris O'Haver * remove todo Signed-off-by: Chris O'Haver * address various feedback Signed-off-by: Chris O'Haver * consolidate endpoint/slice informer code Signed-off-by: Chris O'Haver * fix endpoint informer consolidation; use clearer func name Signed-off-by: Chris O'Haver * log info; use major/minor fields Signed-off-by: Chris O'Haver * fix nsAddr and unit test Signed-off-by: Chris O'Haver * add latency tracking for endpointslices Signed-off-by: Chris O'Haver * endpointslice latency unit test & fix Signed-off-by: Chris O'Haver * code shuffling Signed-off-by: Chris O'Haver * rename endpointslices in tests Signed-off-by: Chris O'Haver * remove de-dup from nsAddrs and test Signed-off-by: Chris O'Haver * remove de-dup from findServices / test Signed-off-by: Chris O'Haver --- plugin/kubernetes/controller.go | 113 +++++++++++++++++++++++++++++++++------- 1 file changed, 95 insertions(+), 18 deletions(-) (limited to 'plugin/kubernetes/controller.go') diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 90a005177..f9373eee9 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -11,16 +11,18 @@ import ( "github.com/coredns/coredns/plugin/kubernetes/object" api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" ) const ( podIPIndex = "PodIP" - svcNameNamespaceIndex = "NameNamespace" + svcNameNamespaceIndex = "ServiceNameNamespace" svcIPIndex = "ServiceIP" epNameNamespaceIndex = "EndpointNameNamespace" epIPIndex = "EndpointsIP" @@ -81,6 +83,7 @@ type dnsControl struct { type dnsControlOpts struct { initPodCache bool initEndpointsCache bool + useEndpointSlices bool ignoreEmptyService bool // Label handling. @@ -130,15 +133,31 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts } if opts.initEndpointsCache { + var ( + apiObj runtime.Object + listWatch cache.ListWatch + to func(bool) object.ToFunc + latency object.RecordLatencyFunc + ) + if opts.useEndpointSlices { + apiObj = &discovery.EndpointSlice{} + listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + to = object.EndpointSliceToEndpoints + latency = dns.recordEndpointSliceDNSProgrammingLatency + } else { + apiObj = &api.Endpoints{} + listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) + to = object.ToEndpoints + latency = dns.recordEndpointDNSProgrammingLatency + } dns.epLister, dns.epController = object.NewIndexerInformer( - &cache.ListWatch{ - ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), - }, - &api.Endpoints{}, + &listWatch, + apiObj, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(object.ToEndpoints(opts.skipAPIObjectsCleanup), dns.recordDNSProgrammingLatency), + object.DefaultProcessor(to(opts.skipAPIObjectsCleanup), latency), ) } @@ -154,8 +173,12 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts return &dns } -func (dns *dnsControl) recordDNSProgrammingLatency(obj meta.Object) { - recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj.(*api.Endpoints)) +func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) { + recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj) +} + +func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) { + recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj) } func podIPIndexFunc(obj interface{}) ([]string, error) { @@ -207,8 +230,7 @@ func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s l if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Services(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Services(ns).List(ctx, opts) } } @@ -221,8 +243,16 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label opts.FieldSelector = opts.FieldSelector + "," } opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" - listV1, err := c.CoreV1().Pods(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Pods(ns).List(ctx, opts) + } +} + +func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { + return func(opts meta.ListOptions) (runtime.Object, error) { + if s != nil { + opts.LabelSelector = s.String() + } + return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts) } } @@ -231,8 +261,7 @@ func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts) - return listV1, err + return c.CoreV1().Endpoints(ns).List(ctx, opts) } } @@ -241,8 +270,56 @@ func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Sel if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Namespaces().List(ctx, opts) - return listV1, err + return c.CoreV1().Namespaces().List(ctx, opts) + } +} + +func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Services(ns).Watch(ctx, options) + } +} + +func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + if len(options.FieldSelector) > 0 { + options.FieldSelector = options.FieldSelector + "," + } + options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" + return c.CoreV1().Pods(ns).Watch(ctx, options) + } +} + +func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options) + } +} + +func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Endpoints(ns).Watch(ctx, options) + } +} + +func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { + if s != nil { + options.LabelSelector = s.String() + } + return c.CoreV1().Namespaces().Watch(ctx, options) } } @@ -442,7 +519,7 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { } func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service { - return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace())) + return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace())) } // subsetsEquivalent checks if two endpoint subsets are significantly equivalent -- cgit v1.2.3