aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/kubernetes')
-rw-r--r--plugin/kubernetes/controller.go41
-rw-r--r--plugin/kubernetes/controller_test.go22
-rw-r--r--plugin/kubernetes/external_test.go3
-rw-r--r--plugin/kubernetes/handler_test.go2
-rw-r--r--plugin/kubernetes/kubernetes.go4
-rw-r--r--plugin/kubernetes/kubernetes_test.go2
-rw-r--r--plugin/kubernetes/metrics_test.go15
-rw-r--r--plugin/kubernetes/ns_test.go3
-rw-r--r--plugin/kubernetes/reverse_test.go2
-rw-r--r--plugin/kubernetes/setup.go3
-rw-r--r--plugin/kubernetes/watch.go18
11 files changed, 65 insertions, 50 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go
index 98f82341b..2964f80bb 100644
--- a/plugin/kubernetes/controller.go
+++ b/plugin/kubernetes/controller.go
@@ -1,6 +1,7 @@
package kubernetes
import (
+ "context"
"errors"
"fmt"
"sync"
@@ -34,7 +35,7 @@ type dnsController interface {
EpIndex(string) []*object.Endpoints
EpIndexReverse(string) []*object.Endpoints
- GetNodeByName(string) (*api.Node, error)
+ GetNodeByName(context.Context, string) (*api.Node, error)
GetNamespaceByName(string) (*api.Namespace, error)
Run()
@@ -94,7 +95,7 @@ type dnsControlOpts struct {
}
// newDNSController creates a controller for CoreDNS.
-func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl {
+func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl {
dns := dnsControl{
client: kubeClient,
selector: opts.selector,
@@ -106,8 +107,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
dns.svcLister, dns.svcController = object.NewIndexerInformer(
&cache.ListWatch{
- ListFunc: serviceListFunc(dns.client, api.NamespaceAll, dns.selector),
- WatchFunc: serviceWatchFunc(dns.client, api.NamespaceAll, dns.selector),
+ ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Service{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
@@ -118,8 +119,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
if opts.initPodCache {
dns.podLister, dns.podController = object.NewIndexerInformer(
&cache.ListWatch{
- ListFunc: podListFunc(dns.client, api.NamespaceAll, dns.selector),
- WatchFunc: podWatchFunc(dns.client, api.NamespaceAll, dns.selector),
+ ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Pod{},
cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete},
@@ -131,8 +132,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
if opts.initEndpointsCache {
dns.epLister, dns.epController = object.NewIndexerInformer(
&cache.ListWatch{
- ListFunc: endpointsListFunc(dns.client, api.NamespaceAll, dns.selector),
- WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector),
+ ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
+ WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector),
},
&api.Endpoints{},
cache.ResourceEventHandlerFuncs{},
@@ -183,8 +184,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns
dns.nsLister, dns.nsController = cache.NewInformer(
&cache.ListWatch{
- ListFunc: namespaceListFunc(dns.client, dns.namespaceSelector),
- WatchFunc: namespaceWatchFunc(dns.client, dns.namespaceSelector),
+ ListFunc: namespaceListFunc(ctx, dns.client, dns.namespaceSelector),
+ WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector),
},
&api.Namespace{},
defaultResyncPeriod,
@@ -237,17 +238,17 @@ func epIPIndexFunc(obj interface{}) ([]string, error) {
return ep.IndexIP, nil
}
-func serviceListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
+func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Services(ns).List(opts)
+ listV1, err := c.CoreV1().Services(ns).List(ctx, opts)
return listV1, err
}
}
-func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
+func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
@@ -256,27 +257,27 @@ func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta
opts.FieldSelector = opts.FieldSelector + ","
}
opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
- listV1, err := c.CoreV1().Pods(ns).List(opts)
+ listV1, err := c.CoreV1().Pods(ns).List(ctx, opts)
return listV1, err
}
}
-func endpointsListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
+func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Endpoints(ns).List(opts)
+ listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts)
return listV1, err
}
}
-func namespaceListFunc(c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
+func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) {
return func(opts meta.ListOptions) (runtime.Object, error) {
if s != nil {
opts.LabelSelector = s.String()
}
- listV1, err := c.CoreV1().Namespaces().List(opts)
+ listV1, err := c.CoreV1().Namespaces().List(ctx, opts)
return listV1, err
}
}
@@ -428,8 +429,8 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) {
// GetNodeByName return the node by name. If nothing is found an error is
// returned. This query causes a roundtrip to the k8s API server, so use
// sparingly. Currently this is only used for Federation.
-func (dns *dnsControl) GetNodeByName(name string) (*api.Node, error) {
- v1node, err := dns.client.CoreV1().Nodes().Get(name, meta.GetOptions{})
+func (dns *dnsControl) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
+ v1node, err := dns.client.CoreV1().Nodes().Get(ctx, name, meta.GetOptions{})
return v1node, err
}
diff --git a/plugin/kubernetes/controller_test.go b/plugin/kubernetes/controller_test.go
index f945684cb..a6a94e06b 100644
--- a/plugin/kubernetes/controller_test.go
+++ b/plugin/kubernetes/controller_test.go
@@ -29,7 +29,8 @@ func BenchmarkController(b *testing.B) {
dco := dnsControlOpts{
zones: []string{"cluster.local."},
}
- controller := newdnsController(client, dco)
+ ctx := context.Background()
+ controller := newdnsController(ctx, client, dco)
cidr := "10.0.0.0/19"
// Add resources
@@ -39,7 +40,6 @@ func BenchmarkController(b *testing.B) {
m.SetQuestion("svc1.testns.svc.cluster.local.", dns.TypeA)
k := New([]string{"cluster.local."})
k.APIConn = controller
- ctx := context.Background()
rw := &test.ResponseWriter{}
b.ResetTimer()
@@ -70,6 +70,7 @@ func generateEndpoints(cidr string, client kubernetes.Interface) {
Namespace: "testns",
},
}
+ ctx := context.TODO()
for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) {
ep.Subsets[0].Addresses = []api.EndpointAddress{
{
@@ -78,7 +79,7 @@ func generateEndpoints(cidr string, client kubernetes.Interface) {
},
}
ep.ObjectMeta.Name = "svc" + strconv.Itoa(count)
- client.CoreV1().Endpoints("testns").Create(ep)
+ client.CoreV1().Endpoints("testns").Create(ctx, ep, meta.CreateOptions{})
count++
}
}
@@ -121,7 +122,8 @@ func generateSvcs(cidr string, svcType string, client kubernetes.Interface) {
}
func createClusterIPSvc(suffix int, client kubernetes.Interface, ip net.IP) {
- client.CoreV1().Services("testns").Create(&api.Service{
+ ctx := context.TODO()
+ client.CoreV1().Services("testns").Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{
Name: "svc" + strconv.Itoa(suffix),
Namespace: "testns",
@@ -134,11 +136,12 @@ func createClusterIPSvc(suffix int, client kubernetes.Interface, ip net.IP) {
Port: 80,
}},
},
- })
+ }, meta.CreateOptions{})
}
func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) {
- client.CoreV1().Services("testns").Create(&api.Service{
+ ctx := context.TODO()
+ client.CoreV1().Services("testns").Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{
Name: "hdls" + strconv.Itoa(suffix),
Namespace: "testns",
@@ -146,11 +149,12 @@ func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) {
Spec: api.ServiceSpec{
ClusterIP: api.ClusterIPNone,
},
- })
+ }, meta.CreateOptions{})
}
func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) {
- client.CoreV1().Services("testns").Create(&api.Service{
+ ctx := context.TODO()
+ client.CoreV1().Services("testns").Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{
Name: "external" + strconv.Itoa(suffix),
Namespace: "testns",
@@ -164,5 +168,5 @@ func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) {
}},
Type: api.ServiceTypeExternalName,
},
- })
+ }, meta.CreateOptions{})
}
diff --git a/plugin/kubernetes/external_test.go b/plugin/kubernetes/external_test.go
index 7ccbb2798..d31b49066 100644
--- a/plugin/kubernetes/external_test.go
+++ b/plugin/kubernetes/external_test.go
@@ -1,6 +1,7 @@
package kubernetes
import (
+ "context"
"testing"
"github.com/coredns/coredns/plugin/etcd/msg"
@@ -86,7 +87,7 @@ func (external) SvcIndexReverse(string) []*object.Service { return nil }
func (external) Modified() int64 { return 0 }
func (external) EpIndex(s string) []*object.Endpoints { return nil }
func (external) EndpointsList() []*object.Endpoints { return nil }
-func (external) GetNodeByName(name string) (*api.Node, error) { return nil, nil }
+func (external) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return nil, nil }
func (external) SvcIndex(s string) []*object.Service { return svcIndexExternal[s] }
func (external) PodIndex(string) []*object.Pod { return nil }
diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go
index 9579f8421..302136837 100644
--- a/plugin/kubernetes/handler_test.go
+++ b/plugin/kubernetes/handler_test.go
@@ -704,7 +704,7 @@ func (APIConnServeTest) EndpointsList() []*object.Endpoints {
return eps
}
-func (APIConnServeTest) GetNodeByName(name string) (*api.Node, error) {
+func (APIConnServeTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
return &api.Node{
ObjectMeta: meta.ObjectMeta{
Name: "test.node.foo.bar",
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go
index 47b2194a0..7805bff3d 100644
--- a/plugin/kubernetes/kubernetes.go
+++ b/plugin/kubernetes/kubernetes.go
@@ -212,7 +212,7 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) {
}
// InitKubeCache initializes a new Kubernetes cache.
-func (k *Kubernetes) InitKubeCache() (err error) {
+func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) {
config, err := k.getClientConfig()
if err != nil {
return err
@@ -245,7 +245,7 @@ func (k *Kubernetes) InitKubeCache() (err error) {
k.opts.zones = k.Zones
k.opts.endpointNameMode = k.endpointNameMode
- k.APIConn = newdnsController(kubeClient, k.opts)
+ k.APIConn = newdnsController(ctx, kubeClient, k.opts)
return err
}
diff --git a/plugin/kubernetes/kubernetes_test.go b/plugin/kubernetes/kubernetes_test.go
index c0cf25355..eddb7645d 100644
--- a/plugin/kubernetes/kubernetes_test.go
+++ b/plugin/kubernetes/kubernetes_test.go
@@ -238,7 +238,7 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints {
return eps
}
-func (APIConnServiceTest) GetNodeByName(name string) (*api.Node, error) {
+func (APIConnServiceTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
return &api.Node{
ObjectMeta: meta.ObjectMeta{
Name: "test.node.foo.bar",
diff --git a/plugin/kubernetes/metrics_test.go b/plugin/kubernetes/metrics_test.go
index 96039c62f..e1be40d4e 100644
--- a/plugin/kubernetes/metrics_test.go
+++ b/plugin/kubernetes/metrics_test.go
@@ -1,6 +1,7 @@
package kubernetes
import (
+ "context"
"strings"
"testing"
"time"
@@ -22,7 +23,8 @@ const (
func TestDnsProgrammingLatency(t *testing.T) {
client := fake.NewSimpleClientset()
now := time.Now()
- controller := newdnsController(client, dnsControlOpts{
+ ctx := context.TODO()
+ controller := newdnsController(ctx, client, dnsControlOpts{
initEndpointsCache: true,
// This is needed as otherwise the fake k8s client doesn't work properly.
skipAPIObjectsCleanup: true,
@@ -104,24 +106,27 @@ func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []ap
}
func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
- _, err := client.CoreV1().Endpoints(namespace).Create(buildEndpoints(name, triggerTime, subsets))
+ ctx := context.TODO()
+ _, err := client.CoreV1().Endpoints(namespace).Create(ctx, buildEndpoints(name, triggerTime, subsets), meta.CreateOptions{})
if err != nil {
t.Fatal(err)
}
}
func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) {
- _, err := client.CoreV1().Endpoints(namespace).Update(buildEndpoints(name, triggerTime, subsets))
+ ctx := context.TODO()
+ _, err := client.CoreV1().Endpoints(namespace).Update(ctx, buildEndpoints(name, triggerTime, subsets), meta.UpdateOptions{})
if err != nil {
t.Fatal(err)
}
}
func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) {
- if _, err := client.CoreV1().Services(namespace).Create(&api.Service{
+ ctx := context.TODO()
+ if _, err := client.CoreV1().Services(namespace).Create(ctx, &api.Service{
ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name},
Spec: api.ServiceSpec{ClusterIP: clusterIp},
- }); err != nil {
+ }, meta.CreateOptions{}); err != nil {
t.Fatal(err)
}
if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
diff --git a/plugin/kubernetes/ns_test.go b/plugin/kubernetes/ns_test.go
index 19bd9b788..4e767f092 100644
--- a/plugin/kubernetes/ns_test.go
+++ b/plugin/kubernetes/ns_test.go
@@ -1,6 +1,7 @@
package kubernetes
import (
+ "context"
"net"
"testing"
@@ -102,7 +103,7 @@ func (APIConnTest) EpIndexReverse(ip string) []*object.Endpoints {
return eps
}
-func (APIConnTest) GetNodeByName(name string) (*api.Node, error) { return &api.Node{}, nil }
+func (APIConnTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return &api.Node{}, nil }
func (APIConnTest) GetNamespaceByName(name string) (*api.Namespace, error) {
return &api.Namespace{}, nil
}
diff --git a/plugin/kubernetes/reverse_test.go b/plugin/kubernetes/reverse_test.go
index 78bbef3a3..5869d34bc 100644
--- a/plugin/kubernetes/reverse_test.go
+++ b/plugin/kubernetes/reverse_test.go
@@ -103,7 +103,7 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints {
return nil
}
-func (APIConnReverseTest) GetNodeByName(name string) (*api.Node, error) {
+func (APIConnReverseTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) {
return &api.Node{
ObjectMeta: meta.ObjectMeta{
Name: "test.node.foo.bar",
diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go
index 2c539bd6c..0edde3bdb 100644
--- a/plugin/kubernetes/setup.go
+++ b/plugin/kubernetes/setup.go
@@ -1,6 +1,7 @@
package kubernetes
import (
+ "context"
"errors"
"fmt"
"os"
@@ -38,7 +39,7 @@ func setup(c *caddy.Controller) error {
return plugin.Error("kubernetes", err)
}
- err = k.InitKubeCache()
+ err = k.InitKubeCache(context.Background())
if err != nil {
return plugin.Error("kubernetes", err)
}
diff --git a/plugin/kubernetes/watch.go b/plugin/kubernetes/watch.go
index fd6e68c8c..d15ed4cf9 100644
--- a/plugin/kubernetes/watch.go
+++ b/plugin/kubernetes/watch.go
@@ -1,23 +1,25 @@
package kubernetes
import (
+ "context"
+
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
)
-func serviceWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
- w, err := c.CoreV1().Services(ns).Watch(options)
+ w, err := c.CoreV1().Services(ns).Watch(ctx, options)
return w, err
}
}
-func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
@@ -26,27 +28,27 @@ func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(opt
options.FieldSelector = options.FieldSelector + ","
}
options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown"
- w, err := c.CoreV1().Pods(ns).Watch(options)
+ w, err := c.CoreV1().Pods(ns).Watch(ctx, options)
return w, err
}
}
-func endpointsWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
- w, err := c.CoreV1().Endpoints(ns).Watch(options)
+ w, err := c.CoreV1().Endpoints(ns).Watch(ctx, options)
return w, err
}
}
-func namespaceWatchFunc(c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
+func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) {
return func(options meta.ListOptions) (watch.Interface, error) {
if s != nil {
options.LabelSelector = s.String()
}
- w, err := c.CoreV1().Namespaces().Watch(options)
+ w, err := c.CoreV1().Namespaces().Watch(ctx, options)
return w, err
}
}