diff options
author | 2020-10-30 08:14:30 -0400 | |
---|---|---|
committer | 2020-10-30 08:14:30 -0400 | |
commit | 272ccb195d31cd1622d48f961f3a189ce3abb937 (patch) | |
tree | b5db771e2371b2e4ede772dff2c2c4217188115c /plugin/kubernetes/object | |
parent | c840caf1ef77d8f86ee7d11f644e0d6ea42c469a (diff) | |
download | coredns-272ccb195d31cd1622d48f961f3a189ce3abb937.tar.gz coredns-272ccb195d31cd1622d48f961f3a189ce3abb937.tar.zst coredns-272ccb195d31cd1622d48f961f3a189ce3abb937.zip |
plugin/kubernetes: Watch EndpointSlices (#4209)
* initial commit
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* convert endpointslices to object.endpoints
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add opt hard coded for now
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* check that server supports endpointslice
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* fix import grouping
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* dont use endpoint slice in 1.17 or 1.18
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* bump kind/k8s in circle ci to latest
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* drop k8s to latest supported by kind
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* use endpointslice name as endoint Name; index by Service name
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* use index key comparison in nsAddrs()
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add Index to object.Endpoint fixtures; fix direct endpoint name compares
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add slice dup check and test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* todo
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add ep-slice skew dup test for reverse
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* nsaddrs: de-dup ep-slice skew dups; add test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* remove todo
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* address various feedback
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* consolidate endpoint/slice informer code
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* fix endpoint informer consolidation; use clearer func name
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* log info; use major/minor fields
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* fix nsAddr and unit test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* add latency tracking for endpointslices
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* endpointslice latency unit test & fix
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* code shuffling
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* rename endpointslices in tests
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* remove de-dup from nsAddrs and test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
* remove de-dup from findServices / test
Signed-off-by: Chris O'Haver <cohaver@infoblox.com>
Diffstat (limited to 'plugin/kubernetes/object')
-rw-r--r-- | plugin/kubernetes/object/endpoint.go | 55 | ||||
-rw-r--r-- | plugin/kubernetes/object/informer.go | 5 |
2 files changed, 58 insertions, 2 deletions
diff --git a/plugin/kubernetes/object/endpoint.go b/plugin/kubernetes/object/endpoint.go index d4c495861..304aaa861 100644 --- a/plugin/kubernetes/object/endpoint.go +++ b/plugin/kubernetes/object/endpoint.go @@ -4,6 +4,7 @@ import ( "fmt" api "k8s.io/api/core/v1" + discovery "k8s.io/api/discovery/v1beta1" "k8s.io/apimachinery/pkg/runtime" ) @@ -56,6 +57,17 @@ func ToEndpoints(skipCleanup bool) ToFunc { } } +// EndpointSliceToEndpoints returns a function that converts an *discovery.EndpointSlice to a *Endpoints. +func EndpointSliceToEndpoints(skipCleanup bool) ToFunc { + return func(obj interface{}) (interface{}, error) { + eps, ok := obj.(*discovery.EndpointSlice) + if !ok { + return nil, fmt.Errorf("unexpected object %v", obj) + } + return endpointSliceToEndpoints(skipCleanup, eps), nil + } +} + // toEndpoints converts an *api.Endpoints to a *Endpoints. func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { e := &Endpoints{ @@ -108,6 +120,49 @@ func toEndpoints(skipCleanup bool, end *api.Endpoints) *Endpoints { return e } +// endpointSliceToEndpoints converts a *discovery.EndpointSlice to a *Endpoints. +func endpointSliceToEndpoints(skipCleanup bool, ends *discovery.EndpointSlice) *Endpoints { + e := &Endpoints{ + Version: ends.GetResourceVersion(), + Name: ends.GetName(), + Namespace: ends.GetNamespace(), + Index: EndpointsKey(ends.Labels[discovery.LabelServiceName], ends.GetNamespace()), + Subsets: make([]EndpointSubset, 1), + } + + if len(ends.Ports) == 0 { + // Add sentinel if there are no ports. + e.Subsets[0].Ports = []EndpointPort{{Port: -1}} + } else { + e.Subsets[0].Ports = make([]EndpointPort, len(ends.Ports)) + for k, p := range ends.Ports { + ep := EndpointPort{Port: *p.Port, Name: *p.Name, Protocol: string(*p.Protocol)} + e.Subsets[0].Ports[k] = ep + } + } + + for _, end := range ends.Endpoints { + for _, a := range end.Addresses { + ea := EndpointAddress{IP: a} + if end.Hostname != nil { + ea.Hostname = *end.Hostname + } + if end.TargetRef != nil { + ea.TargetRefName = end.TargetRef.Name + } + // EndpointSlice does not contain NodeName, leave blank + e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, ea) + e.IndexIP = append(e.IndexIP, a) + } + } + + if !skipCleanup { + *ends = discovery.EndpointSlice{} + } + + return e +} + // CopyWithoutSubsets copies e, without the subsets. func (e *Endpoints) CopyWithoutSubsets() *Endpoints { e1 := &Endpoints{ diff --git a/plugin/kubernetes/object/informer.go b/plugin/kubernetes/object/informer.go index f37af4796..afd134e56 100644 --- a/plugin/kubernetes/object/informer.go +++ b/plugin/kubernetes/object/informer.go @@ -21,10 +21,11 @@ func NewIndexerInformer(lw cache.ListerWatcher, objType runtime.Object, h cache. return clientState, cache.New(cfg) } -type recordLatencyFunc func(meta.Object) +// RecordLatencyFunc is a function for recording api object delta latency +type RecordLatencyFunc func(meta.Object) // DefaultProcessor is based on the Process function from cache.NewIndexerInformer except it does a conversion. -func DefaultProcessor(convert ToFunc, recordLatency recordLatencyFunc) ProcessorBuilder { +func DefaultProcessor(convert ToFunc, recordLatency RecordLatencyFunc) ProcessorBuilder { return func(clientState cache.Indexer, h cache.ResourceEventHandler) cache.ProcessFunc { return func(obj interface{}) error { for _, d := range obj.(cache.Deltas) { |