From 9121e784966d52bb93696d83d9ab394ac2efd77d Mon Sep 17 00:00:00 2001 From: Chris O'Haver Date: Tue, 1 Dec 2020 15:29:05 -0500 Subject: plugin/kubernetes: Fix dns programming duration metric (#4255) * get data reqd to record latency before calling toFuncs * refactor out unnecessary toFunc wrappers * remove latency metric unit tests per PR feedback Signed-off-by: Chris O'Haver --- plugin/kubernetes/controller.go | 40 +++--- plugin/kubernetes/informer_test.go | 10 +- plugin/kubernetes/metrics.go | 73 ----------- plugin/kubernetes/metrics_test.backup | 203 +++++++++++++++++++++++++++++++ plugin/kubernetes/metrics_test.go | 223 ---------------------------------- plugin/kubernetes/object/endpoint.go | 49 +++----- plugin/kubernetes/object/informer.go | 21 +++- plugin/kubernetes/object/metrics.go | 82 +++++++++++++ plugin/kubernetes/object/object.go | 4 +- plugin/kubernetes/object/pod.go | 46 +++---- plugin/kubernetes/object/service.go | 23 ++-- 11 files changed, 372 insertions(+), 402 deletions(-) delete mode 100644 plugin/kubernetes/metrics.go create mode 100644 plugin/kubernetes/metrics_test.backup delete mode 100644 plugin/kubernetes/metrics_test.go create mode 100644 plugin/kubernetes/object/metrics.go (limited to 'plugin') diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index f9373eee9..2319cf203 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -92,9 +92,8 @@ type dnsControlOpts struct { namespaceLabelSelector *meta.LabelSelector namespaceSelector labels.Selector - zones []string - endpointNameMode bool - skipAPIObjectsCleanup bool + zones []string + endpointNameMode bool } // newDNSController creates a controller for CoreDNS. @@ -116,7 +115,7 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &api.Service{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc, svcIPIndex: svcIPIndexFunc}, - object.DefaultProcessor(object.ToService(opts.skipAPIObjectsCleanup), nil), + object.DefaultProcessor(object.ToService, nil), ) if opts.initPodCache { @@ -128,7 +127,7 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts &api.Pod{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{podIPIndex: podIPIndexFunc}, - object.DefaultProcessor(object.ToPod(opts.skipAPIObjectsCleanup), nil), + object.DefaultProcessor(object.ToPod, nil), ) } @@ -136,28 +135,28 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts var ( apiObj runtime.Object listWatch cache.ListWatch - to func(bool) object.ToFunc - latency object.RecordLatencyFunc + to object.ToFunc + latency *object.EndpointLatencyRecorder ) if opts.useEndpointSlices { apiObj = &discovery.EndpointSlice{} listWatch.ListFunc = endpointSliceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) listWatch.WatchFunc = endpointSliceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) to = object.EndpointSliceToEndpoints - latency = dns.recordEndpointSliceDNSProgrammingLatency + latency = dns.EndpointSliceLatencyRecorder() } else { apiObj = &api.Endpoints{} listWatch.ListFunc = endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector) listWatch.WatchFunc = endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector) to = object.ToEndpoints - latency = dns.recordEndpointDNSProgrammingLatency + latency = dns.EndpointsLatencyRecorder() } dns.epLister, dns.epController = object.NewIndexerInformer( &listWatch, apiObj, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, cache.Indexers{epNameNamespaceIndex: epNameNamespaceIndexFunc, epIPIndex: epIPIndexFunc}, - object.DefaultProcessor(to(opts.skipAPIObjectsCleanup), latency), + object.DefaultProcessor(to, latency), ) } @@ -173,12 +172,19 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts return &dns } -func (dns *dnsControl) recordEndpointDNSProgrammingLatency(obj meta.Object) { - recordDNSProgrammingLatency(dns.getServices(obj.(*api.Endpoints)), obj) +func (dns *dnsControl) EndpointsLatencyRecorder() *object.EndpointLatencyRecorder { + return &object.EndpointLatencyRecorder{ + ServiceFunc: func(o meta.Object) []*object.Service { + return dns.SvcIndex(object.ServiceKey(o.GetName(), o.GetNamespace())) + }, + } } - -func (dns *dnsControl) recordEndpointSliceDNSProgrammingLatency(obj meta.Object) { - recordDNSProgrammingLatency(dns.SvcIndex(object.ServiceKey(obj.GetLabels()[discovery.LabelServiceName], obj.GetNamespace())), obj) +func (dns *dnsControl) EndpointSliceLatencyRecorder() *object.EndpointLatencyRecorder { + return &object.EndpointLatencyRecorder{ + ServiceFunc: func(o meta.Object) []*object.Service { + return dns.SvcIndex(object.ServiceKey(o.GetLabels()[discovery.LabelServiceName], o.GetNamespace())) + }, + } } func podIPIndexFunc(obj interface{}) ([]string, error) { @@ -518,10 +524,6 @@ func (dns *dnsControl) detectChanges(oldObj, newObj interface{}) { } } -func (dns *dnsControl) getServices(endpoints *api.Endpoints) []*object.Service { - return dns.SvcIndex(object.ServiceKey(endpoints.GetName(), endpoints.GetNamespace())) -} - // subsetsEquivalent checks if two endpoint subsets are significantly equivalent // I.e. that they have the same ready addresses, host names, ports (including protocol // and service names for SRV) diff --git a/plugin/kubernetes/informer_test.go b/plugin/kubernetes/informer_test.go index 5156554e9..7aa9d1e83 100644 --- a/plugin/kubernetes/informer_test.go +++ b/plugin/kubernetes/informer_test.go @@ -11,7 +11,7 @@ import ( ) func TestDefaultProcessor(t *testing.T) { - pbuild := object.DefaultProcessor(object.ToService(true), nil) + pbuild := object.DefaultProcessor(object.ToService, nil) reh := cache.ResourceEventHandlerFuncs{} idx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}) processor := pbuild(idx, reh) @@ -30,8 +30,8 @@ func testProcessor(t *testing.T, processor cache.ProcessFunc, idx cache.Indexer) // Add the objects err := processor(cache.Deltas{ - {Type: cache.Added, Object: obj}, - {Type: cache.Added, Object: obj2}, + {Type: cache.Added, Object: obj.DeepCopy()}, + {Type: cache.Added, Object: obj2.DeepCopy()}, }) if err != nil { t.Fatalf("add failed: %v", err) @@ -55,7 +55,7 @@ func testProcessor(t *testing.T, processor cache.ProcessFunc, idx cache.Indexer) obj.Spec.ClusterIP = "1.2.3.5" err = processor(cache.Deltas{{ Type: cache.Updated, - Object: obj, + Object: obj.DeepCopy(), }}) if err != nil { t.Fatalf("update failed: %v", err) @@ -78,7 +78,7 @@ func testProcessor(t *testing.T, processor cache.ProcessFunc, idx cache.Indexer) // Delete an object err = processor(cache.Deltas{{ Type: cache.Deleted, - Object: obj2, + Object: obj2.DeepCopy(), }}) if err != nil { t.Fatalf("delete test failed: %v", err) diff --git a/plugin/kubernetes/metrics.go b/plugin/kubernetes/metrics.go deleted file mode 100644 index 8adeb6940..000000000 --- a/plugin/kubernetes/metrics.go +++ /dev/null @@ -1,73 +0,0 @@ -package kubernetes - -import ( - "time" - - "github.com/coredns/coredns/plugin" - "github.com/coredns/coredns/plugin/kubernetes/object" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" - api "k8s.io/api/core/v1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" -) - -var ( - // DNSProgrammingLatency is defined as the time it took to program a DNS instance - from the time - // a service or pod has changed to the time the change was propagated and was available to be - // served by a DNS server. - // The definition of this SLI can be found at https://github.com/kubernetes/community/blob/master/sig-scalability/slos/dns_programming_latency.md - // Note that the metrics is partially based on the time exported by the endpoints controller on - // the master machine. The measurement may be inaccurate if there is a clock drift between the - // node and master machine. - // The service_kind label can be one of: - // * cluster_ip - // * headless_with_selector - // * headless_without_selector - DNSProgrammingLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ - Namespace: plugin.Namespace, - Subsystem: pluginName, - Name: "dns_programming_duration_seconds", - // From 1 millisecond to ~17 minutes. - Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), - Help: "Histogram of the time (in seconds) it took to program a dns instance.", - }, []string{"service_kind"}) - - // durationSinceFunc returns the duration elapsed since the given time. - // Added as a global variable to allow injection for testing. - durationSinceFunc = time.Since -) - -func recordDNSProgrammingLatency(svcs []*object.Service, endpoints meta.Object) { - // getLastChangeTriggerTime is the time.Time value of the EndpointsLastChangeTriggerTime - // annotation stored in the given endpoints object or the "zero" time if the annotation wasn't set - var lastChangeTriggerTime time.Time - stringVal, ok := endpoints.GetAnnotations()[api.EndpointsLastChangeTriggerTime] - if ok { - ts, err := time.Parse(time.RFC3339Nano, stringVal) - if err != nil { - log.Warningf("DnsProgrammingLatency cannot be calculated for Endpoints '%s/%s'; invalid %q annotation RFC3339 value of %q", - endpoints.GetNamespace(), endpoints.GetName(), api.EndpointsLastChangeTriggerTime, stringVal) - // In case of error val = time.Zero, which is ignored in the upstream code. - } - lastChangeTriggerTime = ts - } - - // isHeadless indicates whether the endpoints object belongs to a headless - // service (i.e. clusterIp = None). Note that this can be a false negatives if the service - // informer is lagging, i.e. we may not see a recently created service. Given that the services - // don't change very often (comparing to much more frequent endpoints changes), cases when this method - // will return wrong answer should be relatively rare. Because of that we intentionally accept this - // flaw to keep the solution simple. - isHeadless := len(svcs) == 1 && svcs[0].ClusterIP == api.ClusterIPNone - - if endpoints == nil || !isHeadless || lastChangeTriggerTime.IsZero() { - return - } - - // If we're here it means that the Endpoints object is for a headless service and that - // the Endpoints object was created by the endpoints-controller (because the - // LastChangeTriggerTime annotation is set). It means that the corresponding service is a - // "headless service with selector". - DNSProgrammingLatency.WithLabelValues("headless_with_selector"). - Observe(durationSinceFunc(lastChangeTriggerTime).Seconds()) -} diff --git a/plugin/kubernetes/metrics_test.backup b/plugin/kubernetes/metrics_test.backup new file mode 100644 index 000000000..8274eef13 --- /dev/null +++ b/plugin/kubernetes/metrics_test.backup @@ -0,0 +1,203 @@ +package kubernetes + +import ( + "strings" + "testing" + "time" + + "github.com/coredns/coredns/plugin/kubernetes/object" + "github.com/prometheus/client_golang/prometheus/testutil" + api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/tools/cache" +) + +const ( + namespace = "testns" +) + +var expected = ` + # HELP coredns_kubernetes_dns_programming_duration_seconds Histogram of the time (in seconds) it took to program a dns instance. + # TYPE coredns_kubernetes_dns_programming_duration_seconds histogram + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2 + coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2 + coredns_kubernetes_dns_programming_duration_seconds_sum{service_kind="headless_with_selector"} 3 + coredns_kubernetes_dns_programming_duration_seconds_count{service_kind="headless_with_selector"} 2 + ` + +func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) { + now := time.Now() + + svcIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc}) + epIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}) + + dns := dnsControl{svcLister: svcIdx} + svcProc := object.DefaultProcessor(object.ToService, nil)(svcIdx, cache.ResourceEventHandlerFuncs{}) + epProc := object.DefaultProcessor(object.EndpointSliceToEndpoints, dns.EndpointSliceLatencyRecorder())(epIdx, cache.ResourceEventHandlerFuncs{}) + + object.DurationSinceFunc = func(t time.Time) time.Duration { + return now.Sub(t) + } + object.DNSProgrammingLatency.Reset() + + endpoints1 := []discovery.Endpoint{{ + Addresses: []string{"1.2.3.4"}, + }} + + endpoints2 := []discovery.Endpoint{{ + Addresses: []string{"1.2.3.45"}, + }} + + createService(t, svcProc, "my-service", api.ClusterIPNone) + createEndpointSlice(t, epProc, "my-service", now.Add(-2*time.Second), endpoints1) + updateEndpointSlice(t, epProc, "my-service", now.Add(-1*time.Second), endpoints2) + + createEndpointSlice(t, epProc, "endpoints-no-service", now.Add(-4*time.Second), nil) + + createService(t, svcProc, "clusterIP-service", "10.40.0.12") + createEndpointSlice(t, epProc, "clusterIP-service", now.Add(-8*time.Second), nil) + + createService(t, svcProc, "headless-no-annotation", api.ClusterIPNone) + createEndpointSlice(t, epProc, "headless-no-annotation", nil, nil) + + createService(t, svcProc, "headless-wrong-annotation", api.ClusterIPNone) + createEndpointSlice(t, epProc, "headless-wrong-annotation", "wrong-value", nil) + + if err := testutil.CollectAndCompare(object.DNSProgrammingLatency, strings.NewReader(expected)); err != nil { + t.Error(err) + } +} + +func TestDnsProgrammingLatencyEndpoints(t *testing.T) { + now := time.Now() + + svcIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{svcNameNamespaceIndex: svcNameNamespaceIndexFunc}) + epIdx := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{}) + + dns := dnsControl{svcLister: svcIdx} + svcProc := object.DefaultProcessor(object.ToService, nil)(svcIdx, cache.ResourceEventHandlerFuncs{}) + epProc := object.DefaultProcessor(object.ToEndpoints, dns.EndpointsLatencyRecorder())(epIdx, cache.ResourceEventHandlerFuncs{}) + + object.DurationSinceFunc = func(t time.Time) time.Duration { + return now.Sub(t) + } + object.DNSProgrammingLatency.Reset() + + subset1 := []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, + }} + + subset2 := []api.EndpointSubset{{ + Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}}, + }} + + createService(t, svcProc, "my-service", api.ClusterIPNone) + createEndpoints(t, epProc, "my-service", now.Add(-2*time.Second), subset1) + updateEndpoints(t, epProc, "my-service", now.Add(-1*time.Second), subset2) + + createEndpoints(t, epProc, "endpoints-no-service", now.Add(-4*time.Second), nil) + + createService(t, svcProc, "clusterIP-service", "10.40.0.12") + createEndpoints(t, epProc, "clusterIP-service", now.Add(-8*time.Second), nil) + + createService(t, svcProc, "headless-no-annotation", api.ClusterIPNone) + createEndpoints(t, epProc, "headless-no-annotation", nil, nil) + + createService(t, svcProc, "headless-wrong-annotation", api.ClusterIPNone) + createEndpoints(t, epProc, "headless-wrong-annotation", "wrong-value", nil) + + if err := testutil.CollectAndCompare(object.DNSProgrammingLatency, strings.NewReader(expected)); err != nil { + t.Error(err) + } +} + +func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []api.EndpointSubset) *api.Endpoints { + annotations := make(map[string]string) + switch v := lastChangeTriggerTime.(type) { + case string: + annotations[api.EndpointsLastChangeTriggerTime] = v + case time.Time: + annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano) + } + return &api.Endpoints{ + ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name, Annotations: annotations}, + Subsets: subsets, + } +} + +func buildEndpointSlice(name string, lastChangeTriggerTime interface{}, endpoints []discovery.Endpoint) *discovery.EndpointSlice { + annotations := make(map[string]string) + switch v := lastChangeTriggerTime.(type) { + case string: + annotations[api.EndpointsLastChangeTriggerTime] = v + case time.Time: + annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano) + } + return &discovery.EndpointSlice{ + ObjectMeta: meta.ObjectMeta{ + Namespace: namespace, Name: name + "-12345", + Labels: map[string]string{discovery.LabelServiceName: name}, + Annotations: annotations, + }, + Endpoints: endpoints, + } +} + +func createEndpoints(t *testing.T, processor cache.ProcessFunc, name string, triggerTime interface{}, subsets []api.EndpointSubset) { + err := processor(cache.Deltas{{Type: cache.Added, Object: buildEndpoints(name, triggerTime, subsets)}}) + if err != nil { + t.Fatal(err) + } +} + +func updateEndpoints(t *testing.T, processor cache.ProcessFunc, name string, triggerTime interface{}, subsets []api.EndpointSubset) { + err := processor(cache.Deltas{{Type: cache.Updated, Object: buildEndpoints(name, triggerTime, subsets)}}) + if err != nil { + t.Fatal(err) + } +} + +func createEndpointSlice(t *testing.T, processor cache.ProcessFunc, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { + err := processor(cache.Deltas{{Type: cache.Added, Object: buildEndpointSlice(name, triggerTime, endpoints)}}) + if err != nil { + t.Fatal(err) + } +} + +func updateEndpointSlice(t *testing.T, processor cache.ProcessFunc, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { + err := processor(cache.Deltas{{Type: cache.Updated, Object: buildEndpointSlice(name, triggerTime, endpoints)}}) + if err != nil { + t.Fatal(err) + } +} + +func createService(t *testing.T, processor cache.ProcessFunc, name string, clusterIp string) { + obj := &api.Service{ + ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name}, + Spec: api.ServiceSpec{ClusterIP: clusterIp}, + } + err := processor(cache.Deltas{{Type: cache.Added, Object: obj}}) + if err != nil { + t.Fatal(err) + } +} diff --git a/plugin/kubernetes/metrics_test.go b/plugin/kubernetes/metrics_test.go deleted file mode 100644 index 43b5ca382..000000000 --- a/plugin/kubernetes/metrics_test.go +++ /dev/null @@ -1,223 +0,0 @@ -package kubernetes - -import ( - "context" - "strings" - "testing" - "time" - - "github.com/coredns/coredns/plugin/kubernetes/object" - - "github.com/prometheus/client_golang/prometheus/testutil" - api "k8s.io/api/core/v1" - discovery "k8s.io/api/discovery/v1beta1" - meta "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/kubernetes/fake" -) - -const ( - namespace = "testns" -) - -var expected = ` - # HELP coredns_kubernetes_dns_programming_duration_seconds Histogram of the time (in seconds) it took to program a dns instance. - # TYPE coredns_kubernetes_dns_programming_duration_seconds histogram - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.001"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.002"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.004"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.008"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.016"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.032"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.064"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.128"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.256"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="0.512"} 0 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="1.024"} 1 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="2.048"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="4.096"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="8.192"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="16.384"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="32.768"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="65.536"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="131.072"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="262.144"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="524.288"} 2 - coredns_kubernetes_dns_programming_duration_seconds_bucket{service_kind="headless_with_selector",le="+Inf"} 2 - coredns_kubernetes_dns_programming_duration_seconds_sum{service_kind="headless_with_selector"} 3 - coredns_kubernetes_dns_programming_duration_seconds_count{service_kind="headless_with_selector"} 2 - ` - -func TestDNSProgrammingLatencyEndpointSlices(t *testing.T) { - client := fake.NewSimpleClientset() - now := time.Now() - ctx := context.TODO() - controller := newdnsController(ctx, client, dnsControlOpts{ - initEndpointsCache: true, - useEndpointSlices: true, - // This is needed as otherwise the fake k8s client doesn't work properly. - skipAPIObjectsCleanup: true, - }) - - durationSinceFunc = func(t time.Time) time.Duration { - return now.Sub(t) - } - DNSProgrammingLatency.Reset() - go controller.Run() - - endpoints1 := []discovery.Endpoint{{ - Addresses: []string{"1.2.3.4"}, - }} - - endpoints2 := []discovery.Endpoint{{ - Addresses: []string{"1.2.3.45"}, - }} - - createService(t, client, controller, "my-service", api.ClusterIPNone) - createEndpointSlice(t, client, "my-service", now.Add(-2*time.Second), endpoints1) - updateEndpointSlice(t, client, "my-service", now.Add(-1*time.Second), endpoints2) - - createEndpointSlice(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil) - - createService(t, client, controller, "clusterIP-service", "10.40.0.12") - createEndpointSlice(t, client, "clusterIP-service", now.Add(-8*time.Second), nil) - - createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone) - createEndpointSlice(t, client, "headless-no-annotation", nil, nil) - - createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone) - createEndpointSlice(t, client, "headless-wrong-annotation", "wrong-value", nil) - - controller.Stop() - - if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil { - t.Error(err) - } -} - -func TestDnsProgrammingLatencyEndpoints(t *testing.T) { - client := fake.NewSimpleClientset() - now := time.Now() - ctx := context.TODO() - controller := newdnsController(ctx, client, dnsControlOpts{ - initEndpointsCache: true, - useEndpointSlices: false, - // This is needed as otherwise the fake k8s client doesn't work properly. - skipAPIObjectsCleanup: true, - }) - - durationSinceFunc = func(t time.Time) time.Duration { - return now.Sub(t) - } - DNSProgrammingLatency.Reset() - go controller.Run() - - subset1 := []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", Hostname: "foo"}}, - }} - - subset2 := []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.5", Hostname: "foo"}}, - }} - - createService(t, client, controller, "my-service", api.ClusterIPNone) - createEndpoints(t, client, "my-service", now.Add(-2*time.Second), subset1) - updateEndpoints(t, client, "my-service", now.Add(-1*time.Second), subset2) - - createEndpoints(t, client, "endpoints-no-service", now.Add(-4*time.Second), nil) - - createService(t, client, controller, "clusterIP-service", "10.40.0.12") - createEndpoints(t, client, "clusterIP-service", now.Add(-8*time.Second), nil) - - createService(t, client, controller, "headless-no-annotation", api.ClusterIPNone) - createEndpoints(t, client, "headless-no-annotation", nil, nil) - - createService(t, client, controller, "headless-wrong-annotation", api.ClusterIPNone) - createEndpoints(t, client, "headless-wrong-annotation", "wrong-value", nil) - - controller.Stop() - - if err := testutil.CollectAndCompare(DNSProgrammingLatency, strings.NewReader(expected)); err != nil { - t.Error(err) - } -} - -func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []api.EndpointSubset) *api.Endpoints { - annotations := make(map[string]string) - switch v := lastChangeTriggerTime.(type) { - case string: - annotations[api.EndpointsLastChangeTriggerTime] = v - case time.Time: - annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano) - } - return &api.Endpoints{ - ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name, Annotations: annotations}, - Subsets: subsets, - } -} - -func buildEndpointSlice(name string, lastChangeTriggerTime interface{}, endpoints []discovery.Endpoint) *discovery.EndpointSlice { - annotations := make(map[string]string) - switch v := lastChangeTriggerTime.(type) { - case string: - annotations[api.EndpointsLastChangeTriggerTime] = v - case time.Time: - annotations[api.EndpointsLastChangeTriggerTime] = v.Format(time.RFC3339Nano) - } - return &discovery.EndpointSlice{ - ObjectMeta: meta.ObjectMeta{ - Namespace: namespace, Name: name + "-12345", - Labels: map[string]string{discovery.LabelServiceName: name}, - Annotations: annotations, - }, - Endpoints: endpoints, - } -} - -func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { - ctx := context.TODO() - _, err := client.CoreV1().Endpoints(namespace).Create(ctx, buildEndpoints(name, triggerTime, subsets), meta.CreateOptions{}) - if err != nil { - t.Fatal(err) - } -} - -func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { - ctx := context.TODO() - _, err := client.CoreV1().Endpoints(namespace).Update(ctx, buildEndpoints(name, triggerTime, subsets), meta.UpdateOptions{}) - if err != nil { - t.Fatal(err) - } -} - -func createEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { - ctx := context.TODO() - _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Create(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.CreateOptions{}) - if err != nil { - t.Fatal(err) - } -} - -func updateEndpointSlice(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, endpoints []discovery.Endpoint) { - ctx := context.TODO() - _, err := client.DiscoveryV1beta1().EndpointSlices(namespace).Update(ctx, buildEndpointSlice(name, triggerTime, endpoints), meta.UpdateOptions{}) - if err != nil { - t.Fatal(err) - } -} - -func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) { - ctx := context.TODO() - if _, err := client.CoreV1().Services(namespace).Create(ctx, &api.Service{ - ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name}, - Spec: api.ServiceSpec{ClusterIP: clusterIp}, - }, meta.CreateOptions{}); err != nil { - t.Fatal(err) - } - if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { - return len(controller.SvcIndex(object.ServiceKey(name, namespace))) == 1, nil - }); err != nil { - t.Fatal(err) - } -} diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index 304aaa861..09429e0b2 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -5,6 +5,7 @@ import ( api "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -46,30 +47,12 @@ type EndpointPort struct { // EndpointsKey returns a string using for the index. func EndpointsKey(name, namespace string) string { return name + "." + namespace } -// ToEndpoints returns a function that converts an *api.Endpoints to a *Endpoints. -func ToEndpoints(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - eps, ok := obj.(*api.Endpoints) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return toEndpoints(skipCleanup, eps), nil - } -} - -// EndpointSliceToEndpoints returns a function that converts an *discovery.EndpointSlice to a *Endpoints. -func EndpointSliceToEndpoints(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - eps, ok := obj.(*discovery.EndpointSlice) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return endpointSliceToEndpoints(skipCleanup, eps), nil - } -} - // toEndpoints converts an *api.Endpoints to a *Endpoints. -func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { +func ToEndpoints(obj meta.Object) (meta.Object, error) { + end, ok := obj.(*api.Endpoints) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } e := &Endpoints{ Version: end.GetResourceVersion(), Name: end.GetName(), @@ -113,15 +96,17 @@ func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { } } - if !skipCleanup { - *end = api.Endpoints{} - } + *end = api.Endpoints{} - return e + return e, nil } -// endpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. -func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) *Endpoints { +// EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. +func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) { + ends, ok := obj.(*discovery.EndpointSlice) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } e := &Endpoints{ Version: ends.GetResourceVersion(), Name: ends.GetName(), @@ -156,11 +141,9 @@ func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) * } } - if !skipCleanup { - *ends = discovery.EndpointSlice{} - } + *ends = discovery.EndpointSlice{} - return e + return e, nil } // CopyWithoutSubsets copies e, without the subsets. diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index afd134e56..e44cd218a 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -1,6 +1,8 @@ package object import ( + "fmt" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" @@ -25,13 +27,18 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache. type RecordLatencyFunc func(meta.Object) // DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion. -func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) ProcessorBuilder { +func DefaultProcessor(convert ToFunc, recordLatency *EndpointLatencyRecorder) ProcessorBuilder { return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { + if recordLatency != nil { + if o, ok := d.Object.(meta.Object); ok { + recordLatency.init(o) + } + } switch d.Type { case cache.Sync, cache.Added, cache.Updated: - obj, err := convert(d.Object) + obj, err := convert(d.Object.(meta.Object)) if err != nil { return err } @@ -47,14 +54,18 @@ func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) Processor h.OnAdd(obj) } if recordLatency != nil { - recordLatency(d.Object.(meta.Object)) + recordLatency.record() } case cache.Deleted: var obj interface{} obj, ok := d.Object.(cache.DeletedFinalStateUnknown) if !ok { var err error - obj, err = convert(d.Object) + metaObj, ok := d.Object.(meta.Object) + if !ok { + return fmt.Errorf("unexpected object %v", d.Object) + } + obj, err = convert(metaObj) if err != nil && err != errPodTerminating { return err } @@ -65,7 +76,7 @@ func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) Processor } h.OnDelete(obj) if !ok && recordLatency != nil { - recordLatency(d.Object.(meta.Object)) + recordLatency.record() } } } diff --git a/plugin/kubernetes/object/metrics.go b/plugin/kubernetes/object/metrics.go new file mode 100644 index 000000000..929925cf1 --- /dev/null +++ b/plugin/kubernetes/object/metrics.go @@ -0,0 +1,82 @@ +package object + +import ( + "time" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/pkg/log" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + // DNSProgrammingLatency is defined as the time it took to program a DNS instance - from the time + // a service or pod has changed to the time the change was propagated and was available to be + // served by a DNS server. + // The definition of this SLI can be found at https://github.com/kubernetes/community/blob/master/sig-scalability/slos/dns_programming_latency.md + // Note that the metrics is partially based on the time exported by the endpoints controller on + // the master machine. The measurement may be inaccurate if there is a clock drift between the + // node and master machine. + // The service_kind label can be one of: + // * cluster_ip + // * headless_with_selector + // * headless_without_selector + DNSProgrammingLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: plugin.Namespace, + Subsystem: "kubernetes", + Name: "dns_programming_duration_seconds", + // From 1 millisecond to ~17 minutes. + Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), + Help: "Histogram of the time (in seconds) it took to program a dns instance.", + }, []string{"service_kind"}) + + // DurationSinceFunc returns the duration elapsed since the given time. + // Added as a global variable to allow injection for testing. + DurationSinceFunc = time.Since +) + +// EndpointLatencyRecorder records latency metric for endpoint objects +type EndpointLatencyRecorder struct { + TT time.Time + ServiceFunc func(meta.Object) []*Service + Services []*Service +} + +func (l *EndpointLatencyRecorder) init(o meta.Object) { + l.Services = l.ServiceFunc(o) + l.TT = time.Time{} + stringVal, ok := o.GetAnnotations()[api.EndpointsLastChangeTriggerTime] + if ok { + tt, err := time.Parse(time.RFC3339Nano, stringVal) + if err != nil { + log.Warningf("DnsProgrammingLatency cannot be calculated for Endpoints '%s/%s'; invalid %q annotation RFC3339 value of %q", + o.GetNamespace(), o.GetName(), api.EndpointsLastChangeTriggerTime, stringVal) + // In case of error val = time.Zero, which is ignored downstream. + } + l.TT = tt + } +} + +func (l *EndpointLatencyRecorder) record() { + // isHeadless indicates whether the endpoints object belongs to a headless + // service (i.e. clusterIp = None). Note that this can be a false negatives if the service + // informer is lagging, i.e. we may not see a recently created service. Given that the services + // don't change very often (comparing to much more frequent endpoints changes), cases when this method + // will return wrong answer should be relatively rare. Because of that we intentionally accept this + // flaw to keep the solution simple. + isHeadless := len(l.Services) == 1 && l.Services[0].ClusterIP == api.ClusterIPNone + + if !isHeadless || l.TT.IsZero() { + return + } + + // If we're here it means that the Endpoints object is for a headless service and that + // the Endpoints object was created by the endpoints-controller (because the + // LastChangeTriggerTime annotation is set). It means that the corresponding service is a + // "headless service with selector". + DNSProgrammingLatency.WithLabelValues("headless_with_selector"). + Observe(DurationSinceFunc(l.TT).Seconds()) +} diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go index f591f6d0a..7111833e6 100644 --- a/plugin/kubernetes/object/object.go +++ b/plugin/kubernetes/object/object.go @@ -22,8 +22,8 @@ import ( "k8s.io/client-go/tools/cache" ) -// ToFunc converts one empty interface to another. -type ToFunc func(interface{}) (interface{}, error) +// ToFunc converts one v1.Object to another v1.Object. +type ToFunc func(v1.Object) (v1.Object, error) // ProcessorBuilder returns function to process cache events. type ProcessorBuilder func(cache.Indexer, cache.ResourceEventHandler) cache.ProcessFunc diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go index 04cbe1ad2..9b9d5641c 100644 --- a/plugin/kubernetes/object/pod.go +++ b/plugin/kubernetes/object/pod.go @@ -5,6 +5,7 @@ import ( "fmt" api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -21,37 +22,28 @@ type Pod struct { var errPodTerminating = errors.New("pod terminating") -// ToPod returns a function that converts an api.Pod to a *Pod. -func ToPod(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - apiPod, ok := obj.(*api.Pod) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - pod := toPod(skipCleanup, apiPod) - t := apiPod.ObjectMeta.DeletionTimestamp - if t != nil && !(*t).Time.IsZero() { - // if the pod is in the process of termination, return an error so it can be ignored - // during add/update event processing - return pod, errPodTerminating - } - return pod, nil +// ToPod converts an api.Pod to a *Pod. +func ToPod(obj meta.Object) (meta.Object, error) { + apiPod, ok := obj.(*api.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) } -} - -func toPod(skipCleanup bool, pod *api.Pod) *Pod { - p := &Pod{ - Version: pod.GetResourceVersion(), - PodIP: pod.Status.PodIP, - Namespace: pod.GetNamespace(), - Name: pod.GetName(), + pod := &Pod{ + Version: apiPod.GetResourceVersion(), + PodIP: apiPod.Status.PodIP, + Namespace: apiPod.GetNamespace(), + Name: apiPod.GetName(), } - - if !skipCleanup { - *pod = api.Pod{} + t := apiPod.ObjectMeta.DeletionTimestamp + if t != nil && !(*t).Time.IsZero() { + // if the pod is in the process of termination, return an error so it can be ignored + // during add/update event processing + return pod, errPodTerminating } - return p + *apiPod = api.Pod{} + + return pod, nil } var _ runtime.Object = &Pod{} diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go index de84cf941..be1404ea0 100644 --- a/plugin/kubernetes/object/service.go +++ b/plugin/kubernetes/object/service.go @@ -4,6 +4,7 @@ import ( "fmt" api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -28,18 +29,12 @@ type Service struct { // ServiceKey returns a string using for the index. func ServiceKey(name, namespace string) string { return name + "." + namespace } -// ToService returns a function that converts an api.Service to a *Service. -func ToService(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - svc, ok := obj.(*api.Service) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return toService(skipCleanup, svc), nil +// ToService converts an api.Service to a *Service. +func ToService(obj meta.Object) (meta.Object, error) { + svc, ok := obj.(*api.Service) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) } -} - -func toService(skipCleanup bool, svc *api.Service) *Service { s := &Service{ Version: svc.GetResourceVersion(), Name: svc.GetName(), @@ -70,11 +65,9 @@ func toService(skipCleanup bool, svc *api.Service) *Service { } - if !skipCleanup { - *svc = api.Service{} - } + *svc = api.Service{} - return s + return s, nil } var _ runtime.Object = &Service{} -- cgit v1.2.3