aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes/object
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/kubernetes/object')
-rw-r--r--plugin/kubernetes/object/endpoint.go8
-rw-r--r--plugin/kubernetes/object/informer.go21
-rw-r--r--plugin/kubernetes/object/object.go6
-rw-r--r--plugin/kubernetes/object/pod.go14
-rw-r--r--plugin/kubernetes/object/service.go14
5 files changed, 43 insertions, 20 deletions
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go
index b2c77fc10..c7d6b7323 100644
--- a/plugin/kubernetes/object/endpoint.go
+++ b/plugin/kubernetes/object/endpoint.go
@@ -44,10 +44,10 @@ type EndpointPort struct {
func EndpointsKey(name, namespace string) string { return name + "." + namespace }
// ToEndpoints converts an api.Endpoints to a *Endpoints.
-func ToEndpoints(obj interface{}) interface{} {
+func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) {
end, ok := obj.(*api.Endpoints)
if !ok {
- return nil
+ return nil, nil
}
e := &Endpoints{
@@ -93,9 +93,7 @@ func ToEndpoints(obj interface{}) interface{} {
}
}
- *end = api.Endpoints{}
-
- return e
+ return end, e
}
// CopyWithoutSubsets copies e, without the subsets.
diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go
index 919fce12d..bd4d05d30 100644
--- a/plugin/kubernetes/object/informer.go
+++ b/plugin/kubernetes/object/informer.go
@@ -5,19 +5,25 @@ import (
"k8s.io/client-go/tools/cache"
)
-// NewIndexerInformer is a copy of the cache.NewIndexInformer function, but allows Process to have a conversion function (ToFunc).
-func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.ResourceEventHandler, indexers cache.Indexers, convert ToFunc) (cache.Indexer, cache.Controller) {
+// NewIndexerInformer is a copy of the cache.NewIndexerInformer function, but allows custom process function
+func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.ResourceEventHandler, indexers cache.Indexers, builder ProcessorBuilder) (cache.Indexer, cache.Controller) {
clientState := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, indexers)
- fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState)
-
cfg := &cache.Config{
- Queue: fifo,
+ Queue: cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState),
ListerWatcher: lw,
ObjectType: objType,
FullResyncPeriod: defaultResyncPeriod,
RetryOnError: false,
- Process: func(obj interface{}) error {
+ Process: builder(clientState, h),
+ }
+ return clientState, cache.New(cfg)
+}
+
+// DefaultProcessor is a copy of Process function from cache.NewIndexerInformer except it does a conversion.
+func DefaultProcessor(convert ToFunc) ProcessorBuilder {
+ return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
+ return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
obj := convert(d.Object)
@@ -43,9 +49,8 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.
}
}
return nil
- },
+ }
}
- return clientState, cache.New(cfg)
}
const defaultResyncPeriod = 0
diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go
index 6b1c7d839..132b5be6c 100644
--- a/plugin/kubernetes/object/object.go
+++ b/plugin/kubernetes/object/object.go
@@ -16,14 +16,18 @@
package object
import (
- "k8s.io/apimachinery/pkg/apis/meta/v1"
+ v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
+ "k8s.io/client-go/tools/cache"
)
// ToFunc converts one empty interface to another.
type ToFunc func(interface{}) interface{}
+// ProcessorBuilder returns function to process cache events.
+type ProcessorBuilder func(cache.Indexer, cache.ResourceEventHandler) cache.ProcessFunc
+
// Empty is an empty struct.
type Empty struct{}
diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go
index 072d8d56d..9fc9b5726 100644
--- a/plugin/kubernetes/object/pod.go
+++ b/plugin/kubernetes/object/pod.go
@@ -16,8 +16,14 @@ type Pod struct {
*Empty
}
-// ToPod converts an api.Pod to a *Pod.
-func ToPod(obj interface{}) interface{} {
+// ToPod returns a function that converts an api.Pod to a *Pod.
+func ToPod(skipCleanup bool) ToFunc {
+ return func(obj interface{}) interface{} {
+ return toPod(skipCleanup, obj)
+ }
+}
+
+func toPod(skipCleanup bool, obj interface{}) interface{} {
pod, ok := obj.(*api.Pod)
if !ok {
return nil
@@ -35,7 +41,9 @@ func ToPod(obj interface{}) interface{} {
return nil
}
- *pod = api.Pod{}
+ if !skipCleanup {
+ *pod = api.Pod{}
+ }
return p
}
diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go
index a41100ab9..295715e2d 100644
--- a/plugin/kubernetes/object/service.go
+++ b/plugin/kubernetes/object/service.go
@@ -26,8 +26,14 @@ type Service struct {
// ServiceKey return a string using for the index.
func ServiceKey(name, namespace string) string { return name + "." + namespace }
-// ToService converts an api.Service to a *Service.
-func ToService(obj interface{}) interface{} {
+// ToService returns a function that converts an api.Service to a *Service.
+func ToService(skipCleanup bool) ToFunc {
+ return func(obj interface{}) interface{} {
+ return toService(skipCleanup, obj)
+ }
+}
+
+func toService(skipCleanup bool, obj interface{}) interface{} {
svc, ok := obj.(*api.Service)
if !ok {
return nil
@@ -58,7 +64,9 @@ func ToService(obj interface{}) interface{} {
s.ExternalIPs[li+i] = lb.IP
}
- *svc = api.Service{}
+ if !skipCleanup {
+ *svc = api.Service{}
+ }
return s
}