diff options
Diffstat (limited to 'plugin/kubernetes/object')
-rw-r--r-- | plugin/kubernetes/object/endpoint.go | 8 | ||||
-rw-r--r-- | plugin/kubernetes/object/informer.go | 21 | ||||
-rw-r--r-- | plugin/kubernetes/object/object.go | 6 | ||||
-rw-r--r-- | plugin/kubernetes/object/pod.go | 14 | ||||
-rw-r--r-- | plugin/kubernetes/object/service.go | 14 |
5 files changed, 43 insertions, 20 deletions
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index b2c77fc10..c7d6b7323 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -44,10 +44,10 @@ type EndpointPort struct { func EndpointsKey(name, namespace string) string { return name + "." + namespace } // ToEndpoints converts an api.Endpoints to a *Endpoints. -func ToEndpoints(obj interface{}) interface{} { +func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) { end, ok := obj.(*api.Endpoints) if !ok { - return nil + return nil, nil } e := &Endpoints{ @@ -93,9 +93,7 @@ func ToEndpoints(obj interface{}) interface{} { } } - *end = api.Endpoints{} - - return e + return end, e } // CopyWithoutSubsets copies e, without the subsets. diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index 919fce12d..bd4d05d30 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -5,19 +5,25 @@ import ( "k8s.io/client-go/tools/cache" ) -// NewIndexerInformer is a copy of the cache.NewIndexInformer function, but allows Process to have a conversion function (ToFunc). -func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.ResourceEventHandler, indexers cache.Indexers, convert ToFunc) (cache.Indexer, cache.Controller) { +// NewIndexerInformer is a copy of the cache.NewIndexerInformer function, but allows custom process function +func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache.ResourceEventHandler, indexers cache.Indexers, builder ProcessorBuilder) (cache.Indexer, cache.Controller) { clientState := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, indexers) - fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState) - cfg := &cache.Config{ - Queue: fifo, + Queue: cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState), ListerWatcher: lw, ObjectType: objType, FullResyncPeriod: defaultResyncPeriod, RetryOnError: false, - Process: func(obj interface{}) error { + Process: builder(clientState, h), + } + return clientState, cache.New(cfg) +} + +// DefaultProcessor is a copy of Process function from cache.NewIndexerInformer except it does a conversion. +func DefaultProcessor(convert ToFunc) ProcessorBuilder { + return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { + return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { obj := convert(d.Object) @@ -43,9 +49,8 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache. } } return nil - }, + } } - return clientState, cache.New(cfg) } const defaultResyncPeriod = 0 diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go index 6b1c7d839..132b5be6c 100644 --- a/plugin/kubernetes/object/object.go +++ b/plugin/kubernetes/object/object.go @@ -16,14 +16,18 @@ package object import ( - "k8s.io/apimachinery/pkg/apis/meta/v1" + v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/tools/cache" ) // ToFunc converts one empty interface to another. type ToFunc func(interface{}) interface{} +// ProcessorBuilder returns function to process cache events. +type ProcessorBuilder func(cache.Indexer, cache.ResourceEventHandler) cache.ProcessFunc + // Empty is an empty struct. type Empty struct{} diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go index 072d8d56d..9fc9b5726 100644 --- a/plugin/kubernetes/object/pod.go +++ b/plugin/kubernetes/object/pod.go @@ -16,8 +16,14 @@ type Pod struct { *Empty } -// ToPod converts an api.Pod to a *Pod. -func ToPod(obj interface{}) interface{} { +// ToPod returns a function that converts an api.Pod to a *Pod. +func ToPod(skipCleanup bool) ToFunc { + return func(obj interface{}) interface{} { + return toPod(skipCleanup, obj) + } +} + +func toPod(skipCleanup bool, obj interface{}) interface{} { pod, ok := obj.(*api.Pod) if !ok { return nil @@ -35,7 +41,9 @@ func ToPod(obj interface{}) interface{} { return nil } - *pod = api.Pod{} + if !skipCleanup { + *pod = api.Pod{} + } return p } diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go index a41100ab9..295715e2d 100644 --- a/plugin/kubernetes/object/service.go +++ b/plugin/kubernetes/object/service.go @@ -26,8 +26,14 @@ type Service struct { // ServiceKey return a string using for the index. func ServiceKey(name, namespace string) string { return name + "." + namespace } -// ToService converts an api.Service to a *Service. -func ToService(obj interface{}) interface{} { +// ToService returns a function that converts an api.Service to a *Service. +func ToService(skipCleanup bool) ToFunc { + return func(obj interface{}) interface{} { + return toService(skipCleanup, obj) + } +} + +func toService(skipCleanup bool, obj interface{}) interface{} { svc, ok := obj.(*api.Service) if !ok { return nil @@ -58,7 +64,9 @@ func ToService(obj interface{}) interface{} { s.ExternalIPs[li+i] = lb.IP } - *svc = api.Service{} + if !skipCleanup { + *svc = api.Service{} + } return s } |