diff options
Diffstat (limited to 'plugin/kubernetes')
-rw-r--r-- | plugin/kubernetes/controller.go | 41 | ||||
-rw-r--r-- | plugin/kubernetes/controller_test.go | 22 | ||||
-rw-r--r-- | plugin/kubernetes/external_test.go | 3 | ||||
-rw-r--r-- | plugin/kubernetes/handler_test.go | 2 | ||||
-rw-r--r-- | plugin/kubernetes/kubernetes.go | 4 | ||||
-rw-r--r-- | plugin/kubernetes/kubernetes_test.go | 2 | ||||
-rw-r--r-- | plugin/kubernetes/metrics_test.go | 15 | ||||
-rw-r--r-- | plugin/kubernetes/ns_test.go | 3 | ||||
-rw-r--r-- | plugin/kubernetes/reverse_test.go | 2 | ||||
-rw-r--r-- | plugin/kubernetes/setup.go | 3 | ||||
-rw-r--r-- | plugin/kubernetes/watch.go | 18 |
11 files changed, 65 insertions, 50 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index 98f82341b..2964f80bb 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "errors" "fmt" "sync" @@ -34,7 +35,7 @@ type dnsController interface { EpIndex(string) []*object.Endpoints EpIndexReverse(string) []*object.Endpoints - GetNodeByName(string) (*api.Node, error) + GetNodeByName(context.Context, string) (*api.Node, error) GetNamespaceByName(string) (*api.Namespace, error) Run() @@ -94,7 +95,7 @@ type dnsControlOpts struct { } // newDNSController creates a controller for CoreDNS. -func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl { +func newdnsController(ctx context.Context, kubeClient kubernetes.Interface, opts dnsControlOpts) *dnsControl { dns := dnsControl{ client: kubeClient, selector: opts.selector, @@ -106,8 +107,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns dns.svcLister, dns.svcController = object.NewIndexerInformer( &cache.ListWatch{ - ListFunc: serviceListFunc(dns.client, api.NamespaceAll, dns.selector), - WatchFunc: serviceWatchFunc(dns.client, api.NamespaceAll, dns.selector), + ListFunc: serviceListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + WatchFunc: serviceWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), }, &api.Service{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, @@ -118,8 +119,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns if opts.initPodCache { dns.podLister, dns.podController = object.NewIndexerInformer( &cache.ListWatch{ - ListFunc: podListFunc(dns.client, api.NamespaceAll, dns.selector), - WatchFunc: podWatchFunc(dns.client, api.NamespaceAll, dns.selector), + ListFunc: podListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + WatchFunc: podWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), }, &api.Pod{}, cache.ResourceEventHandlerFuncs{AddFunc: dns.Add, UpdateFunc: dns.Update, DeleteFunc: dns.Delete}, @@ -131,8 +132,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns if opts.initEndpointsCache { dns.epLister, dns.epController = object.NewIndexerInformer( &cache.ListWatch{ - ListFunc: endpointsListFunc(dns.client, api.NamespaceAll, dns.selector), - WatchFunc: endpointsWatchFunc(dns.client, api.NamespaceAll, dns.selector), + ListFunc: endpointsListFunc(ctx, dns.client, api.NamespaceAll, dns.selector), + WatchFunc: endpointsWatchFunc(ctx, dns.client, api.NamespaceAll, dns.selector), }, &api.Endpoints{}, cache.ResourceEventHandlerFuncs{}, @@ -183,8 +184,8 @@ func newdnsController(kubeClient kubernetes.Interface, opts dnsControlOpts) *dns dns.nsLister, dns.nsController = cache.NewInformer( &cache.ListWatch{ - ListFunc: namespaceListFunc(dns.client, dns.namespaceSelector), - WatchFunc: namespaceWatchFunc(dns.client, dns.namespaceSelector), + ListFunc: namespaceListFunc(ctx, dns.client, dns.namespaceSelector), + WatchFunc: namespaceWatchFunc(ctx, dns.client, dns.namespaceSelector), }, &api.Namespace{}, defaultResyncPeriod, @@ -237,17 +238,17 @@ func epIPIndexFunc(obj interface{}) ([]string, error) { return ep.IndexIP, nil } -func serviceListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { +func serviceListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Services(ns).List(opts) + listV1, err := c.CoreV1().Services(ns).List(ctx, opts) return listV1, err } } -func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { +func podListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { opts.LabelSelector = s.String() @@ -256,27 +257,27 @@ func podListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta opts.FieldSelector = opts.FieldSelector + "," } opts.FieldSelector = opts.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" - listV1, err := c.CoreV1().Pods(ns).List(opts) + listV1, err := c.CoreV1().Pods(ns).List(ctx, opts) return listV1, err } } -func endpointsListFunc(c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { +func endpointsListFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Endpoints(ns).List(opts) + listV1, err := c.CoreV1().Endpoints(ns).List(ctx, opts) return listV1, err } } -func namespaceListFunc(c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { +func namespaceListFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(meta.ListOptions) (runtime.Object, error) { return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { opts.LabelSelector = s.String() } - listV1, err := c.CoreV1().Namespaces().List(opts) + listV1, err := c.CoreV1().Namespaces().List(ctx, opts) return listV1, err } } @@ -428,8 +429,8 @@ func (dns *dnsControl) EpIndexReverse(ip string) (ep []*object.Endpoints) { // GetNodeByName return the node by name. If nothing is found an error is // returned. This query causes a roundtrip to the k8s API server, so use // sparingly. Currently this is only used for Federation. -func (dns *dnsControl) GetNodeByName(name string) (*api.Node, error) { - v1node, err := dns.client.CoreV1().Nodes().Get(name, meta.GetOptions{}) +func (dns *dnsControl) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { + v1node, err := dns.client.CoreV1().Nodes().Get(ctx, name, meta.GetOptions{}) return v1node, err } diff --git a/plugin/kubernetes/controller_test.go b/plugin/kubernetes/controller_test.go index f945684cb..a6a94e06b 100644 --- a/plugin/kubernetes/controller_test.go +++ b/plugin/kubernetes/controller_test.go @@ -29,7 +29,8 @@ func BenchmarkController(b *testing.B) { dco := dnsControlOpts{ zones: []string{"cluster.local."}, } - controller := newdnsController(client, dco) + ctx := context.Background() + controller := newdnsController(ctx, client, dco) cidr := "10.0.0.0/19" // Add resources @@ -39,7 +40,6 @@ func BenchmarkController(b *testing.B) { m.SetQuestion("svc1.testns.svc.cluster.local.", dns.TypeA) k := New([]string{"cluster.local."}) k.APIConn = controller - ctx := context.Background() rw := &test.ResponseWriter{} b.ResetTimer() @@ -70,6 +70,7 @@ func generateEndpoints(cidr string, client kubernetes.Interface) { Namespace: "testns", }, } + ctx := context.TODO() for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { ep.Subsets[0].Addresses = []api.EndpointAddress{ { @@ -78,7 +79,7 @@ func generateEndpoints(cidr string, client kubernetes.Interface) { }, } ep.ObjectMeta.Name = "svc" + strconv.Itoa(count) - client.CoreV1().Endpoints("testns").Create(ep) + client.CoreV1().Endpoints("testns").Create(ctx, ep, meta.CreateOptions{}) count++ } } @@ -121,7 +122,8 @@ func generateSvcs(cidr string, svcType string, client kubernetes.Interface) { } func createClusterIPSvc(suffix int, client kubernetes.Interface, ip net.IP) { - client.CoreV1().Services("testns").Create(&api.Service{ + ctx := context.TODO() + client.CoreV1().Services("testns").Create(ctx, &api.Service{ ObjectMeta: meta.ObjectMeta{ Name: "svc" + strconv.Itoa(suffix), Namespace: "testns", @@ -134,11 +136,12 @@ func createClusterIPSvc(suffix int, client kubernetes.Interface, ip net.IP) { Port: 80, }}, }, - }) + }, meta.CreateOptions{}) } func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) { - client.CoreV1().Services("testns").Create(&api.Service{ + ctx := context.TODO() + client.CoreV1().Services("testns").Create(ctx, &api.Service{ ObjectMeta: meta.ObjectMeta{ Name: "hdls" + strconv.Itoa(suffix), Namespace: "testns", @@ -146,11 +149,12 @@ func createHeadlessSvc(suffix int, client kubernetes.Interface, ip net.IP) { Spec: api.ServiceSpec{ ClusterIP: api.ClusterIPNone, }, - }) + }, meta.CreateOptions{}) } func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) { - client.CoreV1().Services("testns").Create(&api.Service{ + ctx := context.TODO() + client.CoreV1().Services("testns").Create(ctx, &api.Service{ ObjectMeta: meta.ObjectMeta{ Name: "external" + strconv.Itoa(suffix), Namespace: "testns", @@ -164,5 +168,5 @@ func createExternalSvc(suffix int, client kubernetes.Interface, ip net.IP) { }}, Type: api.ServiceTypeExternalName, }, - }) + }, meta.CreateOptions{}) } diff --git a/plugin/kubernetes/external_test.go b/plugin/kubernetes/external_test.go index 7ccbb2798..d31b49066 100644 --- a/plugin/kubernetes/external_test.go +++ b/plugin/kubernetes/external_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "testing" "github.com/coredns/coredns/plugin/etcd/msg" @@ -86,7 +87,7 @@ func (external) SvcIndexReverse(string) []*object.Service { return nil } func (external) Modified() int64 { return 0 } func (external) EpIndex(s string) []*object.Endpoints { return nil } func (external) EndpointsList() []*object.Endpoints { return nil } -func (external) GetNodeByName(name string) (*api.Node, error) { return nil, nil } +func (external) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return nil, nil } func (external) SvcIndex(s string) []*object.Service { return svcIndexExternal[s] } func (external) PodIndex(string) []*object.Pod { return nil } diff --git a/plugin/kubernetes/handler_test.go b/plugin/kubernetes/handler_test.go index 9579f8421..302136837 100644 --- a/plugin/kubernetes/handler_test.go +++ b/plugin/kubernetes/handler_test.go @@ -704,7 +704,7 @@ func (APIConnServeTest) EndpointsList() []*object.Endpoints { return eps } -func (APIConnServeTest) GetNodeByName(name string) (*api.Node, error) { +func (APIConnServeTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return &api.Node{ ObjectMeta: meta.ObjectMeta{ Name: "test.node.foo.bar", diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go index 47b2194a0..7805bff3d 100644 --- a/plugin/kubernetes/kubernetes.go +++ b/plugin/kubernetes/kubernetes.go @@ -212,7 +212,7 @@ func (k *Kubernetes) getClientConfig() (*rest.Config, error) { } // InitKubeCache initializes a new Kubernetes cache. -func (k *Kubernetes) InitKubeCache() (err error) { +func (k *Kubernetes) InitKubeCache(ctx context.Context) (err error) { config, err := k.getClientConfig() if err != nil { return err @@ -245,7 +245,7 @@ func (k *Kubernetes) InitKubeCache() (err error) { k.opts.zones = k.Zones k.opts.endpointNameMode = k.endpointNameMode - k.APIConn = newdnsController(kubeClient, k.opts) + k.APIConn = newdnsController(ctx, kubeClient, k.opts) return err } diff --git a/plugin/kubernetes/kubernetes_test.go b/plugin/kubernetes/kubernetes_test.go index c0cf25355..eddb7645d 100644 --- a/plugin/kubernetes/kubernetes_test.go +++ b/plugin/kubernetes/kubernetes_test.go @@ -238,7 +238,7 @@ func (APIConnServiceTest) EndpointsList() []*object.Endpoints { return eps } -func (APIConnServiceTest) GetNodeByName(name string) (*api.Node, error) { +func (APIConnServiceTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return &api.Node{ ObjectMeta: meta.ObjectMeta{ Name: "test.node.foo.bar", diff --git a/plugin/kubernetes/metrics_test.go b/plugin/kubernetes/metrics_test.go index 96039c62f..e1be40d4e 100644 --- a/plugin/kubernetes/metrics_test.go +++ b/plugin/kubernetes/metrics_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "strings" "testing" "time" @@ -22,7 +23,8 @@ const ( func TestDnsProgrammingLatency(t *testing.T) { client := fake.NewSimpleClientset() now := time.Now() - controller := newdnsController(client, dnsControlOpts{ + ctx := context.TODO() + controller := newdnsController(ctx, client, dnsControlOpts{ initEndpointsCache: true, // This is needed as otherwise the fake k8s client doesn't work properly. skipAPIObjectsCleanup: true, @@ -104,24 +106,27 @@ func buildEndpoints(name string, lastChangeTriggerTime interface{}, subsets []ap } func createEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { - _, err := client.CoreV1().Endpoints(namespace).Create(buildEndpoints(name, triggerTime, subsets)) + ctx := context.TODO() + _, err := client.CoreV1().Endpoints(namespace).Create(ctx, buildEndpoints(name, triggerTime, subsets), meta.CreateOptions{}) if err != nil { t.Fatal(err) } } func updateEndpoints(t *testing.T, client kubernetes.Interface, name string, triggerTime interface{}, subsets []api.EndpointSubset) { - _, err := client.CoreV1().Endpoints(namespace).Update(buildEndpoints(name, triggerTime, subsets)) + ctx := context.TODO() + _, err := client.CoreV1().Endpoints(namespace).Update(ctx, buildEndpoints(name, triggerTime, subsets), meta.UpdateOptions{}) if err != nil { t.Fatal(err) } } func createService(t *testing.T, client kubernetes.Interface, controller dnsController, name string, clusterIp string) { - if _, err := client.CoreV1().Services(namespace).Create(&api.Service{ + ctx := context.TODO() + if _, err := client.CoreV1().Services(namespace).Create(ctx, &api.Service{ ObjectMeta: meta.ObjectMeta{Namespace: namespace, Name: name}, Spec: api.ServiceSpec{ClusterIP: clusterIp}, - }); err != nil { + }, meta.CreateOptions{}); err != nil { t.Fatal(err) } if err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { diff --git a/plugin/kubernetes/ns_test.go b/plugin/kubernetes/ns_test.go index 19bd9b788..4e767f092 100644 --- a/plugin/kubernetes/ns_test.go +++ b/plugin/kubernetes/ns_test.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "net" "testing" @@ -102,7 +103,7 @@ func (APIConnTest) EpIndexReverse(ip string) []*object.Endpoints { return eps } -func (APIConnTest) GetNodeByName(name string) (*api.Node, error) { return &api.Node{}, nil } +func (APIConnTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return &api.Node{}, nil } func (APIConnTest) GetNamespaceByName(name string) (*api.Namespace, error) { return &api.Namespace{}, nil } diff --git a/plugin/kubernetes/reverse_test.go b/plugin/kubernetes/reverse_test.go index 78bbef3a3..5869d34bc 100644 --- a/plugin/kubernetes/reverse_test.go +++ b/plugin/kubernetes/reverse_test.go @@ -103,7 +103,7 @@ func (APIConnReverseTest) EpIndexReverse(ip string) []*object.Endpoints { return nil } -func (APIConnReverseTest) GetNodeByName(name string) (*api.Node, error) { +func (APIConnReverseTest) GetNodeByName(ctx context.Context, name string) (*api.Node, error) { return &api.Node{ ObjectMeta: meta.ObjectMeta{ Name: "test.node.foo.bar", diff --git a/plugin/kubernetes/setup.go b/plugin/kubernetes/setup.go index 2c539bd6c..0edde3bdb 100644 --- a/plugin/kubernetes/setup.go +++ b/plugin/kubernetes/setup.go @@ -1,6 +1,7 @@ package kubernetes import ( + "context" "errors" "fmt" "os" @@ -38,7 +39,7 @@ func setup(c *caddy.Controller) error { return plugin.Error("kubernetes", err) } - err = k.InitKubeCache() + err = k.InitKubeCache(context.Background()) if err != nil { return plugin.Error("kubernetes", err) } diff --git a/plugin/kubernetes/watch.go b/plugin/kubernetes/watch.go index fd6e68c8c..d15ed4cf9 100644 --- a/plugin/kubernetes/watch.go +++ b/plugin/kubernetes/watch.go @@ -1,23 +1,25 @@ package kubernetes import ( + "context" + meta "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" ) -func serviceWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { +func serviceWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { options.LabelSelector = s.String() } - w, err := c.CoreV1().Services(ns).Watch(options) + w, err := c.CoreV1().Services(ns).Watch(ctx, options) return w, err } } -func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { +func podWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { options.LabelSelector = s.String() @@ -26,27 +28,27 @@ func podWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(opt options.FieldSelector = options.FieldSelector + "," } options.FieldSelector = options.FieldSelector + "status.phase!=Succeeded,status.phase!=Failed,status.phase!=Unknown" - w, err := c.CoreV1().Pods(ns).Watch(options) + w, err := c.CoreV1().Pods(ns).Watch(ctx, options) return w, err } } -func endpointsWatchFunc(c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { +func endpointsWatchFunc(ctx context.Context, c kubernetes.Interface, ns string, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { options.LabelSelector = s.String() } - w, err := c.CoreV1().Endpoints(ns).Watch(options) + w, err := c.CoreV1().Endpoints(ns).Watch(ctx, options) return w, err } } -func namespaceWatchFunc(c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { +func namespaceWatchFunc(ctx context.Context, c kubernetes.Interface, s labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { options.LabelSelector = s.String() } - w, err := c.CoreV1().Namespaces().Watch(options) + w, err := c.CoreV1().Namespaces().Watch(ctx, options) return w, err } } |