aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes/kubernetes.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/kubernetes/kubernetes.go')
-rw-r--r--plugin/kubernetes/kubernetes.go95
1 files changed, 75 insertions, 20 deletions
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index 7b3d822c2..bf1f01664 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -8,6 +8,7 @@ import (
"net"
"strconv"
"strings"
+ "time"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/etcd/msg"
@@ -213,22 +214,22 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
}
// InitKubeCache initializes a new Kubernetes cache.
-func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
+func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, onShut func() error, err error) {
config, err := k.getClientConfig()
if err != nil {
- return err
+ return nil, nil, err
}
kubeClient, err := kubernetes.NewForConfig(config)
if err != nil {
- return fmt.Errorf("failed to create kubernetes notification controller: %q", err)
+ return nil, nil, fmt.Errorf("failed to create kubernetes notification controller: %q", err)
}
if k.opts.labelSelector != nil {
var selector labels.Selector
selector, err = meta.LabelSelectorAsSelector(k.opts.labelSelector)
if err != nil {
- return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err)
+ return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err)
}
k.opts.selector = selector
}
@@ -237,7 +238,7 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
var selector labels.Selector
selector, err = meta.LabelSelectorAsSelector(k.opts.namespaceLabelSelector)
if err != nil {
- return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
+ return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err)
}
k.opts.namespaceSelector = selector
}
@@ -246,25 +247,79 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
k.opts.zones = k.Zones
k.opts.endpointNameMode = k.endpointNameMode
- // Enable use of endpoint slices if the API supports the discovery v1 beta1 api
- if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
- k.opts.useEndpointSlices = true
- }
- // Disable use of endpoint slices for k8s versions 1.18 and earlier. Endpoint slices were
- // introduced in 1.17 but EndpointSliceMirroring was not added until 1.19.
- // if err != nil, we continue with the above default which is to use endpoint slices.
- if sv, err := kubeClient.ServerVersion(); err == nil {
- major, _ := strconv.Atoi(sv.Major)
- minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
- if k.opts.useEndpointSlices && major <= 1 && minor <= 18 {
- log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
- k.opts.useEndpointSlices = false
+
+ k.APIConn = newdnsController(ctx, kubeClient, k.opts)
+
+ initEndpointWatch := k.opts.initEndpointsCache
+
+ onStart = func() error {
+ go func() {
+ if initEndpointWatch {
+ // Revert to watching Endpoints for incompatible K8s.
+ // This can be remove when all supported k8s versions support endpointslices.
+ if ok := k.endpointSliceSupported(kubeClient); !ok {
+ k.APIConn.(*dnsControl).WatchEndpoints(ctx)
+ }
+ }
+ k.APIConn.Run()
+ }()
+
+ timeout := time.After(5 * time.Second)
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ if k.APIConn.HasSynced() {
+ return nil
+ }
+ case <-timeout:
+ log.Warning("starting server with unsynced Kubernetes API")
+ return nil
+ }
}
}
- k.APIConn = newdnsController(ctx, kubeClient, k.opts)
+ onShut = func() error {
+ return k.APIConn.Stop()
+ }
+
+ return onStart, onShut, err
+}
- return err
+// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints)
+// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices
+// should be watched, and false when endpoints should be watched.
+// If the API supports discovery v1 beta1, and the server versions >= 1.19, endpointslices are watched.
+// This function should be removed, along with non-slice endpoint watch code, when support for k8s < 1.19 is dropped.
+func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bool {
+ useEndpointSlices := false
+ ticker := time.NewTicker(100 * time.Millisecond)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-ticker.C:
+ sv, err := kubeClient.ServerVersion()
+ if err != nil {
+ continue
+ }
+ // Enable use of endpoint slices if the API supports the discovery v1 beta1 api
+ if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
+ useEndpointSlices = true
+ }
+ // Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled
+ // by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19.
+ // DNS results should be built from the same source data that the proxy uses. This decision assumes
+ // k8s EndpointSliceProxying featuregate is at the default (i.e. only enabled for k8s >= 1.19).
+ major, _ := strconv.Atoi(sv.Major)
+ minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+"))
+ if useEndpointSlices && major <= 1 && minor <= 18 {
+ log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
+ useEndpointSlices = false
+ }
+ return useEndpointSlices
+ }
+ }
}
// Records looks up services in kubernetes.