aboutsummaryrefslogtreecommitdiff
path: root/plugin
diff options
context:
space:
mode:
authorGravatar Chris O'Haver <cohaver@infoblox.com> 2020-05-15 12:47:29 -0400
committerGravatar GitHub <noreply@github.com> 2020-05-15 09:47:29 -0700
commita3aeb3d5034be71a352f874cfe7d7d31c218059d (patch)
treec9839454a0b2e57ecd19c76af738f6943753f2f9 /plugin
parentbb7ee5010ed7eeba7aa269bdbe73b31e620c17cb (diff)
downloadcoredns-a3aeb3d5034be71a352f874cfe7d7d31c218059d.tar.gz
coredns-a3aeb3d5034be71a352f874cfe7d7d31c218059d.tar.zst
coredns-a3aeb3d5034be71a352f874cfe7d7d31c218059d.zip
plugin/kubernetes: handle tombstones in default processor (#3890)
* handle deletion tombstones in default processor Signed-off-by: Chris O'Haver <cohaver@infoblox.com> * fix terminating pod exclusion Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
Diffstat (limited to 'plugin')
-rw-r--r--plugin/kubernetes/object/informer.go19
-rw-r--r--plugin/kubernetes/object/object.go2
-rw-r--r--plugin/kubernetes/object/pod.go32
-rw-r--r--plugin/kubernetes/object/service.go17
4 files changed, 45 insertions, 25 deletions
diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go
index bd4d05d30..e0d7f180c 100644
--- a/plugin/kubernetes/object/informer.go
+++ b/plugin/kubernetes/object/informer.go
@@ -25,11 +25,12 @@ func DefaultProcessor(convert ToFunc) ProcessorBuilder {
return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
-
- obj := convert(d.Object)
-
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
+ obj, err := convert(d.Object)
+ if err != nil {
+ return err
+ }
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
@@ -42,6 +43,18 @@ func DefaultProcessor(convert ToFunc) ProcessorBuilder {
h.OnAdd(obj)
}
case cache.Deleted:
+ var obj interface{}
+ var err error
+ tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown)
+ if ok {
+ obj, err = convert(tombstone.Obj)
+ } else {
+ obj, err = convert(d.Object)
+ }
+ if err != nil && err != errPodTerminating {
+ return err
+ }
+
if err := clientState.Delete(obj); err != nil {
return err
}
diff --git a/plugin/kubernetes/object/object.go b/plugin/kubernetes/object/object.go
index d08960b11..f591f6d0a 100644
--- a/plugin/kubernetes/object/object.go
+++ b/plugin/kubernetes/object/object.go
@@ -23,7 +23,7 @@ import (
)
// ToFunc converts one empty interface to another.
-type ToFunc func(interface{}) interface{}
+type ToFunc func(interface{}) (interface{}, error)
// ProcessorBuilder returns function to process cache events.
type ProcessorBuilder func(cache.Indexer, cache.ResourceEventHandler) cache.ProcessFunc
diff --git a/plugin/kubernetes/object/pod.go b/plugin/kubernetes/object/pod.go
index 9fc9b5726..04cbe1ad2 100644
--- a/plugin/kubernetes/object/pod.go
+++ b/plugin/kubernetes/object/pod.go
@@ -1,6 +1,9 @@
package object
import (
+ "errors"
+ "fmt"
+
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -16,30 +19,33 @@ type Pod struct {
*Empty
}
+var errPodTerminating = errors.New("pod terminating")
+
// ToPod returns a function that converts an api.Pod to a *Pod.
func ToPod(skipCleanup bool) ToFunc {
- return func(obj interface{}) interface{} {
- return toPod(skipCleanup, obj)
+ return func(obj interface{}) (interface{}, error) {
+ apiPod, ok := obj.(*api.Pod)
+ if !ok {
+ return nil, fmt.Errorf("unexpected object %v", obj)
+ }
+ pod := toPod(skipCleanup, apiPod)
+ t := apiPod.ObjectMeta.DeletionTimestamp
+ if t != nil && !(*t).Time.IsZero() {
+ // if the pod is in the process of termination, return an error so it can be ignored
+ // during add/update event processing
+ return pod, errPodTerminating
+ }
+ return pod, nil
}
}
-func toPod(skipCleanup bool, obj interface{}) interface{} {
- pod, ok := obj.(*api.Pod)
- if !ok {
- return nil
- }
-
+func toPod(skipCleanup bool, pod *api.Pod) *Pod {
p := &Pod{
Version: pod.GetResourceVersion(),
PodIP: pod.Status.PodIP,
Namespace: pod.GetNamespace(),
Name: pod.GetName(),
}
- // don't add pods that are being deleted.
- t := pod.ObjectMeta.DeletionTimestamp
- if t != nil && !(*t).Time.IsZero() {
- return nil
- }
if !skipCleanup {
*pod = api.Pod{}
diff --git a/plugin/kubernetes/object/service.go b/plugin/kubernetes/object/service.go
index 295715e2d..3dc061528 100644
--- a/plugin/kubernetes/object/service.go
+++ b/plugin/kubernetes/object/service.go
@@ -1,6 +1,8 @@
package object
import (
+ "fmt"
+
api "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
)
@@ -28,17 +30,16 @@ func ServiceKey(name, namespace string) string { return name + "." + namespace }
// ToService returns a function that converts an api.Service to a *Service.
func ToService(skipCleanup bool) ToFunc {
- return func(obj interface{}) interface{} {
- return toService(skipCleanup, obj)
+ return func(obj interface{}) (interface{}, error) {
+ svc, ok := obj.(*api.Service)
+ if !ok {
+ return nil, fmt.Errorf("unexpected object %v", obj)
+ }
+ return toService(skipCleanup, svc), nil
}
}
-func toService(skipCleanup bool, obj interface{}) interface{} {
- svc, ok := obj.(*api.Service)
- if !ok {
- return nil
- }
-
+func toService(skipCleanup bool, svc *api.Service) *Service {
s := &Service{
Version: svc.GetResourceVersion(),
Name: svc.GetName(),