aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes/object/informer.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/kubernetes/object/informer.go')
-rw-r--r--plugin/kubernetes/object/informer.go51
1 files changed, 51 insertions, 0 deletions
diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go
new file mode 100644
index 000000000..9336571dc
--- /dev/null
+++ b/plugin/kubernetes/object/informer.go
@@ -0,0 +1,51 @@
+package object
+
+import (
+ "time"
+
+ "k8s.io/apimachinery/pkg/runtime"
+ "k8s.io/client-go/tools/cache"
+)
+
+// NewIndexerInformer is a copy of the cache.NewIndexInformer function, but allows Process to have a conversion function (ToFunc).
+func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, resyncPeriod time.Duration, h cache.ResourceEventHandler, indexers cache.Indexers, convert ToFunc) (cache.Indexer, cache.Controller) {
+ clientState := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, indexers)
+
+ fifo := cache.NewDeltaFIFO(cache.MetaNamespaceKeyFunc, clientState)
+
+ cfg := &cache.Config{
+ Queue: fifo,
+ ListerWatcher: lw,
+ ObjectType: objType,
+ FullResyncPeriod: resyncPeriod,
+ RetryOnError: false,
+ Process: func(obj interface{}) error {
+ for _, d := range obj.(cache.Deltas) {
+
+ obj := convert(d.Object)
+
+ switch d.Type {
+ case cache.Sync, cache.Added, cache.Updated:
+ if old, exists, err := clientState.Get(obj); err == nil && exists {
+ if err := clientState.Update(obj); err != nil {
+ return err
+ }
+ h.OnUpdate(old, obj)
+ } else {
+ if err := clientState.Add(obj); err != nil {
+ return err
+ }
+ h.OnAdd(obj)
+ }
+ case cache.Deleted:
+ if err := clientState.Delete(obj); err != nil {
+ return err
+ }
+ h.OnDelete(obj)
+ }
+ }
+ return nil
+ },
+ }
+ return clientState, cache.New(cfg)
+}