diff options
Diffstat (limited to 'plugin/kubernetes/kubernetes.go')
-rw-r--r-- | plugin/kubernetes/kubernetes.go | 95 |
1 files changed, 75 insertions, 20 deletions
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index 7b3d822c2..bf1f01664 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -8,6 +8,7 @@ import ( "net" "strconv" "strings" + "time" "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/etcd/msg" @@ -213,22 +214,22 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) { } // InitKubeCache initializes a new Kubernetes cache. -func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) { +func (k *Kubernetes) InitKubeCache(ctx context.Context) (onStart func() error, onShut func() error, err error) { config, err := k.getClientConfig() if err != nil { - return err + return nil, nil, err } kubeClient, err := kubernetes.NewForConfig(config) if err != nil { - return fmt.Errorf("failed to create kubernetes notification controller: %q", err) + return nil, nil, fmt.Errorf("failed to create kubernetes notification controller: %q", err) } if k.opts.labelSelector != nil { var selector labels.Selector selector, err = meta.LabelSelectorAsSelector(k.opts.labelSelector) if err != nil { - return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err) + return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.labelSelector, err) } k.opts.selector = selector } @@ -237,7 +238,7 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) { var selector labels.Selector selector, err = meta.LabelSelectorAsSelector(k.opts.namespaceLabelSelector) if err != nil { - return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err) + return nil, nil, fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", k.opts.namespaceLabelSelector, err) } k.opts.namespaceSelector = selector } @@ -246,25 +247,79 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) { k.opts.zones = k.Zones k.opts.endpointNameMode = k.endpointNameMode - // Enable use of endpoint slices if the API supports the discovery v1 beta1 api - if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil { - k.opts.useEndpointSlices = true - } - // Disable use of endpoint slices for k8s versions 1.18 and earlier. Endpoint slices were - // introduced in 1.17 but EndpointSliceMirroring was not added until 1.19. - // if err != nil, we continue with the above default which is to use endpoint slices. - if sv, err := kubeClient.ServerVersion(); err == nil { - major, _ := strconv.Atoi(sv.Major) - minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+")) - if k.opts.useEndpointSlices && major <= 1 && minor <= 18 { - log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19") - k.opts.useEndpointSlices = false + + k.APIConn = newdnsController(ctx, kubeClient, k.opts) + + initEndpointWatch := k.opts.initEndpointsCache + + onStart = func() error { + go func() { + if initEndpointWatch { + // Revert to watching Endpoints for incompatible K8s. + // This can be remove when all supported k8s versions support endpointslices. + if ok := k.endpointSliceSupported(kubeClient); !ok { + k.APIConn.(*dnsControl).WatchEndpoints(ctx) + } + } + k.APIConn.Run() + }() + + timeout := time.After(5 * time.Second) + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + if k.APIConn.HasSynced() { + return nil + } + case <-timeout: + log.Warning("starting server with unsynced Kubernetes API") + return nil + } } } - k.APIConn = newdnsController(ctx, kubeClient, k.opts) + onShut = func() error { + return k.APIConn.Stop() + } + + return onStart, onShut, err +} - return err +// endpointSliceSupported will determine which endpoint object type to watch (endpointslices or endpoints) +// based on the supportability of endpointslices in the API and server version. It will return true when endpointslices +// should be watched, and false when endpoints should be watched. +// If the API supports discovery v1 beta1, and the server versions >= 1.19, endpointslices are watched. +// This function should be removed, along with non-slice endpoint watch code, when support for k8s < 1.19 is dropped. +func (k *Kubernetes) endpointSliceSupported(kubeClient *kubernetes.Clientset) bool { + useEndpointSlices := false + ticker := time.NewTicker(100 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ticker.C: + sv, err := kubeClient.ServerVersion() + if err != nil { + continue + } + // Enable use of endpoint slices if the API supports the discovery v1 beta1 api + if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil { + useEndpointSlices = true + } + // Disable use of endpoint slices for k8s versions 1.18 and earlier. The Endpointslices API was enabled + // by default in 1.17 but Service -> Pod proxy continued to use Endpoints by default until 1.19. + // DNS results should be built from the same source data that the proxy uses. This decision assumes + // k8s EndpointSliceProxying featuregate is at the default (i.e. only enabled for k8s >= 1.19). + major, _ := strconv.Atoi(sv.Major) + minor, _ := strconv.Atoi(strings.TrimRight(sv.Minor, "+")) + if useEndpointSlices && major <= 1 && minor <= 18 { + log.Info("Watching Endpoints instead of EndpointSlices in k8s versions < 1.19") + useEndpointSlices = false + } + return useEndpointSlices + } + } } // Records looks up services in kubernetes. |