aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGravatar Chris O'Haver <cohaver@infoblox.com> 2021-03-26 08:54:39 -0400
committerGravatar GitHub <noreply@github.com> 2021-03-26 08:54:39 -0400
commit9f72db12e73d6f1b7f69812baa4381fc7a1f04b3 (patch)
treee97d2b76c4c7ad75dbd4213d77f69cc0f6a8b1b3
parentea41dd23a01d653b01d153a868d586fac4d4381a (diff)
downloadcoredns-9f72db12e73d6f1b7f69812baa4381fc7a1f04b3.tar.gz
coredns-9f72db12e73d6f1b7f69812baa4381fc7a1f04b3.tar.zst
coredns-9f72db12e73d6f1b7f69812baa4381fc7a1f04b3.zip
plugin/kubernetes: do endpoint/slice check in retry loop (#4492)
* do endpoint/slice check in retry loop Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
-rw-r--r--plugin/kubernetes/README.md14
-rw-r--r--plugin/kubernetes/controller.go68
-rw-r--r--plugin/kubernetes/kubernetes.go95
-rw-r--r--plugin/kubernetes/setup.go36
4 files changed, 140 insertions, 73 deletions
diff --git a/plugin/kubernetes/README.md b/plugin/kubernetes/README.md
index 4557300c2..655e24a74 100644
--- a/plugin/kubernetes/README.md
+++ b/plugin/kubernetes/README.md
@@ -104,6 +104,20 @@ kubernetes [ZONES...] {
Enabling zone transfer is done by using the *transfer* plugin.
+## Startup
+
+When CoreDNS starts with the *kubernetes* plugin enabled, it will delay serving DNS for up to 5 seconds
+until it can connect to the Kubernetes API and synchronize all object watches. If this cannot happen within
+5 seconds, then CoreDNS will start serving DNS while the *kubernetes* plugin continues to try to connect
+and synchronize all object watches. CoreDNS will answer SERVFAIL to any request made for a Kubernetes record
+that has not yet been synchronized.
+
+## Monitoring Kubernetes Endpoints
+
+By default the *kubernetes* plugin watches Endpoints via the `discovery.EndpointSlices` API. However the
+`api.Endpoints` API is used instead if the Kubernetes version does not support the `EndpointSliceProxying`
+feature gate by default (i.e. Kubernetes version < 1.19).
+
## Ready
This plugin reports readiness to the ready plugin. This will happen after it has synced to the
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index d10d9f313..890785d71 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -59,6 +59,10 @@ type dnsControl struct {
selector labels.Selector
namespaceSelector labels.Selector
+ // epLock is used to lock reads of epLister and epController while they are being replaced
+ // with the api.Endpoints Lister/Controller on k8s systems that don't use discovery.EndpointSlices
+ epLock sync.RWMutex
+
svcController cache.Controller
podController cache.Controller
epController cache.Controller
@@ -83,7 +87,6 @@ type dnsControl struct {
type dnsControlOpts struct {
initPodCache bool
initEndpointsCache bool
- useEndpointSlices bool
ignoreEmptyService bool
// Label handling.
@@ -132,32 +135,18 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
}
if opts.initEndpointsCache {
- var (
- apiObj runtime.Object
- listWatch cache.ListWatch
- to object.ToFunc
- latency *object.EndpointLatencyRecorder
- )
- if opts.useEndpointSlices {
- apiObj = &discovery.EndpointSlice{}
- listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
- listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
- to = object.EndpointSliceToEndpoints
- latency = dns.EndpointSliceLatencyRecorder()
- } else {
- apiObj = &api.Endpoints{}
- listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
- listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
- to = object.ToEndpoints
- latency = dns.EndpointsLatencyRecorder()
- }
+ dns.epLock.Lock()
dns.epLister, dns.epController = object.NewIndexerInformer(
- &listWatch,
- apiObj,
+ &cache.ListWatch{
+ ListFunc: endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ WatchFunc: endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ },
+ &discovery.EndpointSlice{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
- object.DefaultProcessor(to, latency),
+ object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder()),
)
+ dns.epLock.Unlock()
}
dns.nsLister, dns.nsController = cache.NewInformer(
@@ -172,6 +161,25 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
return &dns
}
+// WatchEndpoints will set the endpoint Lister and Controller to watch object.Endpoints
+// instead of the default discovery.EndpointSlice. This is used in older k8s clusters where
+// discovery.EndpointSlice is not fully supported.
+// This can be removed when all supported k8s versions fully support EndpointSlice.
+func (dns *dnsControl) WatchEndpoints(ctx context.Context) {
+ dns.epLock.Lock()
+ dns.epLister, dns.epController = object.NewIndexerInformer(
+ &cache.ListWatch{
+ ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ },
+ &api.Endpoints{},
+ cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
+ cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
+ object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder()),
+ )
+ dns.epLock.Unlock()
+}
+
func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder {
return &object.EndpointLatencyRecorder{
ServiceFunc: func(o meta.Object) []*object.Service {
@@ -351,7 +359,11 @@ func (dns *dnsControl) Stop() error {
func (dns *dnsControl) Run() {
go dns.svcController.Run(dns.stopCh)
if dns.epController != nil {
- go dns.epController.Run(dns.stopCh)
+ go func() {
+ dns.epLock.RLock()
+ dns.epController.Run(dns.stopCh)
+ dns.epLock.RUnlock()
+ }()
}
if dns.podController != nil {
go dns.podController.Run(dns.stopCh)
@@ -365,7 +377,9 @@ func (dns *dnsControl) HasSynced() bool {
a := dns.svcController.HasSynced()
b := true
if dns.epController != nil {
+ dns.epLock.RLock()
b = dns.epController.HasSynced()
+ dns.epLock.RUnlock()
}
c := true
if dns.podController != nil {
@@ -388,6 +402,8 @@ func (dns *dnsControl) ServiceList() (svcs []*object.Service) {
}
func (dns *dnsControl) EndpointsList() (eps []*object.Endpoints) {
+ dns.epLock.RLock()
+ defer dns.epLock.RUnlock()
os := dns.epLister.List()
for _, o := range os {
ep, ok := o.(*object.Endpoints)
@@ -446,6 +462,8 @@ func (dns *dnsControl) SvcIndexReverse(ip string) (svcs []*object.Service) {
}
func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
+ dns.epLock.RLock()
+ defer dns.epLock.RUnlock()
os, err := dns.epLister.ByIndex(epNameNamespaceIndex, idx)
if err != nil {
return nil
@@ -461,6 +479,8 @@ func (dns *dnsControl) EpIndex(idx string) (ep []*object.Endpoints) {
}
func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
+ dns.epLock.RLock()
+ defer dns.epLock.RUnlock()
os, err := dns.epLister.ByIndex(epIPIndex, ip)
if err != nil {
return nil
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index 7b3d822c2..bf1f01664 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -8,6 +8,7 @@ import (
"net"
"strconv"
"strings"
+ "time"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/etcd/msg"
@@ -213,22 +214,22 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
}
// InitKubeCache initializes a new Kubernetes cache.
-func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
+func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, onShut func() error, err error) {
config, err := k.getClientConfig()
if err != nil {
- return err
+ return nil, nil, err
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
- return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
+ return nil, nil, fmt.Errorf("failed to create kubernetes notification controller: %q", err)
}
if k.opts.labelSelector != nil {
var selector labels.Selector
selector, err = meta.LabelSelectorAsSelector(k.opts.labelSelector)
if err != nil {
- return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err)
+ return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err)
}
k.opts.selector = selector
}
@@ -237,7 +238,7 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
var selector labels.Selector
selector, err = meta.LabelSelectorAsSelector(k.opts.namespaceLabelSelector)
if err != nil {
- return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
+ return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
}
k.opts.namespaceSelector = selector
}
@@ -246,25 +247,79 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
k.opts.zones = k.Zones
k.opts.endpointNameMode = k.endpointNameMode
- // Enable use of endpoint slices if the API supports the discovery v1 beta1 api
- if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
- k.opts.useEndpointSlices = true
- }
- // Disable use of endpoint slices for k8s versions 1.18 and earlier. Endpoint slices were
- // introduced in 1.17 but EndpointSliceMirroring was not added until 1.19.
- // if err != nil, we continue with the above default which is to use endpoint slices.
- if sv, err := kubeClient.ServerVersion(); err == nil {
- major, _ := strconv.Atoi(sv.Major)
- minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
- if k.opts.useEndpointSlices && major <= 1 && minor <= 18 {
- log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
- k.opts.useEndpointSlices = false
+
+ k.APIConn = newdnsController(ctx, kubeClient, k.opts)
+
+ initEndpointWatch := k.opts.initEndpointsCache
+
+ onStart = func() error {
+ go func() {
+ if initEndpointWatch {
+ // Revert to watching Endpoints for incompatible K8s.
+			// This can be removed when all supported k8s versions support endpointslices.
+ if ok := k.endpointSliceSupported(kubeClient); !ok {
+ k.APIConn.(*dnsControl).WatchEndpoints(ctx)
+ }
+ }
+ k.APIConn.Run()
+ }()
+
+ timeout := time.After(5 * time.Second)
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ if k.APIConn.HasSynced() {
+ return nil
+ }
+ case <-timeout:
+ log.Warning("starting server with unsynced Kubernetes API")
+ return nil
+ }
}
}
- k.APIConn = newdnsController(ctx, kubeClient, k.opts)
+ onShut = func() error {
+ return k.APIConn.Stop()
+ }
+
+ return onStart, onShut, err
+}
- return err
+// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
+// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices
+// should be watched, and false when endpoints should be watched.
+// If the API supports discovery v1 beta1, and the server version is >= 1.19, endpointslices are watched.
+// This function should be removed, along with non-slice endpoint watch code, when support for k8s < 1.19 is dropped.
+func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bool {
+ useEndpointSlices := false
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ sv, err := kubeClient.ServerVersion()
+ if err != nil {
+ continue
+ }
+ // Enable use of endpoint slices if the API supports the discovery v1 beta1 api
+ if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
+ useEndpointSlices = true
+ }
+ // Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled
+			// Disable use of endpoint slices for k8s versions 1.18 and earlier. The EndpointSlices API was enabled
+ // DNS results should be built from the same source data that the proxy uses. This decision assumes
+ // k8s EndpointSliceProxying featuregate is at the default (i.e. only enabled for k8s >= 1.19).
+ major, _ := strconv.Atoi(sv.Major)
+ minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
+ if useEndpointSlices && major <= 1 && minor <= 18 {
+ log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
+ useEndpointSlices = false
+ }
+ return useEndpointSlices
+ }
+ }
}
// Records looks up services in kubernetes.
diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go
index 89ec439fb..8b9bd2c42 100644
--- a/plugin/kubernetes/setup.go
+++ b/plugin/kubernetes/setup.go
@@ -7,7 +7,6 @@ import (
"os"
"strconv"
"strings"
- "time"
"github.com/coredns/caddy"
"github.com/coredns/coredns/core/dnsserver"
@@ -39,12 +38,16 @@ func setup(c *caddy.Controller) error {
return plugin.Error(pluginName, err)
}
- err = k.InitKubeCache(context.Background())
+ onStart, onShut, err := k.InitKubeCache(context.Background())
if err != nil {
return plugin.Error(pluginName, err)
}
-
- k.RegisterKubeCache(c)
+ if onStart != nil {
+ c.OnStartup(onStart)
+ }
+ if onShut != nil {
+ c.OnShutdown(onShut)
+ }
dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
k.Next = next
@@ -60,30 +63,6 @@ func setup(c *caddy.Controller) error {
return nil
}
-// RegisterKubeCache registers KubeCache start and stop functions with Caddy
-func (k *Kubernetes) RegisterKubeCache(c *caddy.Controller) {
- c.OnStartup(func() error {
- go k.APIConn.Run()
-
- timeout := time.After(5 * time.Second)
- ticker := time.NewTicker(100 * time.Millisecond)
- for {
- select {
- case <-ticker.C:
- if k.APIConn.HasSynced() {
- return nil
- }
- case <-timeout:
- return nil
- }
- }
- })
-
- c.OnShutdown(func() error {
- return k.APIConn.Stop()
- })
-}
-
func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
var (
k8s *Kubernetes
@@ -113,7 +92,6 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) {
opts := dnsControlOpts{
initEndpointsCache: true,
- useEndpointSlices: false,
ignoreEmptyService: false,
}
k8s.opts = opts