diff options
author | 2017-09-29 15:58:50 -0400 | |
---|---|---|
committer | 2017-09-29 15:58:50 -0400 | |
commit | 4b3a430ff2e6a39278eebd1493936c3a8a161fa0 (patch) | |
tree | a48c18346039c9b7808ec2395eeda49f8a0e1f0c /plugin/kubernetes/controller.go | |
parent | 45b0252c1aa3c9afb1951d4185644e23805167e5 (diff) | |
download | coredns-4b3a430ff2e6a39278eebd1493936c3a8a161fa0.tar.gz coredns-4b3a430ff2e6a39278eebd1493936c3a8a161fa0.tar.zst coredns-4b3a430ff2e6a39278eebd1493936c3a8a161fa0.zip |
plugin/kubernetes: Enable protobuf, Update client api package (#1114)
* vendor
* code
Diffstat (limited to 'plugin/kubernetes/controller.go')
-rw-r--r-- | plugin/kubernetes/controller.go | 290 |
1 files changed, 82 insertions, 208 deletions
diff --git a/plugin/kubernetes/controller.go b/plugin/kubernetes/controller.go index fa2e749bb..a5b697c88 100644 --- a/plugin/kubernetes/controller.go +++ b/plugin/kubernetes/controller.go @@ -3,45 +3,31 @@ package kubernetes import ( "errors" "fmt" - "log" "sync" "time" - "k8s.io/client-go/1.5/kubernetes" - "k8s.io/client-go/1.5/pkg/api" - unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned" - "k8s.io/client-go/1.5/pkg/api/v1" - "k8s.io/client-go/1.5/pkg/labels" - "k8s.io/client-go/1.5/pkg/runtime" - "k8s.io/client-go/1.5/pkg/watch" - "k8s.io/client-go/1.5/tools/cache" + "k8s.io/client-go/kubernetes" + api "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/cache" + + meta "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" ) var ( namespace = api.NamespaceAll ) -// storeToNamespaceLister makes a Store that lists Namespaces. -type storeToNamespaceLister struct { - cache.Store -} - const podIPIndex = "PodIP" -// List lists all Namespaces in the store. -func (s *storeToNamespaceLister) List() (ns api.NamespaceList, err error) { - for _, m := range s.Store.List() { - ns.Items = append(ns.Items, *(m.(*api.Namespace))) - } - return ns, nil -} - type dnsController interface { ServiceList() []*api.Service - PodIndex(string) []interface{} - EndpointsList() api.EndpointsList + PodIndex(string) []*api.Pod + EndpointsList() []*api.Endpoints - GetNodeByName(string) (api.Node, error) + GetNodeByName(string) (*api.Node, error) Run() Stop() error @@ -52,15 +38,13 @@ type dnsControl struct { selector *labels.Selector - svcController *cache.Controller - podController *cache.Controller - nsController *cache.Controller - epController *cache.Controller + svcController cache.Controller + podController cache.Controller + epController cache.Controller - svcLister cache.StoreToServiceLister - podLister cache.StoreToPodLister - nsLister storeToNamespaceLister - epLister cache.StoreToEndpointsLister + svcLister cache.Indexer + podLister cache.Indexer + epLister cache.Store // stopLock is used to enforce only a single call to Stop is active. // Needed because we allow stopping through an http endpoint and @@ -74,7 +58,7 @@ type dnsControlOpts struct { initPodCache bool resyncPeriod time.Duration // Label handling. - labelSelector *unversionedapi.LabelSelector + labelSelector *meta.LabelSelector selector *labels.Selector } @@ -85,8 +69,7 @@ func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dn selector: opts.selector, stopCh: make(chan struct{}), } - - dns.svcLister.Indexer, dns.svcController = cache.NewIndexerInformer( + dns.svcLister, dns.svcController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: serviceListFunc(dns.client, namespace, dns.selector), WatchFunc: serviceWatchFunc(dns.client, namespace, dns.selector), @@ -97,7 +80,7 @@ func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dn cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) if opts.initPodCache { - dns.podLister.Indexer, dns.podController = cache.NewIndexerInformer( + dns.podLister, dns.podController = cache.NewIndexerInformer( &cache.ListWatch{ ListFunc: podListFunc(dns.client, namespace, dns.selector), WatchFunc: podWatchFunc(dns.client, namespace, dns.selector), @@ -107,17 +90,7 @@ func newdnsController(kubeClient *kubernetes.Clientset, opts dnsControlOpts) *dn cache.ResourceEventHandlerFuncs{}, cache.Indexers{podIPIndex: podIPIndexFunc}) } - - dns.nsLister.Store, dns.nsController = cache.NewInformer( - &cache.ListWatch{ - ListFunc: namespaceListFunc(dns.client, dns.selector), - WatchFunc: namespaceWatchFunc(dns.client, dns.selector), - }, - &api.Namespace{}, - opts.resyncPeriod, - cache.ResourceEventHandlerFuncs{}) - - dns.epLister.Store, dns.epController = cache.NewInformer( + dns.epLister, dns.epController = cache.NewInformer( &cache.ListWatch{ ListFunc: endpointsListFunc(dns.client, namespace, dns.selector), WatchFunc: endpointsWatchFunc(dns.client, namespace, dns.selector), @@ -137,182 +110,86 @@ func podIPIndexFunc(obj interface{}) ([]string, error) { return []string{p.Status.PodIP}, nil } -func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { - return func(opts api.ListOptions) (runtime.Object, error) { - if s != nil { - opts.LabelSelector = *s - } - listV1, err := c.Core().Services(ns).List(opts) - - if err != nil { - return nil, err - } - var listAPI api.ServiceList - err = v1.Convert_v1_ServiceList_To_api_ServiceList(listV1, &listAPI, nil) - if err != nil { - return nil, err - } - return &listAPI, err - } -} - -func podListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { - return func(opts api.ListOptions) (runtime.Object, error) { - if s != nil { - opts.LabelSelector = *s - } - listV1, err := c.Core().Pods(ns).List(opts) - - if err != nil { - return nil, err - } - var listAPI api.PodList - err = v1.Convert_v1_PodList_To_api_PodList(listV1, &listAPI, nil) - if err != nil { - return nil, err - } - - return &listAPI, err - } -} - -func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) { - if in.Type == watch.Error { - return in, true - } - - switch v1Obj := in.Object.(type) { - case *v1.Service: - var apiObj api.Service - err := v1.Convert_v1_Service_To_api_Service(v1Obj, &apiObj, nil) - if err != nil { - log.Printf("[ERROR] Could not convert v1.Service: %s", err) - return in, true - } - return watch.Event{Type: in.Type, Object: &apiObj}, true - case *v1.Pod: - var apiObj api.Pod - err := v1.Convert_v1_Pod_To_api_Pod(v1Obj, &apiObj, nil) - if err != nil { - log.Printf("[ERROR] Could not convert v1.Pod: %s", err) - return in, true - } - return watch.Event{Type: in.Type, Object: &apiObj}, true - case *v1.Namespace: - var apiObj api.Namespace - err := v1.Convert_v1_Namespace_To_api_Namespace(v1Obj, &apiObj, nil) - if err != nil { - log.Printf("[ERROR] Could not convert v1.Namespace: %s", err) - return in, true - } - return watch.Event{Type: in.Type, Object: &apiObj}, true - case *v1.Endpoints: - var apiObj api.Endpoints - err := v1.Convert_v1_Endpoints_To_api_Endpoints(v1Obj, &apiObj, nil) - if err != nil { - log.Printf("[ERROR] Could not convert v1.Endpoint: %s", err) - return in, true - } - return watch.Event{Type: in.Type, Object: &apiObj}, true - } - - log.Printf("[WARN] Unhandled v1 type in event: %v", in) - return in, true -} - -func serviceWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { - return func(options api.ListOptions) (watch.Interface, error) { +func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(meta.ListOptions) (runtime.Object, error) { + return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { - options.LabelSelector = *s + opts.LabelSelector = (*s).String() } - w, err := c.Core().Services(ns).Watch(options) + listV1, err := c.Services(ns).List(opts) if err != nil { return nil, err } - return watch.Filter(w, v1ToAPIFilter), nil + return listV1, err } } -func podWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { - return func(options api.ListOptions) (watch.Interface, error) { +func podListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(meta.ListOptions) (runtime.Object, error) { + return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { - options.LabelSelector = *s + opts.LabelSelector = (*s).String() } - w, err := c.Core().Pods(ns).Watch(options) - + listV1, err := c.Pods(ns).List(opts) if err != nil { return nil, err } - return watch.Filter(w, v1ToAPIFilter), nil + return listV1, err } } -func namespaceListFunc(c *kubernetes.Clientset, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { - return func(opts api.ListOptions) (runtime.Object, error) { +func serviceWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { - opts.LabelSelector = *s + options.LabelSelector = (*s).String() } - listV1, err := c.Core().Namespaces().List(opts) + w, err := c.Services(ns).Watch(options) if err != nil { return nil, err } - var listAPI api.NamespaceList - err = v1.Convert_v1_NamespaceList_To_api_NamespaceList(listV1, &listAPI, nil) - if err != nil { - return nil, err - } - return &listAPI, err + return w, nil } } -func namespaceWatchFunc(c *kubernetes.Clientset, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { - return func(options api.ListOptions) (watch.Interface, error) { +func podWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { - options.LabelSelector = *s + options.LabelSelector = (*s).String() } - w, err := c.Core().Namespaces().Watch(options) + w, err := c.Pods(ns).Watch(options) if err != nil { return nil, err } - return watch.Filter(w, v1ToAPIFilter), nil + return w, nil } } -func endpointsListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) { - return func(opts api.ListOptions) (runtime.Object, error) { +func endpointsListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(meta.ListOptions) (runtime.Object, error) { + return func(opts meta.ListOptions) (runtime.Object, error) { if s != nil { - opts.LabelSelector = *s - } - listV1, err := c.Core().Endpoints(ns).List(opts) - - if err != nil { - return nil, err + opts.LabelSelector = (*s).String() } - var listAPI api.EndpointsList - err = v1.Convert_v1_EndpointsList_To_api_EndpointsList(listV1, &listAPI, nil) + listV1, err := c.Endpoints(ns).List(opts) if err != nil { return nil, err } - return &listAPI, err + return listV1, err } } -func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) { - return func(options api.ListOptions) (watch.Interface, error) { +func endpointsWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options meta.ListOptions) (watch.Interface, error) { + return func(options meta.ListOptions) (watch.Interface, error) { if s != nil { - options.LabelSelector = *s + options.LabelSelector = (*s).String() } - w, err := c.Core().Endpoints(ns).Watch(options) + w, err := c.Endpoints(ns).Watch(options) if err != nil { return nil, err } - return watch.Filter(w, v1ToAPIFilter), nil + return w, nil } } func (dns *dnsControl) controllersInSync() bool { hs := dns.svcController.HasSynced() && - dns.nsController.HasSynced() && dns.epController.HasSynced() if dns.podController != nil { @@ -341,7 +218,6 @@ func (dns *dnsControl) Stop() error { // Run starts the controller. func (dns *dnsControl) Run() { go dns.svcController.Run(dns.stopCh) - go dns.nsController.Run(dns.stopCh) go dns.epController.Run(dns.stopCh) if dns.podController != nil { go dns.podController.Run(dns.stopCh) @@ -349,54 +225,52 @@ func (dns *dnsControl) Run() { <-dns.stopCh } -func (dns *dnsControl) NamespaceList() *api.NamespaceList { - nsList, err := dns.nsLister.List() - if err != nil { - return &api.NamespaceList{} - } - - return &nsList -} - -func (dns *dnsControl) ServiceList() []*api.Service { - svcs, err := dns.svcLister.List(labels.Everything()) - if err != nil { - return []*api.Service{} +func (dns *dnsControl) ServiceList() (svcs []*api.Service) { + os := dns.svcLister.List() + for _, o := range os { + s, ok := o.(*api.Service) + if !ok { + continue + } + svcs = append(svcs, s) } - return svcs } -func (dns *dnsControl) PodIndex(ip string) []interface{} { - if dns.podLister.Indexer == nil { +func (dns *dnsControl) PodIndex(ip string) (pods []*api.Pod) { + if dns.podLister == nil { return nil } - pods, err := dns.podLister.Indexer.ByIndex(podIPIndex, ip) + os, err := dns.podLister.ByIndex(podIPIndex, ip) if err != nil { return nil } - + for _, o := range os { + p, ok := o.(*api.Pod) + if !ok { + continue + } + pods = append(pods, p) + } return pods } -func (dns *dnsControl) EndpointsList() api.EndpointsList { - epl, err := dns.epLister.List() - if err != nil { - return api.EndpointsList{} +func (dns *dnsControl) EndpointsList() (eps []*api.Endpoints) { + os := dns.epLister.List() + for _, o := range os { + ep, ok := o.(*api.Endpoints) + if !ok { + continue + } + eps = append(eps, ep) } - - return epl + return eps } -func (dns *dnsControl) GetNodeByName(name string) (api.Node, error) { - v1node, err := dns.client.Core().Nodes().Get(name) - if err != nil { - return api.Node{}, err - } - var apinode api.Node - err = v1.Convert_v1_Node_To_api_Node(v1node, &apinode, nil) +func (dns *dnsControl) GetNodeByName(name string) (*api.Node, error) { + v1node, err := dns.client.Nodes().Get(name, meta.GetOptions{}) if err != nil { - return api.Node{}, err + return &api.Node{}, err } - return apinode, nil + return v1node, nil } |