aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes/controller.go
diff options
context:
space:
mode:
authorGravatar Chris O'Haver <cohaver@infoblox.com> 2020-10-30 08:14:30 -0400
committerGravatar GitHub <noreply@github.com> 2020-10-30 08:14:30 -0400
commit272ccb195d31cd1622d48f961f3a189ce3abb937 (patch)
treeb5db771e2371b2e4ede772dff2c2c4217188115c /plugin/kubernetes/controller.go
parentc840caf1ef77d8f86ee7d11f644e0d6ea42c469a (diff)
downloadcoredns-272ccb195d31cd1622d48f961f3a189ce3abb937.tar.gz
coredns-272ccb195d31cd1622d48f961f3a189ce3abb937.tar.zst
coredns-272ccb195d31cd1622d48f961f3a189ce3abb937.zip
plugin/kubernetes: Watch EndpointSlices (#4209)
* initial commit Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * convert endpointslices to object.endpoints Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * add opt hard coded for now Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * check that server supports endpointslice Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * fix import grouping Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * dont use endpoint slice in 1.17 or 1.18 Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * bump kind/k8s in circle ci to latest Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * drop k8s to latest supported by kind Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * use endpointslice name as endoint Name; index by Service name Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * use index key comparison in nsAddrs() Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * add Index to object.Endpoint fixtures; fix direct endpoint name compares Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * add slice dup check and test Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * todo Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * add ep-slice skew dup test for reverse Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * nsaddrs: de-dup ep-slice skew dups; add test Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * remove todo Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * address various feedback Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * consolidate endpoint/slice informer code Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * fix endpoint informer consolidation; use clearer func name Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * log info; use major/minor fields Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * fix nsAddr and unit test Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * add latency tracking for endpointslices Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * endpointslice latency unit test & fix Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * code shuffling Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * rename endpointslices in tests Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * remove de-dup from nsAddrs and test Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * remove de-dup from findServices / test Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r--plugin/kubernetes/controller.go113
1 files changed, 95 insertions, 18 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index 90a005177..f9373eee9 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -11,16 +11,18 @@ import (
"github.com/coredns/coredns/plugin/kubernetes/object"
api "k8s.io/api/core/v1"
+ discovery "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
const (
podIPIndex = "PodIP"
- svcNameNamespaceIndex = "NameNamespace"
+ svcNameNamespaceIndex = "ServiceNameNamespace"
svcIPIndex = "ServiceIP"
epNameNamespaceIndex = "EndpointNameNamespace"
epIPIndex = "EndpointsIP"
@@ -81,6 +83,7 @@ type dnsControl struct {
type dnsControlOpts struct {
initPodCache bool
initEndpointsCache bool
+ useEndpointSlices bool
ignoreEmptyService bool
// Label handling.
@@ -130,15 +133,31 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
}
if opts.initEndpointsCache {
+ var (
+ apiObj runtime.Object
+ listWatch cache.ListWatch
+ to func(bool) object.ToFunc
+ latency object.RecordLatencyFunc
+ )
+ if opts.useEndpointSlices {
+ apiObj = &discovery.EndpointSlice{}
+ listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
+ listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
+ to = object.EndpointSliceToEndpoints
+ latency = dns.recordEndpointSliceDNSProgrammingLatency
+ } else {
+ apiObj = &api.Endpoints{}
+ listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
+ listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
+ to = object.ToEndpoints
+ latency = dns.recordEndpointDNSProgrammingLatency
+ }
dns.epLister, dns.epController = object.NewIndexerInformer(
- &cache.ListWatch{
- ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
- WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
- },
- &api.Endpoints{},
+ &listWatch,
+ apiObj,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
- object.DefaultProcessor(object.ToEndpoints(opts.skipAPIObjectsCleanup), dns.recordDNSProgrammingLatency),
+ object.DefaultProcessor(to(opts.skipAPIObjectsCleanup), latency),
)
}
@@ -154,8 +173,12 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
return &dns
}
-func (dns *dnsControl) recordDNSProgrammingLatency(obj meta.Object) {
- recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj.(*api.Endpoints))
+func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) {
+ recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj)
+}
+
+func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) {
+ recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj)
}
func podIPIndexFunc(obj interface{}) ([]string, error) {
@@ -207,8 +230,7 @@ func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s l
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Services(ns).List(ctx, opts)
- return listV1, err
+ return c.CoreV1().Services(ns).List(ctx, opts)
}
}
@@ -221,8 +243,16 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label
opts.FieldSelector = opts.FieldSelector + ","
}
opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
- listV1, err := c.CoreV1().Pods(ns).List(ctx, opts)
- return listV1, err
+ return c.CoreV1().Pods(ns).List(ctx, opts)
+ }
+}
+
+func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
+ return func(opts meta.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = s.String()
+ }
+ return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
}
}
@@ -231,8 +261,7 @@ func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts)
- return listV1, err
+ return c.CoreV1().Endpoints(ns).List(ctx, opts)
}
}
@@ -241,8 +270,56 @@ func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Sel
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Namespaces().List(ctx, opts)
- return listV1, err
+ return c.CoreV1().Namespaces().List(ctx, opts)
+ }
+}
+
+func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.CoreV1().Services(ns).Watch(ctx, options)
+ }
+}
+
+func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ if len(options.FieldSelector) > 0 {
+ options.FieldSelector = options.FieldSelector + ","
+ }
+ options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
+ return c.CoreV1().Pods(ns).Watch(ctx, options)
+ }
+}
+
+func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options)
+ }
+}
+
+func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.CoreV1().Endpoints(ns).Watch(ctx, options)
+ }
+}
+
+func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.CoreV1().Namespaces().Watch(ctx, options)
}
}
@@ -442,7 +519,7 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
}
func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service {
- return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace()))
+ return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace()))
}
// subsetsEquivalent checks if two endpoint subsets are significantly equivalent