diff options
Diffstat (limited to 'plugin/kubernetes/object')
-rw-r--r-- | plugin/kubernetes/object/endpoint.go | 49 | ||||
-rw-r--r-- | plugin/kubernetes/object/informer.go | 21 | ||||
-rw-r--r-- | plugin/kubernetes/object/metrics.go | 82 | ||||
-rw-r--r-- | plugin/kubernetes/object/object.go | 4 | ||||
-rw-r--r-- | plugin/kubernetes/object/pod.go | 46 | ||||
-rw-r--r-- | plugin/kubernetes/object/service.go | 23 |
6 files changed, 143 insertions, 82 deletions
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index 304aaa861..09429e0b2 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -5,6 +5,7 @@ import ( api "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -46,30 +47,12 @@ type EndpointPort struct { // EndpointsKey returns a string using for the index. func EndpointsKey(name, namespace string) string { return name + "." + namespace } -// ToEndpoints returns a function that converts an *api.Endpoints to a *Endpoints. -func ToEndpoints(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - eps, ok := obj.(*api.Endpoints) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return toEndpoints(skipCleanup, eps), nil - } -} - -// EndpointSliceToEndpoints returns a function that converts an *discovery.EndpointSlice to a *Endpoints. -func EndpointSliceToEndpoints(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - eps, ok := obj.(*discovery.EndpointSlice) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return endpointSliceToEndpoints(skipCleanup, eps), nil - } -} - // toEndpoints converts an *api.Endpoints to a *Endpoints. -func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { +func ToEndpoints(obj meta.Object) (meta.Object, error) { + end, ok := obj.(*api.Endpoints) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } e := &Endpoints{ Version: end.GetResourceVersion(), Name: end.GetName(), @@ -113,15 +96,17 @@ func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { } } - if !skipCleanup { - *end = api.Endpoints{} - } + *end = api.Endpoints{} - return e + return e, nil } -// endpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. -func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) *Endpoints { +// EndpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. +func EndpointSliceToEndpoints(obj meta.Object) (meta.Object, error) { + ends, ok := obj.(*discovery.EndpointSlice) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } e := &Endpoints{ Version: ends.GetResourceVersion(), Name: ends.GetName(), @@ -156,11 +141,9 @@ func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) * } } - if !skipCleanup { - *ends = discovery.EndpointSlice{} - } + *ends = discovery.EndpointSlice{} - return e + return e, nil } // CopyWithoutSubsets copies e, without the subsets. diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index afd134e56..e44cd218a 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -1,6 +1,8 @@ package object import ( + "fmt" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" @@ -25,13 +27,18 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache. type RecordLatencyFunc func(meta.Object) // DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion. -func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) ProcessorBuilder { +func DefaultProcessor(convert ToFunc, recordLatency *EndpointLatencyRecorder) ProcessorBuilder { return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { + if recordLatency != nil { + if o, ok := d.Object.(meta.Object); ok { + recordLatency.init(o) + } + } switch d.Type { case cache.Sync, cache.Added, cache.Updated: - obj, err := convert(d.Object) + obj, err := convert(d.Object.(meta.Object)) if err != nil { return err } @@ -47,14 +54,18 @@ func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) Processor h.OnAdd(obj) } if recordLatency != nil { - recordLatency(d.Object.(meta.Object)) + recordLatency.record() } case cache.Deleted: var obj interface{} obj, ok := d.Object.(cache.DeletedFinalStateUnknown) if !ok { var err error - obj, err = convert(d.Object) + metaObj, ok := d.Object.(meta.Object) + if !ok { + return fmt.Errorf("unexpected object %v", d.Object) + } + obj, err = convert(metaObj) if err != nil && err != errPodTerminating { return err } @@ -65,7 +76,7 @@ func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) Processor } h.OnDelete(obj) if !ok && recordLatency != nil { - recordLatency(d.Object.(meta.Object)) + recordLatency.record() } } } diff --git a/plugin/kubernetes/object/metrics.go b/plugin/kubernetes/object/metrics.go new file mode 100644 index 000000000..929925cf1 --- /dev/null +++ b/plugin/kubernetes/object/metrics.go @@ -0,0 +1,82 @@ +package object + +import ( + "time" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/pkg/log" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +var ( + // DNSProgrammingLatency is defined as the time it took to program a DNS instance - from the time + // a service or pod has changed to the time the change was propagated and was available to be + // served by a DNS server. + // The definition of this SLI can be found at https://github.com/kubernetes/community/blob/master/sig-scalability/slos/dns_programming_latency.md + // Note that the metrics is partially based on the time exported by the endpoints controller on + // the master machine. The measurement may be inaccurate if there is a clock drift between the + // node and master machine. + // The service_kind label can be one of: + // * cluster_ip + // * headless_with_selector + // * headless_without_selector + DNSProgrammingLatency = promauto.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: plugin.Namespace, + Subsystem: "kubernetes", + Name: "dns_programming_duration_seconds", + // From 1 millisecond to ~17 minutes. + Buckets: prometheus.ExponentialBuckets(0.001, 2, 20), + Help: "Histogram of the time (in seconds) it took to program a dns instance.", + }, []string{"service_kind"}) + + // DurationSinceFunc returns the duration elapsed since the given time. + // Added as a global variable to allow injection for testing. + DurationSinceFunc = time.Since +) + +// EndpointLatencyRecorder records latency metric for endpoint objects +type EndpointLatencyRecorder struct { + TT time.Time + ServiceFunc func(meta.Object) []*Service + Services []*Service +} + +func (l *EndpointLatencyRecorder) init(o meta.Object) { + l.Services = l.ServiceFunc(o) + l.TT = time.Time{} + stringVal, ok := o.GetAnnotations()[api.EndpointsLastChangeTriggerTime] + if ok { + tt, err := time.Parse(time.RFC3339Nano, stringVal) + if err != nil { + log.Warningf("DnsProgrammingLatency cannot be calculated for Endpoints '%s/%s'; invalid %q annotation RFC3339 value of %q", + o.GetNamespace(), o.GetName(), api.EndpointsLastChangeTriggerTime, stringVal) + // In case of error val = time.Zero, which is ignored downstream. + } + l.TT = tt + } +} + +func (l *EndpointLatencyRecorder) record() { + // isHeadless indicates whether the endpoints object belongs to a headless + // service (i.e. clusterIp = None). Note that this can be a false negatives if the service + // informer is lagging, i.e. we may not see a recently created service. Given that the services + // don't change very often (comparing to much more frequent endpoints changes), cases when this method + // will return wrong answer should be relatively rare. Because of that we intentionally accept this + // flaw to keep the solution simple. + isHeadless := len(l.Services) == 1 && l.Services[0].ClusterIP == api.ClusterIPNone + + if !isHeadless || l.TT.IsZero() { + return + } + + // If we're here it means that the Endpoints object is for a headless service and that + // the Endpoints object was created by the endpoints-controller (because the + // LastChangeTriggerTime annotation is set). It means that the corresponding service is a + // "headless service with selector". + DNSProgrammingLatency.WithLabelValues("headless_with_selector"). + Observe(DurationSinceFunc(l.TT).Seconds()) +} diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go index f591f6d0a..7111833e6 100644 --- a/plugin/kubernetes/object/object.go +++ b/plugin/kubernetes/object/object.go @@ -22,8 +22,8 @@ import ( "k8s.io/client-go/tools/cache" ) -// ToFunc converts one empty interface to another. -type ToFunc func(interface{}) (interface{}, error) +// ToFunc converts one v1.Object to another v1.Object. +type ToFunc func(v1.Object) (v1.Object, error) // ProcessorBuilder returns function to process cache events. type ProcessorBuilder func(cache.Indexer, cache.ResourceEventHandler) cache.ProcessFunc diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go index 04cbe1ad2..9b9d5641c 100644 --- a/plugin/kubernetes/object/pod.go +++ b/plugin/kubernetes/object/pod.go @@ -5,6 +5,7 @@ import ( "fmt" api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -21,37 +22,28 @@ type Pod struct { var errPodTerminating = errors.New("pod terminating") -// ToPod returns a function that converts an api.Pod to a *Pod. -func ToPod(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - apiPod, ok := obj.(*api.Pod) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - pod := toPod(skipCleanup, apiPod) - t := apiPod.ObjectMeta.DeletionTimestamp - if t != nil && !(*t).Time.IsZero() { - // if the pod is in the process of termination, return an error so it can be ignored - // during add/update event processing - return pod, errPodTerminating - } - return pod, nil +// ToPod converts an api.Pod to a *Pod. +func ToPod(obj meta.Object) (meta.Object, error) { + apiPod, ok := obj.(*api.Pod) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) } -} - -func toPod(skipCleanup bool, pod *api.Pod) *Pod { - p := &Pod{ - Version: pod.GetResourceVersion(), - PodIP: pod.Status.PodIP, - Namespace: pod.GetNamespace(), - Name: pod.GetName(), + pod := &Pod{ + Version: apiPod.GetResourceVersion(), + PodIP: apiPod.Status.PodIP, + Namespace: apiPod.GetNamespace(), + Name: apiPod.GetName(), } - - if !skipCleanup { - *pod = api.Pod{} + t := apiPod.ObjectMeta.DeletionTimestamp + if t != nil && !(*t).Time.IsZero() { + // if the pod is in the process of termination, return an error so it can be ignored + // during add/update event processing + return pod, errPodTerminating } - return p + *apiPod = api.Pod{} + + return pod, nil } var _ runtime.Object = &Pod{} diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go index de84cf941..be1404ea0 100644 --- a/plugin/kubernetes/object/service.go +++ b/plugin/kubernetes/object/service.go @@ -4,6 +4,7 @@ import ( "fmt" api "k8s.io/api/core/v1" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -28,18 +29,12 @@ type Service struct { // ServiceKey returns a string using for the index. func ServiceKey(name, namespace string) string { return name + "." + namespace } -// ToService returns a function that converts an api.Service to a *Service. -func ToService(skipCleanup bool) ToFunc { - return func(obj interface{}) (interface{}, error) { - svc, ok := obj.(*api.Service) - if !ok { - return nil, fmt.Errorf("unexpected object %v", obj) - } - return toService(skipCleanup, svc), nil +// ToService converts an api.Service to a *Service. +func ToService(obj meta.Object) (meta.Object, error) { + svc, ok := obj.(*api.Service) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) } -} - -func toService(skipCleanup bool, svc *api.Service) *Service { s := &Service{ Version: svc.GetResourceVersion(), Name: svc.GetName(), @@ -70,11 +65,9 @@ func toService(skipCleanup bool, svc *api.Service) *Service { } - if !skipCleanup { - *svc = api.Service{} - } + *svc = api.Service{} - return s + return s, nil } var _ runtime.Object = &Service{} |