diff options
Diffstat (limited to 'middleware/kubernetes/kubernetes.go')
-rw-r--r-- | middleware/kubernetes/kubernetes.go | 173 |
1 files changed, 71 insertions, 102 deletions
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go index 6288cb394..a8d33bb7e 100644 --- a/middleware/kubernetes/kubernetes.go +++ b/middleware/kubernetes/kubernetes.go @@ -4,33 +4,70 @@ package kubernetes import ( "errors" "log" + "strings" "time" "github.com/miekg/coredns/middleware" - k8sc "github.com/miekg/coredns/middleware/kubernetes/k8sclient" "github.com/miekg/coredns/middleware/kubernetes/msg" "github.com/miekg/coredns/middleware/kubernetes/nametemplate" "github.com/miekg/coredns/middleware/kubernetes/util" "github.com/miekg/coredns/middleware/proxy" "github.com/miekg/dns" + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" + clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" +) + +const ( + defaultResyncPeriod = 5 * time.Minute ) type Kubernetes struct { Next middleware.Handler Zones []string Proxy proxy.Proxy // Proxy for looking up names during the resolution process - APIConn *k8sc.K8sConnector + APIEndpoint string + APIConn *dnsController + ResyncPeriod time.Duration NameTemplate *nametemplate.NameTemplate Namespaces []string } +func (g *Kubernetes) StartKubeCache() error { + // For a custom api server or running outside a k8s cluster + // set URL in env.KUBERNETES_MASTER + loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() + overrides := &clientcmd.ConfigOverrides{} + if len(g.APIEndpoint) > 0 { + overrides.ClusterInfo = clientcmdapi.Cluster{Server: g.APIEndpoint} + } + clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides) + config, err := clientConfig.ClientConfig() + if err != nil { + log.Printf("[debug] error connecting to the client: %v", err) + return err + } + kubeClient, err := unversioned.New(config) + + if err != nil { + log.Printf("[ERROR] Failed to create kubernetes notification controller: %v", err) + return err + } + g.APIConn = newdnsController(kubeClient, g.ResyncPeriod) + + go g.APIConn.Run() + + return err +} + // getZoneForName returns the zone string that matches the name and a // list of the DNS labels from name that are within the zone. // For example, if "coredns.local" is a zone configured for the // Kubernetes middleware, then getZoneForName("a.b.coredns.local") // will return ("coredns.local", ["a", "b"]). -func (g Kubernetes) getZoneForName(name string) (string, []string) { +func (g *Kubernetes) getZoneForName(name string) (string, []string) { var zone string var serviceSegments []string @@ -51,7 +88,14 @@ func (g Kubernetes) getZoneForName(name string) (string, []string) { // If exact is true, it will lookup just // this name. This is used when find matches when completing SRV lookups // for instance. -func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { +func (g *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { + // TODO: refector this. + // Right now GetNamespaceFromSegmentArray do not supports PRE queries + if strings.HasSuffix(name, arpaSuffix) { + ip, _ := extractIP(name) + records := g.getServiceRecordForIP(ip, name) + return records, nil + } var ( serviceName string namespace string @@ -99,6 +143,7 @@ func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { return nil, nil } + log.Printf("before g.Get(namespace, nsWildcard, serviceName, serviceWildcard): %v %v %v %v", namespace, nsWildcard, serviceName, serviceWildcard) k8sItems, err := g.Get(namespace, nsWildcard, serviceName, serviceWildcard) log.Printf("[debug] k8s items: %v\n", k8sItems) if err != nil { @@ -115,7 +160,7 @@ func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { } // TODO: assemble name from parts found in k8s data based on name template rather than reusing query string -func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, values nametemplate.NameValues) []msg.Service { +func (g *Kubernetes) getRecordsForServiceItems(serviceItems []api.Service, values nametemplate.NameValues) []msg.Service { var records []msg.Service for _, item := range serviceItems { @@ -131,7 +176,7 @@ func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, v // Create records for each exposed port... for _, p := range item.Spec.Ports { log.Printf("[debug] port: %v\n", p.Port) - s := msg.Service{Host: clusterIP, Port: p.Port} + s := msg.Service{Host: clusterIP, Port: int(p.Port)} records = append(records, s) } } @@ -141,22 +186,24 @@ func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, v } // Get performs the call to the Kubernetes http API. -func (g Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]k8sc.ServiceItem, error) { - serviceList, err := g.APIConn.GetServiceList() +func (g *Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]api.Service, error) { + serviceList := g.APIConn.GetServiceList() + /* TODO: Remove? if err != nil { log.Printf("[ERROR] Getting service list produced error: %v", err) return nil, err } + */ - var resultItems []k8sc.ServiceItem + var resultItems []api.Service for _, item := range serviceList.Items { - if symbolMatches(namespace, item.Metadata.Namespace, nsWildcard) && symbolMatches(servicename, item.Metadata.Name, serviceWildcard) { + if symbolMatches(namespace, item.Namespace, nsWildcard) && symbolMatches(servicename, item.Name, serviceWildcard) { // If namespace has a wildcard, filter results against Corefile namespace list. // (Namespaces without a wildcard were filtered before the call to this function.) - if nsWildcard && (len(g.Namespaces) > 0) && (!util.StringInSlice(item.Metadata.Namespace, g.Namespaces)) { - log.Printf("[debug] Namespace '%v' is not published by Corefile\n", item.Metadata.Namespace) + if nsWildcard && (len(g.Namespaces) > 0) && (!util.StringInSlice(item.Namespace, g.Namespaces)) { + log.Printf("[debug] Namespace '%v' is not published by Corefile\n", item.Namespace) continue } resultItems = append(resultItems, item) @@ -179,102 +226,24 @@ func symbolMatches(queryString string, candidateString string, wildcard bool) bo return result } -// TODO: Remove these unused functions. One is related to Ttl calculation -// Implement Ttl and priority calculation based on service count before -// removing this code. -/* -// splitDNSName separates the name into DNS segments and reverses the segments. -func (g Kubernetes) splitDNSName(name string) []string { - l := dns.SplitDomainName(name) - - for i, j := 0, len(l)-1; i < j; i, j = i+1, j-1 { - l[i], l[j] = l[j], l[i] - } - - return l +// kubernetesNameError checks if the error is ErrorCodeKeyNotFound from kubernetes. +func isKubernetesNameError(err error) bool { + return false } -*/ -// skydns/local/skydns/east/staging/web -// skydns/local/skydns/west/production/web -// -// skydns/local/skydns/*/*/web -// skydns/local/skydns/*/web -/* -// loopNodes recursively loops through the nodes and returns all the values. The nodes' keyname -// will be match against any wildcards when star is true. -func (g Kubernetes) loopNodes(ns []*etcdc.Node, nameParts []string, star bool, bx map[msg.Service]bool) (sx []msg.Service, err error) { - if bx == nil { - bx = make(map[msg.Service]bool) - } -Nodes: - for _, n := range ns { - if n.Dir { - nodes, err := g.loopNodes(n.Nodes, nameParts, star, bx) - if err != nil { - return nil, err - } - sx = append(sx, nodes...) - continue - } - if star { - keyParts := strings.Split(n.Key, "/") - for i, n := range nameParts { - if i > len(keyParts)-1 { - // name is longer than key - continue Nodes - } - if n == "*" || n == "any" { - continue - } - if keyParts[i] != n { - continue Nodes - } - } - } - serv := new(msg.Service) - if err := json.Unmarshal([]byte(n.Value), serv); err != nil { - return nil, err - } - b := msg.Service{Host: serv.Host, Port: serv.Port, Priority: serv.Priority, Weight: serv.Weight, Text: serv.Text, Key: n.Key} - if _, ok := bx[b]; ok { - continue - } - bx[b] = true - serv.Key = n.Key - serv.Ttl = g.Ttl(n, serv) - if serv.Priority == 0 { - serv.Priority = priority - } - sx = append(sx, *serv) +func (g *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service { + svcList, err := g.APIConn.svcLister.List() + if err != nil { + return nil } - return sx, nil -} -// Ttl returns the smaller of the kubernetes TTL and the service's -// TTL. If neither of these are set (have a zero value), a default is used. -func (g Kubernetes) Ttl(node *etcdc.Node, serv *msg.Service) uint32 { - kubernetesTtl := uint32(node.TTL) - - if kubernetesTtl == 0 && serv.Ttl == 0 { - return ttl - } - if kubernetesTtl == 0 { - return serv.Ttl - } - if serv.Ttl == 0 { - return kubernetesTtl - } - if kubernetesTtl < serv.Ttl { - return kubernetesTtl + for _, service := range svcList.Items { + if service.Spec.ClusterIP == ip { + return []msg.Service{msg.Service{Host: ip}} + } } - return serv.Ttl -} -*/ -// kubernetesNameError checks if the error is ErrorCodeKeyNotFound from kubernetes. -func isKubernetesNameError(err error) bool { - return false + return nil } const ( |