aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes/object
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/kubernetes/object')
-rw-r--r--plugin/kubernetes/object/endpoint.go21
-rw-r--r--plugin/kubernetes/object/informer.go27
2 files changed, 36 insertions, 12 deletions
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go
index 2a7d69acf..f3ce9c2d6 100644
--- a/plugin/kubernetes/object/endpoint.go
+++ b/plugin/kubernetes/object/endpoint.go
@@ -1,6 +1,8 @@
package object
import (
+ "fmt"
+
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -43,8 +45,19 @@ type EndpointPort struct {
// EndpointsKey return a string using for the index.
func EndpointsKey(name, namespace string) string { return name + "." + namespace }
-// ToEndpoints converts an api.Endpoints to a *Endpoints.
-func ToEndpoints(end *api.Endpoints) *Endpoints {
+// ToEndpoints returns a function that converts an *api.Endpoints to a *Endpoints.
+func ToEndpoints(skipCleanup bool) ToFunc {
+ return func(obj interface{}) (interface{}, error) {
+ eps, ok := obj.(*api.Endpoints)
+ if !ok {
+ return nil, fmt.Errorf("unexpected object %v", obj)
+ }
+ return toEndpoints(skipCleanup, eps), nil
+ }
+}
+
+// toEndpoints converts an *api.Endpoints to a *Endpoints.
+func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints {
e := &Endpoints{
Version: end.GetResourceVersion(),
Name: end.GetName(),
@@ -88,6 +101,10 @@ func ToEndpoints(end *api.Endpoints) *Endpoints {
}
}
+ if !skipCleanup {
+ *end = api.Endpoints{}
+ }
+
return e
}
diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go
index e0d7f180c..f37af4796 100644
--- a/plugin/kubernetes/object/informer.go
+++ b/plugin/kubernetes/object/informer.go
@@ -1,6 +1,7 @@
package object
import (
+ meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
@@ -20,8 +21,10 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.
return clientState, cache.New(cfg)
}
-// DefaultProcessor is a copy of Process function from cache.NewIndexerInformer except it does a conversion.
-func DefaultProcessor(convert ToFunc) ProcessorBuilder {
+type recordLatencyFunc func(meta.Object)
+
+// DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion.
+func DefaultProcessor(convert ToFunc, recordLatency recordLatencyFunc) ProcessorBuilder {
return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
@@ -42,23 +45,27 @@ func DefaultProcessor(convert ToFunc) ProcessorBuilder {
}
h.OnAdd(obj)
}
+ if recordLatency != nil {
+ recordLatency(d.Object.(meta.Object))
+ }
case cache.Deleted:
var obj interface{}
- var err error
- tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown)
- if ok {
- obj, err = convert(tombstone.Obj)
- } else {
+ obj, ok := d.Object.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ var err error
obj, err = convert(d.Object)
- }
- if err != nil && err != errPodTerminating {
- return err
+ if err != nil && err != errPodTerminating {
+ return err
+ }
}
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(obj)
+ if !ok && recordLatency != nil {
+ recordLatency(d.Object.(meta.Object))
+ }
}
}
return nil