aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugin/kubernetes/README.md4
-rw-r--r--plugin/kubernetes/controller.go88
-rw-r--r--plugin/kubernetes/kubernetes.go80
-rw-r--r--plugin/kubernetes/object/endpoint.go105
4 files changed, 1 insertions, 276 deletions
diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md
index a62455630..0c50333e9 100644
--- a/plugin/kubernetes/README.md
+++ b/plugin/kubernetes/README.md
@@ -114,9 +114,7 @@ that has not yet been synchronized.
## Monitoring Kubernetes Endpoints
-By default the *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API. However the
-`api.Endpoints` API is used instead if the Kubernetes version does not support the `EndpointSliceProxying`
-feature gate by default (i.e. Kubernetes version < 1.19).
+The *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API.
## Ready
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index a785a003d..e7db294fc 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -12,7 +12,6 @@ import (
api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
- discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
@@ -66,10 +65,6 @@ type dnsControl struct {
selector labels.Selector
namespaceSelector labels.Selector
- // epLock is used to lock reads of epLister and epController while they are being replaced
- // with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices
- epLock sync.RWMutex
-
svcController cache.Controller
podController cache.Controller
epController cache.Controller
@@ -153,12 +148,10 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
)
- dns.epLock.Lock()
dns.epLister = epLister
if opts.initEndpointsCache {
dns.epController = epController
}
- dns.epLock.Unlock()
dns.nsLister, dns.nsController = object.NewIndexerInformer(
&cache.ListWatch{
@@ -174,42 +167,6 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
return &dns
}
-// WatchEndpoints will set the endpoint Lister and Controller to watch object.Endpoints
-// instead of the default discovery.EndpointSlice. This is used in older k8s clusters where
-// discovery.EndpointSlice is not fully supported.
-// This can be removed when all supported k8s versions fully support EndpointSlice.
-func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
- dns.epLock.Lock()
- dns.epLister, dns.epController = object.NewIndexerInformer(
- &cache.ListWatch{
- ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
- WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
- },
- &api.Endpoints{},
- cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
- cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
- object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()),
- )
- dns.epLock.Unlock()
-}
-
-// WatchEndpointSliceV1beta1 will set the endpoint Lister and Controller to watch v1beta1
-// instead of the default v1.
-func (dns *dnsControl) WatchEndpointSliceV1beta1(ctx context.Context) {
- dns.epLock.Lock()
- dns.epLister, dns.epController = object.NewIndexerInformer(
- &cache.ListWatch{
- ListFunc: endpointSliceListFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
- WatchFunc: endpointSliceWatchFuncV1beta1(ctx, dns.client, api.NamespaceAll, dns.selector),
- },
- &discoveryV1beta1.EndpointSlice{},
- cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
- cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
- object.DefaultProcessor(object.EndpointSliceV1beta1ToEndpoints, dns.EndpointSliceLatencyRecorder()),
- )
- dns.epLock.Unlock()
-}
-
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
@@ -298,14 +255,6 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label
return c.CoreV1().Pods(ns).List(ctx, opts)
}
}
-func endpointSliceListFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
- return func(opts meta.ListOptions) (runtime.Object, error) {
- if s != nil {
- opts.LabelSelector = s.String()
- }
- return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
- }
-}
func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
@@ -316,15 +265,6 @@ func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns strin
}
}
-func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
- return func(opts meta.ListOptions) (runtime.Object, error) {
- if s != nil {
- opts.LabelSelector = s.String()
- }
- return c.CoreV1().Endpoints(ns).List(ctx, opts)
- }
-}
-
func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
@@ -356,15 +296,6 @@ func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labe
}
}
-func endpointSliceWatchFuncV1beta1(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
- return func(options meta.ListOptions) (watch.Interface, error) {
- if s != nil {
- options.LabelSelector = s.String()
- }
- return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options)
- }
-}
-
func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
@@ -374,15 +305,6 @@ func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns stri
}
}
-func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
- return func(options meta.ListOptions) (watch.Interface, error) {
- if s != nil {
- options.LabelSelector = s.String()
- }
- return c.CoreV1().Endpoints(ns).Watch(ctx, options)
- }
-}
-
func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
@@ -413,9 +335,7 @@ func (dns *dnsControl) Run() {
go dns.svcController.Run(dns.stopCh)
if dns.epController != nil {
go func() {
- dns.epLock.RLock()
dns.epController.Run(dns.stopCh)
- dns.epLock.RUnlock()
}()
}
if dns.podController != nil {
@@ -430,9 +350,7 @@ func (dns *dnsControl) HasSynced() bool {
a := dns.svcController.HasSynced()
b := true
if dns.epController != nil {
- dns.epLock.RLock()
b = dns.epController.HasSynced()
- dns.epLock.RUnlock()
}
c := true
if dns.podController != nil {
@@ -455,8 +373,6 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
}
func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
- dns.epLock.RLock()
- defer dns.epLock.RUnlock()
os := dns.epLister.List()
for _, o := range os {
ep, ok := o.(*object.Endpoints)
@@ -531,8 +447,6 @@ func (dns *dnsControl) SvcExtIndexReverse(ip string) (svcs []*object.Service) {
}
func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
- dns.epLock.RLock()
- defer dns.epLock.RUnlock()
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
return nil
@@ -548,8 +462,6 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
}
func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
- dns.epLock.RLock()
- defer dns.epLock.RUnlock()
os, err := dns.epLister.ByIndex(epIPIndex, ip)
if err != nil {
return nil
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index 9e4633ca8..14ea031a0 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -6,7 +6,6 @@ import (
"errors"
"fmt"
"net"
- "strconv"
"strings"
"time"
@@ -19,9 +18,6 @@ import (
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
- discovery "k8s.io/api/discovery/v1"
- discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
- kerrors "k8s.io/apimachinery/pkg/api/errors"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
@@ -262,22 +258,8 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
k.APIConn = newdnsController(ctx, kubeClient, k.opts)
- initEndpointWatch := k.opts.initEndpointsCache
-
onStart = func() error {
go func() {
- if initEndpointWatch {
- // Revert to watching Endpoints for incompatible K8s.
- // This can be removed when all supported k8s versions support endpointslices.
- ok, v := k.endpointSliceSupported(kubeClient)
- if !ok {
- k.APIConn.(*dnsControl).WatchEndpoints(ctx)
- }
- // Revert to EndpointSlice v1beta1 if v1 is not supported
- if ok && v == discoveryV1beta1.SchemeGroupVersion.String() {
- k.APIConn.(*dnsControl).WatchEndpointSliceV1beta1(ctx)
- }
- }
k.APIConn.Run()
}()
@@ -311,68 +293,6 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, o
return onStart, onShut, err
}
-// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
-// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices
-// should be watched, and false when endpoints should be watched.
-// If the API supports discovery, and the server versions >= 1.19, true is returned.
-// Also returned is the discovery version supported: "v1" if v1 is supported, and v1beta1 if v1beta1 is supported and
-// v1 is not supported.
-// This function should be removed, when all supported versions of k8s support v1.
-func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) (bool, string) {
- ticker := time.NewTicker(100 * time.Millisecond)
- defer ticker.Stop()
- logTicker := time.NewTicker(10 * time.Second)
- defer logTicker.Stop()
- var connErr error
- for {
- select {
- case <-logTicker.C:
- if connErr == nil {
- continue
- }
- log.Warningf("Kubernetes API connection failure: %v", connErr)
- case <-ticker.C:
- sv, err := kubeClient.ServerVersion()
- if err != nil {
- connErr = err
- continue
- }
-
- // Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled
- // by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19.
- // DNS results should be built from the same source data that the proxy uses. This decision assumes
- // k8s EndpointSliceProxying feature gate is at the default (i.e. only enabled for k8s >= 1.19).
- major, _ := strconv.Atoi(sv.Major)
- minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
- if major <= 1 && minor <= 18 {
- log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
- return false, ""
- }
-
- // Enable use of endpoint slices if the API supports the discovery api
- _, err = kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String())
- if err == nil {
- return true, discovery.SchemeGroupVersion.String()
- } else if !kerrors.IsNotFound(err) {
- connErr = err
- continue
- }
-
- _, err = kubeClient.Discovery().ServerResourcesForGroupVersion(discoveryV1beta1.SchemeGroupVersion.String())
- if err == nil {
- return true, discoveryV1beta1.SchemeGroupVersion.String()
- } else if !kerrors.IsNotFound(err) {
- connErr = err
- continue
- }
-
- // Disable use of endpoint slices in case that it is disabled in k8s versions 1.19 and newer.
- log.Info("Endpointslices API disabled. Watching Endpoints instead.")
- return false, ""
- }
- }
-}
-
// Records looks up services in kubernetes.
func (k *Kubernetes) Records(ctx context.Context, state request.Request, exact bool) ([]msg.Service, error) {
r, e := parseRequest(state.Name(), state.Zone)
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go
index 4af64f363..b8c2f63f7 100644
--- a/plugin/kubernetes/object/endpoint.go
+++ b/plugin/kubernetes/object/endpoint.go
@@ -3,9 +3,7 @@ package object
import (
"fmt"
- api "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
- discoveryV1beta1 "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -48,60 +46,6 @@ type EndpointPort struct {
// EndpointsKey returns a string using for the index.
func EndpointsKey(name, namespace string) string { return name + "." + namespace }
-// ToEndpoints converts an *api.Endpoints to a *Endpoints.
-func ToEndpoints(obj meta.Object) (meta.Object, error) {
- end, ok := obj.(*api.Endpoints)
- if !ok {
- return nil, fmt.Errorf("unexpected object %v", obj)
- }
- e := &Endpoints{
- Version: end.GetResourceVersion(),
- Name: end.GetName(),
- Namespace: end.GetNamespace(),
- Index: EndpointsKey(end.GetName(), end.GetNamespace()),
- Subsets: make([]EndpointSubset, len(end.Subsets)),
- }
- for i, eps := range end.Subsets {
- sub := EndpointSubset{
- Addresses: make([]EndpointAddress, len(eps.Addresses)),
- }
- if len(eps.Ports) == 0 {
- // Add sentinel if there are no ports.
- sub.Ports = []EndpointPort{{Port: -1}}
- } else {
- sub.Ports = make([]EndpointPort, len(eps.Ports))
- }
-
- for j, a := range eps.Addresses {
- ea := EndpointAddress{IP: a.IP, Hostname: a.Hostname}
- if a.NodeName != nil {
- ea.NodeName = *a.NodeName
- }
- if a.TargetRef != nil {
- ea.TargetRefName = a.TargetRef.Name
- }
- sub.Addresses[j] = ea
- }
-
- for k, p := range eps.Ports {
- ep := EndpointPort{Port: p.Port, Name: p.Name, Protocol: string(p.Protocol)}
- sub.Ports[k] = ep
- }
-
- e.Subsets[i] = sub
- }
-
- for _, eps := range end.Subsets {
- for _, a := range eps.Addresses {
- e.IndexIP = append(e.IndexIP, a.IP)
- }
- }
-
- *end = api.Endpoints{}
-
- return e, nil
-}
-
// EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints.
func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) {
ends, ok := obj.(*discovery.EndpointSlice)
@@ -153,55 +97,6 @@ func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) {
return e, nil
}
-// EndpointSliceV1beta1ToEndpoints converts a v1beta1 *discovery.EndpointSlice to a *Endpoints.
-func EndpointSliceV1beta1ToEndpoints(obj meta.Object) (meta.Object, error) {
- ends, ok := obj.(*discoveryV1beta1.EndpointSlice)
- if !ok {
- return nil, fmt.Errorf("unexpected object %v", obj)
- }
- e := &Endpoints{
- Version: ends.GetResourceVersion(),
- Name: ends.GetName(),
- Namespace: ends.GetNamespace(),
- Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()),
- Subsets: make([]EndpointSubset, 1),
- }
-
- if len(ends.Ports) == 0 {
- // Add sentinel if there are no ports.
- e.Subsets[0].Ports = []EndpointPort{{Port: -1}}
- } else {
- e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports))
- for k, p := range ends.Ports {
- ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)}
- e.Subsets[0].Ports[k] = ep
- }
- }
-
- for _, end := range ends.Endpoints {
- if !endpointsliceReady(end.Conditions.Ready) {
- continue
- }
- for _, a := range end.Addresses {
- ea := EndpointAddress{IP: a}
- if end.Hostname != nil {
- ea.Hostname = *end.Hostname
- }
- // ignore pod names that are too long to be a valid label
- if end.TargetRef != nil && len(end.TargetRef.Name) < 64 {
- ea.TargetRefName = end.TargetRef.Name
- }
- // EndpointSlice does not contain NodeName, leave blank
- e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea)
- e.IndexIP = append(e.IndexIP, a)
- }
- }
-
- *ends = discoveryV1beta1.EndpointSlice{}
-
- return e, nil
-}
-
func endpointsliceReady(ready *bool) bool {
// Per API docs: a nil value indicates an unknown state. In most cases consumers
// should interpret this unknown state as ready.