diff options
Diffstat (limited to 'plugin/kubernetes/object')
-rw-r--r-- | plugin/kubernetes/object/endpoint.go | 21 | ||||
-rw-r--r-- | plugin/kubernetes/object/informer.go | 27 |
2 files changed, 36 insertions, 12 deletions
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index 2a7d69acf..f3ce9c2d6 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -1,6 +1,8 @@ package object import ( + "fmt" + api "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" ) @@ -43,8 +45,19 @@ type EndpointPort struct { // EndpointsKey return a string using for the index. func EndpointsKey(name, namespace string) string { return name + "." + namespace } -// ToEndpoints converts an api.Endpoints to a *Endpoints. -func ToEndpoints(end *api.Endpoints) *Endpoints { +// ToEndpoints returns a function that converts an *api.Endpoints to a *Endpoints. +func ToEndpoints(skipCleanup bool) ToFunc { + return func(obj interface{}) (interface{}, error) { + eps, ok := obj.(*api.Endpoints) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } + return toEndpoints(skipCleanup, eps), nil + } +} + +// toEndpoints converts an *api.Endpoints to a *Endpoints. +func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { e := &Endpoints{ Version: end.GetResourceVersion(), Name: end.GetName(), @@ -88,6 +101,10 @@ func ToEndpoints(end *api.Endpoints) *Endpoints { } } + if !skipCleanup { + *end = api.Endpoints{} + } + return e } diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index e0d7f180c..f37af4796 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -1,6 +1,7 @@ package object import ( + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" ) @@ -20,8 +21,10 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache. return clientState, cache.New(cfg) } -// DefaultProcessor is a copy of Process function from cache.NewIndexerInformer except it does a conversion. -func DefaultProcessor(convert ToFunc) ProcessorBuilder { +type recordLatencyFunc func(meta.Object) + +// DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion. +func DefaultProcessor(convert ToFunc, recordLatency recordLatencyFunc) ProcessorBuilder { return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { @@ -42,23 +45,27 @@ func DefaultProcessor(convert ToFunc) ProcessorBuilder { } h.OnAdd(obj) } + if recordLatency != nil { + recordLatency(d.Object.(meta.Object)) + } case cache.Deleted: var obj interface{} - var err error - tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown) - if ok { - obj, err = convert(tombstone.Obj) - } else { + obj, ok := d.Object.(cache.DeletedFinalStateUnknown) + if !ok { + var err error obj, err = convert(d.Object) - } - if err != nil && err != errPodTerminating { - return err + if err != nil && err != errPodTerminating { + return err + } } if err := clientState.Delete(obj); err != nil { return err } h.OnDelete(obj) + if !ok && recordLatency != nil { + recordLatency(d.Object.(meta.Object)) + } } } return nil |