aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--.circleci/config.yml4
-rw-r--r--plugin/kubernetes/controller.go113
-rw-r--r--plugin/kubernetes/handler_test.go12
-rw-r--r--plugin/kubernetes/kubernetes.go19
-rw-r--r--plugin/kubernetes/kubernetes_test.go34
-rw-r--r--plugin/kubernetes/metrics.go6
-rw-r--r--plugin/kubernetes/metrics_test.go146
-rw-r--r--plugin/kubernetes/ns.go3
-rw-r--r--plugin/kubernetes/ns_test.go39
-rw-r--r--plugin/kubernetes/object/endpoint.go55
-rw-r--r--plugin/kubernetes/object/informer.go5
-rw-r--r--plugin/kubernetes/reverse.go2
-rw-r--r--plugin/kubernetes/reverse_test.go49
-rw-r--r--plugin/kubernetes/setup.go1
-rw-r--r--plugin/kubernetes/watch.go54
-rw-r--r--plugin/kubernetes/xfr.go4
16 files changed, 369 insertions, 177 deletions
diff --git a/.circleci/config.yml b/.circleci/config.yml
index 7b6bfb56e..cd9c2b2f6 100644
--- a/.circleci/config.yml
+++ b/.circleci/config.yml
@@ -21,8 +21,8 @@ integrationDefaults: &integrationDefaults
image: ubuntu-1604:201903-01
working_directory: ~/go/src/${CIRCLE_PROJECT_USERNAME}/coredns
environment:
- - K8S_VERSION: v1.18.2
- - KIND_VERSION: v0.8.1
+ - K8S_VERSION: v1.19.1
+ - KIND_VERSION: v0.9.0
- KUBECONFIG: /home/circleci/.kube/kind-config-kind
setupKubernetes: &setupKubernetes
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index 90a005177..f9373eee9 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -11,16 +11,18 @@ import (
"github.com/coredns/coredns/plugin/kubernetes/object"
api "k8s.io/api/core/v1"
+ discovery "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
const (
podIPIndex = "PodIP"
- svcNameNamespaceIndex = "NameNamespace"
+ svcNameNamespaceIndex = "ServiceNameNamespace"
svcIPIndex = "ServiceIP"
epNameNamespaceIndex = "EndpointNameNamespace"
epIPIndex = "EndpointsIP"
@@ -81,6 +83,7 @@ type dnsControl struct {
type dnsControlOpts struct {
initPodCache bool
initEndpointsCache bool
+ useEndpointSlices bool
ignoreEmptyService bool
// Label handling.
@@ -130,15 +133,31 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
}
if opts.initEndpointsCache {
+ var (
+ apiObj runtime.Object
+ listWatch cache.ListWatch
+ to func(bool) object.ToFunc
+ latency object.RecordLatencyFunc
+ )
+ if opts.useEndpointSlices {
+ apiObj = &discovery.EndpointSlice{}
+ listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
+ listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
+ to = object.EndpointSliceToEndpoints
+ latency = dns.recordEndpointSliceDNSProgrammingLatency
+ } else {
+ apiObj = &api.Endpoints{}
+ listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
+ listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector)
+ to = object.ToEndpoints
+ latency = dns.recordEndpointDNSProgrammingLatency
+ }
dns.epLister, dns.epController = object.NewIndexerInformer(
- &cache.ListWatch{
- ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
- WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
- },
- &api.Endpoints{},
+ &listWatch,
+ apiObj,
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc},
- object.DefaultProcessor(object.ToEndpoints(opts.skipAPIObjectsCleanup), dns.recordDNSProgrammingLatency),
+ object.DefaultProcessor(to(opts.skipAPIObjectsCleanup), latency),
)
}
@@ -154,8 +173,12 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
return &dns
}
-func (dns *dnsControl) recordDNSProgrammingLatency(obj meta.Object) {
- recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj.(*api.Endpoints))
+func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) {
+ recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj)
+}
+
+func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) {
+ recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj)
}
func podIPIndexFunc(obj interface{}) ([]string, error) {
@@ -207,8 +230,7 @@ func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s l
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Services(ns).List(ctx, opts)
- return listV1, err
+ return c.CoreV1().Services(ns).List(ctx, opts)
}
}
@@ -221,8 +243,16 @@ func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s label
opts.FieldSelector = opts.FieldSelector + ","
}
opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
- listV1, err := c.CoreV1().Pods(ns).List(ctx, opts)
- return listV1, err
+ return c.CoreV1().Pods(ns).List(ctx, opts)
+ }
+}
+
+func endpointSliceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
+ return func(opts meta.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = s.String()
+ }
+ return c.DiscoveryV1beta1().EndpointSlices(ns).List(ctx, opts)
}
}
@@ -231,8 +261,7 @@ func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts)
- return listV1, err
+ return c.CoreV1().Endpoints(ns).List(ctx, opts)
}
}
@@ -241,8 +270,56 @@ func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Sel
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Namespaces().List(ctx, opts)
- return listV1, err
+ return c.CoreV1().Namespaces().List(ctx, opts)
+ }
+}
+
+func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.CoreV1().Services(ns).Watch(ctx, options)
+ }
+}
+
+func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ if len(options.FieldSelector) > 0 {
+ options.FieldSelector = options.FieldSelector + ","
+ }
+ options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
+ return c.CoreV1().Pods(ns).Watch(ctx, options)
+ }
+}
+
+func endpointSliceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.DiscoveryV1beta1().EndpointSlices(ns).Watch(ctx, options)
+ }
+}
+
+func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.CoreV1().Endpoints(ns).Watch(ctx, options)
+ }
+}
+
+func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+ return func(options meta.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = s.String()
+ }
+ return c.CoreV1().Namespaces().Watch(ctx, options)
}
}
@@ -442,7 +519,7 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) {
}
func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service {
- return dns.SvcIndex(object.EndpointsKey(endpoints.GetName(), endpoints.GetNamespace()))
+ return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace()))
}
// subsetsEquivalent checks if two endpoint subsets are significantly equivalent
diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go
index 12ebc2765..f5630ccdd 100644
--- a/plugin/kubernetes/handler_test.go
+++ b/plugin/kubernetes/handler_test.go
@@ -643,8 +643,9 @@ var epsIndex = map[string][]*object.Endpoints{
},
},
},
- Name: "svc1",
+ Name: "svc1-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("svc1", "testns"),
}},
"svcempty.testns": {{
Subsets: []object.EndpointSubset{
@@ -655,8 +656,9 @@ var epsIndex = map[string][]*object.Endpoints{
},
},
},
- Name: "svcempty",
+ Name: "svcempty-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("svcempty", "testns"),
}},
"hdls1.testns": {{
Subsets: []object.EndpointSubset{
@@ -674,8 +676,9 @@ var epsIndex = map[string][]*object.Endpoints{
},
},
},
- Name: "hdls1",
+ Name: "hdls1-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("hdls1", "testns"),
}},
"hdlsprtls.testns": {{
Subsets: []object.EndpointSubset{
@@ -686,8 +689,9 @@ var epsIndex = map[string][]*object.Endpoints{
Ports: []object.EndpointPort{{Port: -1}},
},
},
- Name: "hdlsprtls",
+ Name: "hdlsprtls-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("hdlsprtls", "testns"),
}},
}
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index 83b4b02d1..991323e76 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"net"
+ "strconv"
"strings"
"github.com/coredns/coredns/plugin"
@@ -18,6 +19,7 @@ import (
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
+ discovery "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
@@ -244,6 +246,20 @@ func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
k.opts.zones = k.Zones
k.opts.endpointNameMode = k.endpointNameMode
+ // Enable use of endpoint slices if the API supports the discovery v1 beta1 api
+ if _, err := kubeClient.Discovery().ServerResourcesForGroupVersion(discovery.SchemeGroupVersion.String()); err == nil {
+ k.opts.useEndpointSlices = true
+ }
+ // Disable use of endpoint slices for k8s versions 1.18 and earlier. Endpoint slices were
+ // introduced in 1.17 but EndpointSliceMirroring was not added until 1.19.
+ sv, _ := kubeClient.ServerVersion()
+ major, _ := strconv.Atoi(sv.Major)
+ minor, _ := strconv.Atoi(sv.Minor)
+ if k.opts.useEndpointSlices && major <= 1 && minor <= 18 {
+ log.Info("watching Endpoints instead of EndpointSlices in k8s versions < 1.19")
+ k.opts.useEndpointSlices = false
+ }
+
k.APIConn = newdnsController(ctx, kubeClient, k.opts)
return err
@@ -433,8 +449,9 @@ func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.
if endpointsList == nil {
endpointsList = endpointsListFunc()
}
+
for _, ep := range endpointsList {
- if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
+ if object.EndpointsKey(svc.Name, svc.Namespace) != ep.Index {
continue
}
diff --git a/plugin/kubernetes/kubernetes_test.go b/plugin/kubernetes/kubernetes_test.go
index eddb7645d..7458a3bdc 100644
--- a/plugin/kubernetes/kubernetes_test.go
+++ b/plugin/kubernetes/kubernetes_test.go
@@ -137,8 +137,9 @@ func (APIConnServiceTest) EpIndex(string) []*object.Endpoints {
},
},
},
- Name: "svc1",
+ Name: "svc1-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("svc1", "testns"),
},
{
Subsets: []object.EndpointSubset{
@@ -151,22 +152,9 @@ func (APIConnServiceTest) EpIndex(string) []*object.Endpoints {
},
},
},
- Name: "hdls1",
- Namespace: "testns",
- },
- {
- Subsets: []object.EndpointSubset{
- {
- Addresses: []object.EndpointAddress{
- {IP: "172.0.0.3"},
- },
- Ports: []object.EndpointPort{
- {Port: 80, Protocol: "tcp", Name: "http"},
- },
- },
- },
- Name: "hdls1",
+ Name: "hdls1-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("hdls1", "testns"),
},
{
Subsets: []object.EndpointSubset{
@@ -194,8 +182,9 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints {
},
},
},
- Name: "svc1",
+ Name: "svc1-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("svc1", "testns"),
},
{
Subsets: []object.EndpointSubset{
@@ -208,22 +197,24 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints {
},
},
},
- Name: "hdls1",
+ Name: "hdls1-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("hdls1", "testns"),
},
{
Subsets: []object.EndpointSubset{
{
Addresses: []object.EndpointAddress{
- {IP: "172.0.0.3"},
+ {IP: "172.0.0.2"},
},
Ports: []object.EndpointPort{
{Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- Name: "hdls1",
+ Name: "hdls1-slice2",
Namespace: "testns",
+ Index: object.EndpointsKey("hdls1", "testns"),
},
{
Subsets: []object.EndpointSubset{
@@ -275,6 +266,9 @@ func TestServices(t *testing.T) {
// External Services
{qname: "external.testns.svc.interwebs.test.", qtype: dns.TypeCNAME, answer: svcAns{host: "coredns.io", key: "/" + coredns + "/test/interwebs/svc/testns/external"}},
+
+ // Headless Services
+ {qname: "hdls1.testns.svc.interwebs.test.", qtype: dns.TypeA, answer: svcAns{host: "172.0.0.2", key: "/" + coredns + "/test/interwebs/svc/testns/hdls1/172-0-0-2"}},
}
for i, test := range tests {
diff --git a/plugin/kubernetes/metrics.go b/plugin/kubernetes/metrics.go
index 44220a6f7..8adeb6940 100644
--- a/plugin/kubernetes/metrics.go
+++ b/plugin/kubernetes/metrics.go
@@ -5,10 +5,10 @@ import (
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/kubernetes/object"
-
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
api "k8s.io/api/core/v1"
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
)
var (
@@ -37,11 +37,11 @@ var (
durationSinceFunc = time.Since
)
-func recordDNSProgrammingLatency(svcs []*object.Service, endpoints *api.Endpoints) {
+func recordDNSProgrammingLatency(svcs []*object.Service, endpoints meta.Object) {
// getLastChangeTriggerTime is the time.Time value of the EndpointsLastChangeTriggerTime
// annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set
var lastChangeTriggerTime time.Time
- stringVal, ok := endpoints.Annotations[api.EndpointsLastChangeTriggerTime]
+ stringVal, ok := endpoints.GetAnnotations()[api.EndpointsLastChangeTriggerTime]
if ok {
ts, err := time.Parse(time.RFC3339Nano, stringVal)
if err != nil {
diff --git a/plugin/kubernetes/metrics_test.go b/plugin/kubernetes/metrics_test.go
index 0ab6f3c20..43b5ca382 100644
--- a/plugin/kubernetes/metrics_test.go
+++ b/plugin/kubernetes/metrics_test.go
@@ -10,6 +10,7 @@ import (
"github.com/prometheus/client_golang/prometheus/testutil"
api "k8s.io/api/core/v1"
+ discovery "k8s.io/api/discovery/v1beta1"
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/kubernetes"
@@ -20,15 +21,92 @@ const (
namespace = "testns"
)
-func TestDNSProgrammingLatency(t *testing.T) {
+var expected = `
+ # HELP coredns_kubernetes_dns_programming_duration_seconds Histogram of the time (in seconds) it took to program a dns instance.
+ # TYPE coredns_kubernetes_dns_programming_duration_seconds histogram
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2
+ coredns_kubernetes_dns_programming_duration_seconds_sum{service_kind="headless_with_selector"} 3
+ coredns_kubernetes_dns_programming_duration_seconds_count{service_kind="headless_with_selector"} 2
+ `
+
+func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) {
+ client := fake.NewSimpleClientset()
+ now := time.Now()
+ ctx := context.TODO()
+ controller := newdnsController(ctx, client, dnsControlOpts{
+ initEndpointsCache: true,
+ useEndpointSlices: true,
+ // This is needed as otherwise the fake k8s client doesn't work properly.
+ skipAPIObjectsCleanup: true,
+ })
+
+ durationSinceFunc = func(t time.Time) time.Duration {
+ return now.Sub(t)
+ }
+ DNSProgrammingLatency.Reset()
+ go controller.Run()
+
+ endpoints1 := []discovery.Endpoint{{
+ Addresses: []string{"1.2.3.4"},
+ }}
+
+ endpoints2 := []discovery.Endpoint{{
+ Addresses: []string{"1.2.3.45"},
+ }}
+
+ createService(t, client, controller, "my-service", api.ClusterIPNone)
+ createEndpointSlice(t, client, "my-service", now.Add(-2*time.Second), endpoints1)
+ updateEndpointSlice(t, client, "my-service", now.Add(-1*time.Second), endpoints2)
+
+ createEndpointSlice(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil)
+
+ createService(t, client, controller, "clusterIP-service", "10.40.0.12")
+ createEndpointSlice(t, client, "clusterIP-service", now.Add(-8*time.Second), nil)
+
+ createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone)
+ createEndpointSlice(t, client, "headless-no-annotation", nil, nil)
+
+ createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone)
+ createEndpointSlice(t, client, "headless-wrong-annotation", "wrong-value", nil)
+
+ controller.Stop()
+
+ if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestDnsProgrammingLatencyEndpoints(t *testing.T) {
client := fake.NewSimpleClientset()
now := time.Now()
ctx := context.TODO()
controller := newdnsController(ctx, client, dnsControlOpts{
initEndpointsCache: true,
+ useEndpointSlices: false,
// This is needed as otherwise the fake k8s client doesn't work properly.
skipAPIObjectsCleanup: true,
})
+
durationSinceFunc = func(t time.Time) time.Duration {
return now.Sub(t)
}
@@ -59,33 +137,7 @@ func TestDNSProgrammingLatency(t *testing.T) {
createEndpoints(t, client, "headless-wrong-annotation", "wrong-value", nil)
controller.Stop()
- expected := `
- # HELP coredns_kubernetes_dns_programming_duration_seconds Histogram of the time (in seconds) it took to program a dns instance.
- # TYPE coredns_kubernetes_dns_programming_duration_seconds histogram
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2
- coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2
- coredns_kubernetes_dns_programming_duration_seconds_sum{service_kind="headless_with_selector"} 3
- coredns_kubernetes_dns_programming_duration_seconds_count{service_kind="headless_with_selector"} 2
- `
+
if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil {
t.Error(err)
}
@@ -105,6 +157,24 @@ func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []ap
}
}
+func buildEndpointSlice(name string, lastChangeTriggerTime interface{}, endpoints []discovery.Endpoint) *discovery.EndpointSlice {
+ annotations := make(map[string]string)
+ switch v := lastChangeTriggerTime.(type) {
+ case string:
+ annotations[api.EndpointsLastChangeTriggerTime] = v
+ case time.Time:
+ annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano)
+ }
+ return &discovery.EndpointSlice{
+ ObjectMeta: meta.ObjectMeta{
+ Namespace: namespace, Name: name + "-12345",
+ Labels: map[string]string{discovery.LabelServiceName: name},
+ Annotations: annotations,
+ },
+ Endpoints: endpoints,
+ }
+}
+
func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
ctx := context.TODO()
_, err := client.CoreV1().Endpoints(namespace).Create(ctx, buildEndpoints(name, triggerTime, subsets), meta.CreateOptions{})
@@ -121,11 +191,27 @@ func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, tri
}
}
-func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIP string) {
+func createEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) {
+ ctx := context.TODO()
+ _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.CreateOptions{})
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func updateEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) {
+ ctx := context.TODO()
+ _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Update(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.UpdateOptions{})
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) {
ctx := context.TODO()
if _, err := client.CoreV1().Services(namespace).Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name},
- Spec: api.ServiceSpec{ClusterIP: clusterIP},
+ Spec: api.ServiceSpec{ClusterIP: clusterIp},
}, meta.CreateOptions{}); err != nil {
t.Fatal(err)
}
diff --git a/plugin/kubernetes/ns.go b/plugin/kubernetes/ns.go
index 2d4bc398a..ba687cb13 100644
--- a/plugin/kubernetes/ns.go
+++ b/plugin/kubernetes/ns.go
@@ -4,7 +4,6 @@ import (
"net"
"strings"
- "github.com/coredns/coredns/plugin/kubernetes/object"
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
)
@@ -27,7 +26,7 @@ func (k *Kubernetes) nsAddrs(external bool, zone string) []dns.RR {
// Collect IPs for all Services of the Endpoints
for _, endpoint := range endpoints {
- svcs := k.APIConn.SvcIndex(object.ServiceKey(endpoint.Name, endpoint.Namespace))
+ svcs := k.APIConn.SvcIndex(endpoint.Index)
for _, svc := range svcs {
if external {
svcName := strings.Join([]string{svc.Name, svc.Namespace, zone}, ".")
diff --git a/plugin/kubernetes/ns_test.go b/plugin/kubernetes/ns_test.go
index 0dc55f489..682303104 100644
--- a/plugin/kubernetes/ns_test.go
+++ b/plugin/kubernetes/ns_test.go
@@ -61,43 +61,28 @@ func (APIConnTest) EpIndexReverse(ip string) []*object.Endpoints {
}
eps := []*object.Endpoints{
{
+ Name: "dns-service-slice1",
+ Namespace: "kube-system",
+ Index: object.EndpointsKey("dns-service", "kube-system"),
Subsets: []object.EndpointSubset{
- {
- Addresses: []object.EndpointAddress{
- {
- IP: "10.244.0.20",
- },
- },
- },
+ {Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}},
},
- Name: "dns-service",
- Namespace: "kube-system",
},
{
+ Name: "hdls-dns-service-slice1",
+ Namespace: "kube-system",
+ Index: object.EndpointsKey("hdls-dns-service", "kube-system"),
Subsets: []object.EndpointSubset{
- {
- Addresses: []object.EndpointAddress{
- {
- IP: "10.244.0.20",
- },
- },
- },
+ {Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}},
},
- Name: "hdls-dns-service",
- Namespace: "kube-system",
},
{
+ Name: "dns6-service-slice1",
+ Namespace: "kube-system",
+ Index: object.EndpointsKey("dns6-service", "kube-system"),
Subsets: []object.EndpointSubset{
- {
- Addresses: []object.EndpointAddress{
- {
- IP: "10.244.0.20",
- },
- },
- },
+ {Addresses: []object.EndpointAddress{{IP: "10.244.0.20"}}},
},
- Name: "dns6-service",
- Namespace: "kube-system",
},
}
return eps
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go
index d4c495861..304aaa861 100644
--- a/plugin/kubernetes/object/endpoint.go
+++ b/plugin/kubernetes/object/endpoint.go
@@ -4,6 +4,7 @@ import (
"fmt"
api "k8s.io/api/core/v1"
+ discovery "k8s.io/api/discovery/v1beta1"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -56,6 +57,17 @@ func ToEndpoints(skipCleanup bool) ToFunc {
}
}
+// EndpointSliceToEndpoints returns a function that converts an *discovery.EndpointSlice to a *Endpoints.
+func EndpointSliceToEndpoints(skipCleanup bool) ToFunc {
+ return func(obj interface{}) (interface{}, error) {
+ eps, ok := obj.(*discovery.EndpointSlice)
+ if !ok {
+ return nil, fmt.Errorf("unexpected object %v", obj)
+ }
+ return endpointSliceToEndpoints(skipCleanup, eps), nil
+ }
+}
+
// toEndpoints converts an *api.Endpoints to a *Endpoints.
func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints {
e := &Endpoints{
@@ -108,6 +120,49 @@ func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints {
return e
}
+// endpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints.
+func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) *Endpoints {
+ e := &Endpoints{
+ Version: ends.GetResourceVersion(),
+ Name: ends.GetName(),
+ Namespace: ends.GetNamespace(),
+ Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()),
+ Subsets: make([]EndpointSubset, 1),
+ }
+
+ if len(ends.Ports) == 0 {
+ // Add sentinel if there are no ports.
+ e.Subsets[0].Ports = []EndpointPort{{Port: -1}}
+ } else {
+ e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports))
+ for k, p := range ends.Ports {
+ ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)}
+ e.Subsets[0].Ports[k] = ep
+ }
+ }
+
+ for _, end := range ends.Endpoints {
+ for _, a := range end.Addresses {
+ ea := EndpointAddress{IP: a}
+ if end.Hostname != nil {
+ ea.Hostname = *end.Hostname
+ }
+ if end.TargetRef != nil {
+ ea.TargetRefName = end.TargetRef.Name
+ }
+ // EndpointSlice does not contain NodeName, leave blank
+ e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea)
+ e.IndexIP = append(e.IndexIP, a)
+ }
+ }
+
+ if !skipCleanup {
+ *ends = discovery.EndpointSlice{}
+ }
+
+ return e
+}
+
// CopyWithoutSubsets copies e, without the subsets.
func (e *Endpoints) CopyWithoutSubsets() *Endpoints {
e1 := &Endpoints{
diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go
index f37af4796..afd134e56 100644
--- a/plugin/kubernetes/object/informer.go
+++ b/plugin/kubernetes/object/informer.go
@@ -21,10 +21,11 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.
return clientState, cache.New(cfg)
}
-type recordLatencyFunc func(meta.Object)
+// RecordLatencyFunc is a function for recording api object delta latency
+type RecordLatencyFunc func(meta.Object)
// DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion.
-func DefaultProcessor(convert ToFunc, recordLatency recordLatencyFunc) ProcessorBuilder {
+func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) ProcessorBuilder {
return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
diff --git a/plugin/kubernetes/reverse.go b/plugin/kubernetes/reverse.go
index b80a91fc2..2a5c5cdce 100644
--- a/plugin/kubernetes/reverse.go
+++ b/plugin/kubernetes/reverse.go
@@ -46,7 +46,7 @@ func (k *Kubernetes) serviceRecordForIP(ip, name string) []msg.Service {
for _, eps := range ep.Subsets {
for _, addr := range eps.Addresses {
if addr.IP == ip {
- domain := strings.Join([]string{endpointHostname(addr, k.endpointNameMode), ep.Name, ep.Namespace, Svc, k.primaryZone()}, ".")
+ domain := strings.Join([]string{endpointHostname(addr, k.endpointNameMode), ep.Index, Svc, k.primaryZone()}, ".")
svcs = append(svcs, msg.Service{Host: domain, TTL: k.ttl})
}
}
diff --git a/plugin/kubernetes/reverse_test.go b/plugin/kubernetes/reverse_test.go
index 5869d34bc..3b3b6872b 100644
--- a/plugin/kubernetes/reverse_test.go
+++ b/plugin/kubernetes/reverse_test.go
@@ -56,23 +56,53 @@ func (APIConnReverseTest) SvcIndexReverse(ip string) []*object.Service {
}
func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints {
- ep1 := object.Endpoints{
+ ep1s1 := object.Endpoints{
Subsets: []object.EndpointSubset{
{
Addresses: []object.EndpointAddress{
{IP: "10.0.0.100", Hostname: "ep1a"},
+ {IP: "10.0.0.99", Hostname: "double-ep"}, // this endpoint is used by two services
+ },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
+ },
+ },
+ },
+ Name: "svc1-slice1",
+ Namespace: "testns",
+ Index: object.EndpointsKey("svc1", "testns"),
+ }
+ ep1s2 := object.Endpoints{
+ Subsets: []object.EndpointSubset{
+ {
+ Addresses: []object.EndpointAddress{
{IP: "1234:abcd::1", Hostname: "ep1b"},
{IP: "fd00:77:30::a", Hostname: "ip6svc1ex"},
{IP: "fd00:77:30::2:9ba6", Hostname: "ip6svc1in"},
- {IP: "10.0.0.99", Hostname: "double-ep"}, // this endpoint is used by two services
},
Ports: []object.EndpointPort{
{Port: 80, Protocol: "tcp", Name: "http"},
},
},
},
- Name: "svc1",
+ Name: "svc1-slice2",
Namespace: "testns",
+ Index: object.EndpointsKey("svc1", "testns"),
+ }
+ ep1s3 := object.Endpoints{
+ Subsets: []object.EndpointSubset{
+ {
+ Addresses: []object.EndpointAddress{
+ {IP: "10.0.0.100", Hostname: "ep1a"}, // duplicate endpointslice address
+ },
+ Ports: []object.EndpointPort{
+ {Port: 80, Protocol: "tcp", Name: "http"},
+ },
+ },
+ },
+ Name: "svc1-ccccc",
+ Namespace: "testns",
+ Index: object.EndpointsKey("svc1", "testns"),
}
ep2 := object.Endpoints{
Subsets: []object.EndpointSubset{
@@ -85,20 +115,21 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints {
},
},
},
- Name: "svc2",
+ Name: "svc2-slice1",
Namespace: "testns",
+ Index: object.EndpointsKey("svc2", "testns"),
}
switch ip {
- case "10.0.0.100":
- fallthrough
case "1234:abcd::1":
fallthrough
case "fd00:77:30::a":
fallthrough
case "fd00:77:30::2:9ba6":
- return []*object.Endpoints{&ep1}
- case "10.0.0.99":
- return []*object.Endpoints{&ep1, &ep2}
+ return []*object.Endpoints{&ep1s2}
+ case "10.0.0.100": // two EndpointSlices for a Service contain this IP (EndpointSlice skew)
+ return []*object.Endpoints{&ep1s1, &ep1s3}
+ case "10.0.0.99": // two different Services select this IP
+ return []*object.Endpoints{&ep1s1, &ep2}
}
return nil
}
diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go
index 03bbc6330..700a7421e 100644
--- a/plugin/kubernetes/setup.go
+++ b/plugin/kubernetes/setup.go
@@ -113,6 +113,7 @@ func ParseStanza(c *caddy.Controller) (*Kubernetes, error) {
opts := dnsControlOpts{
initEndpointsCache: true,
+ useEndpointSlices: false,
ignoreEmptyService: false,
}
k8s.opts = opts
diff --git a/plugin/kubernetes/watch.go b/plugin/kubernetes/watch.go
deleted file mode 100644
index d15ed4cf9..000000000
--- a/plugin/kubernetes/watch.go
+++ /dev/null
@@ -1,54 +0,0 @@
-package kubernetes
-
-import (
- "context"
-
- meta "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/apimachinery/pkg/labels"
- "k8s.io/apimachinery/pkg/watch"
- "k8s.io/client-go/kubernetes"
-)
-
-func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
- return func(options meta.ListOptions) (watch.Interface, error) {
- if s != nil {
- options.LabelSelector = s.String()
- }
- w, err := c.CoreV1().Services(ns).Watch(ctx, options)
- return w, err
- }
-}
-
-func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
- return func(options meta.ListOptions) (watch.Interface, error) {
- if s != nil {
- options.LabelSelector = s.String()
- }
- if len(options.FieldSelector) > 0 {
- options.FieldSelector = options.FieldSelector + ","
- }
- options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
- w, err := c.CoreV1().Pods(ns).Watch(ctx, options)
- return w, err
- }
-}
-
-func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
- return func(options meta.ListOptions) (watch.Interface, error) {
- if s != nil {
- options.LabelSelector = s.String()
- }
- w, err := c.CoreV1().Endpoints(ns).Watch(ctx, options)
- return w, err
- }
-}
-
-func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
- return func(options meta.ListOptions) (watch.Interface, error) {
- if s != nil {
- options.LabelSelector = s.String()
- }
- w, err := c.CoreV1().Namespaces().Watch(ctx, options)
- return w, err
- }
-}
diff --git a/plugin/kubernetes/xfr.go b/plugin/kubernetes/xfr.go
index 0392f0252..550a70dfd 100644
--- a/plugin/kubernetes/xfr.go
+++ b/plugin/kubernetes/xfr.go
@@ -84,10 +84,6 @@ func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, erro
endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace)
for _, ep := range endpointsList {
- if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
- continue
- }
-
for _, eps := range ep.Subsets {
srvWeight := calcSRVWeight(len(eps.Addresses))
for _, addr := range eps.Addresses {