aboutsummaryrefslogtreecommitdiff
path: root/plugin
diff options
context:
space:
mode:
Diffstat (limited to 'plugin')
-rw-r--r--plugin/kubernetes/controller.go35
-rw-r--r--plugin/kubernetes/object/endpoint.go9
2 files changed, 31 insertions, 13 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index 2964f80bb..01cce28f2 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -141,11 +141,14 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc {
return func(obj interface{}) error {
for _, d := range obj.(cache.Deltas) {
-
- apiEndpoints, obj := object.ToEndpoints(d.Object)
-
switch d.Type {
case cache.Sync, cache.Added, cache.Updated:
+ apiEndpoints, ok := d.Object.(*api.Endpoints)
+ if !ok {
+ return errors.New("got non-endpoint add/update")
+ }
+ obj := object.ToEndpoints(apiEndpoints)
+
if old, exists, err := clientState.Get(obj); err == nil && exists {
if err := clientState.Update(obj); err != nil {
return err
@@ -163,17 +166,37 @@ func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts
h.OnAdd(d.Object)
dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
+ if !opts.skipAPIObjectsCleanup {
+ *apiEndpoints = api.Endpoints{}
+ }
}
case cache.Deleted:
+ apiEndpoints, ok := d.Object.(*api.Endpoints)
+ if !ok {
+ // Assume that the object must be a cache.DeletedFinalStateUnknown.
+ // This is essentially an indicator that the Endpoint was deleted, without a containing a
+ // up-to date copy of the Endpoints object. We need to use cache.DeletedFinalStateUnknown
+ // object so it can be properly deleted by store.Delete() below, which knows how to handle it.
+ tombstone, ok := d.Object.(cache.DeletedFinalStateUnknown)
+ if !ok {
+ return errors.New("expected tombstone")
+ }
+ apiEndpoints, ok = tombstone.Obj.(*api.Endpoints)
+ if !ok {
+ return errors.New("got non-endpoint tombstone")
+ }
+ }
+ obj := object.ToEndpoints(apiEndpoints)
+
if err := clientState.Delete(obj); err != nil {
return err
}
h.OnDelete(d.Object)
dns.updateModifed()
recordDNSProgrammingLatency(dns.getServices(obj), apiEndpoints)
- }
- if !opts.skipAPIObjectsCleanup {
- *apiEndpoints = api.Endpoints{}
+ if !opts.skipAPIObjectsCleanup {
+ *apiEndpoints = api.Endpoints{}
+ }
}
}
return nil
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go
index c7d6b7323..2a7d69acf 100644
--- a/plugin/kubernetes/object/endpoint.go
+++ b/plugin/kubernetes/object/endpoint.go
@@ -44,12 +44,7 @@ type EndpointPort struct {
func EndpointsKey(name, namespace string) string { return name + "." + namespace }
// ToEndpoints converts an api.Endpoints to a *Endpoints.
-func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) {
- end, ok := obj.(*api.Endpoints)
- if !ok {
- return nil, nil
- }
-
+func ToEndpoints(end *api.Endpoints) *Endpoints {
e := &Endpoints{
Version: end.GetResourceVersion(),
Name: end.GetName(),
@@ -93,7 +88,7 @@ func ToEndpoints(obj interface{}) (*api.Endpoints, *Endpoints) {
}
}
- return end, e
+ return e
}
// CopyWithoutSubsets copies e, without the subsets.