aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Chris O'Haver <cohaver@infoblox.com> 2021-05-10 12:57:23 -0400
committerGravatar GitHub <noreply@github.com> 2021-05-10 09:57:23 -0700
commit24547447d0457b8600c63602d1a64a60d018c82f (patch)
tree98d7130c586273b5d9f7e458b104cb3625bf5d6e
parent73545522965e22bcd132a6ded4128d301fea5ca3 (diff)
downloadcoredns-24547447d0457b8600c63602d1a64a60d018c82f.tar.gz
coredns-24547447d0457b8600c63602d1a64a60d018c82f.tar.zst
coredns-24547447d0457b8600c63602d1a64a60d018c82f.zip
plugin/kubernetes: Support both v1 and v1beta1 EndpointSlices (#4570)
* support v1 and v1beta1 endpointslice Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * update comments Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
-rw-r--r--plugin/kubernetes/controller.go41
-rw-r--r--plugin/kubernetes/kubernetes.go29
-rw-r--r--plugin/kubernetes/object/endpoint.go48
3 files changed, 106 insertions, 12 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index 890785d71..f17576eb4 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -11,7 +11,8 @@ import (
"github.com/coredns/coredns/plugin/kubernetes/object"
api "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
+ discovery "k8s.io/api/discovery/v1"
+ discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@@ -180,6 +181,23 @@ func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
dns.epLock.Unlock()
}
+// WatchEndpointSliceV1beta1 will set the endpoint Lister and Controller to watch v1beta1
+// instead of the default v1.
+func (dns *dnsControl) WatchEndpointSliceV1beta1(ctx context.Context) {
+ dns.epLock.Lock()
+ dns.epLister, dns.epController = object.NewIndexerInformer(
+ &cache.ListWatch{
+ ListFunc: endpointSliceListFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
+ WatchFunc: endpointSliceWatchFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
+ },
+ &discoveryV1beta1.EndpointSlice{},
+ cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
+ cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
+ object.DefaultProcessor(object.EndpointSliceV1beta1ToEndpoints, dns.EndpointSliceLatencyRecorder()),
+ )
+ dns.epLock.Unlock()
+}
+
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
@@ -262,13 +280,21 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label
return c.CoreV1().Pods(ns).List(ctx, opts)
}
}
+func endpointSliceListFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
+ return func(opts meta.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = s.String()
+ }
+ return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
+ }
+}
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
- return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
+ return c.DiscoveryV1().EndpointSlices(ns).List(ctx, opts)
}
}
@@ -312,7 +338,7 @@ func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labe
}
}
-func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+func endpointSliceWatchFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
@@ -321,6 +347,15 @@ func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns stri
}
}
+func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.DiscoveryV1().EndpointSlices(ns).Watch(ctx, options)
+ }
+}
+
func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index bf1f01664..ea99a81ce 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -20,7 +20,8 @@ import (
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
+ discovery "k8s.io/api/discovery/v1"
+ discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
@@ -256,10 +257,15 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
go func() {
if initEndpointWatch {
// Revert to watching Endpoints for incompatible K8s.
- // This can be remove when all supported k8s versions support endpointslices.
- if ok := k.endpointSliceSupported(kubeClient); !ok {
+ // This can be removed when all supported k8s versions support endpointslices.
+ ok, v := k.endpointSliceSupported(kubeClient)
+ if !ok {
k.APIConn.(*dnsControl).WatchEndpoints(ctx)
}
+ // Revert to EndpointSlice v1beta1 if v1 is not supported
+ if ok && v == discoveryV1beta1.SchemeGroupVersion.String() {
+ k.APIConn.(*dnsControl).WatchEndpointSliceV1beta1(ctx)
+ }
}
k.APIConn.Run()
}()
@@ -290,9 +296,12 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices
// should be watched, and false when endpoints should be watched.
-// If the API supports discovery v1 beta1, and the server versions >= 1.19, endpointslices are watched.
-// This function should be removed, along with non-slice endpoint watch code, when support for k8s < 1.19 is dropped.
-func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bool {
+// If the API supports discovery, and the server versions >= 1.19, true is returned.
+// Also returned is the discovery version supported: "v1" if v1 is supported, and v1beta1 if v1beta1 is supported and
+// v1 is not supported.
+// This function should be removed, when all supported versions of k8s support v1.
+func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) (bool, string) {
+ var sliceVer string
useEndpointSlices := false
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
@@ -303,9 +312,13 @@ func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bo
if err != nil {
continue
}
- // Enable use of endpoint slices if the API supports the discovery v1 beta1 api
+ // Enable use of endpoint slices if the API supports the discovery api
if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
useEndpointSlices = true
+ sliceVer = discovery.SchemeGroupVersion.String()
+ } else if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discoveryV1beta1.SchemeGroupVersion.String()); err == nil {
+ useEndpointSlices = true
+ sliceVer = discoveryV1beta1.SchemeGroupVersion.String()
}
// Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled
// by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19.
@@ -317,7 +330,7 @@ func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bo
log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
useEndpointSlices = false
}
- return useEndpointSlices
+ return useEndpointSlices, sliceVer
}
}
}
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go
index fc25aa4fb..9a5347a9b 100644
--- a/plugin/kubernetes/object/endpoint.go
+++ b/plugin/kubernetes/object/endpoint.go
@@ -4,7 +4,8 @@ import (
"fmt"
api "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1beta1"
+ discovery "k8s.io/api/discovery/v1"
+ discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -151,6 +152,51 @@ func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) {
return e, nil
}
+// EndpointSliceV1beta1ToEndpoints converts a v1beta1 *discovery.EndpointSlice to a *Endpoints.
+func EndpointSliceV1beta1ToEndpoints(obj meta.Object) (meta.Object, error) {
+ ends, ok := obj.(*discoveryV1beta1.EndpointSlice)
+ if !ok {
+ return nil, fmt.Errorf("unexpected object %v", obj)
+ }
+ e := &Endpoints{
+ Version: ends.GetResourceVersion(),
+ Name: ends.GetName(),
+ Namespace: ends.GetNamespace(),
+ Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()),
+ Subsets: make([]EndpointSubset, 1),
+ }
+
+ if len(ends.Ports) == 0 {
+ // Add sentinel if there are no ports.
+ e.Subsets[0].Ports = []EndpointPort{{Port: -1}}
+ } else {
+ e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports))
+ for k, p := range ends.Ports {
+ ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)}
+ e.Subsets[0].Ports[k] = ep
+ }
+ }
+
+ for _, end := range ends.Endpoints {
+ for _, a := range end.Addresses {
+ ea := EndpointAddress{IP: a}
+ if end.Hostname != nil {
+ ea.Hostname = *end.Hostname
+ }
+ if end.TargetRef != nil {
+ ea.TargetRefName = end.TargetRef.Name
+ }
+ // EndpointSlice does not contain NodeName, leave blank
+ e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea)
+ e.IndexIP = append(e.IndexIP, a)
+ }
+ }
+
+ *ends = discoveryV1beta1.EndpointSlice{}
+
+ return e, nil
+}
+
// CopyWithoutSubsets copies e, without the subsets.
func (e *Endpoints) CopyWithoutSubsets() *Endpoints {
e1 := &Endpoints{