aboutsummaryrefslogtreecommitdiff
path: root/middleware/kubernetes/kubernetes.go
diff options
context:
space:
mode:
Diffstat (limited to 'middleware/kubernetes/kubernetes.go')
-rw-r--r--middleware/kubernetes/kubernetes.go173
1 files changed, 71 insertions, 102 deletions
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go
index 6288cb394..a8d33bb7e 100644
--- a/middleware/kubernetes/kubernetes.go
+++ b/middleware/kubernetes/kubernetes.go
@@ -4,33 +4,70 @@ package kubernetes
import (
"errors"
"log"
+ "strings"
"time"
"github.com/miekg/coredns/middleware"
- k8sc "github.com/miekg/coredns/middleware/kubernetes/k8sclient"
"github.com/miekg/coredns/middleware/kubernetes/msg"
"github.com/miekg/coredns/middleware/kubernetes/nametemplate"
"github.com/miekg/coredns/middleware/kubernetes/util"
"github.com/miekg/coredns/middleware/proxy"
"github.com/miekg/dns"
+ "k8s.io/kubernetes/pkg/api"
+ "k8s.io/kubernetes/pkg/client/unversioned"
+ "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
+ clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
+)
+
+const (
+ defaultResyncPeriod = 5 * time.Minute
)
type Kubernetes struct {
Next middleware.Handler
Zones []string
Proxy proxy.Proxy // Proxy for looking up names during the resolution process
- APIConn *k8sc.K8sConnector
+ APIEndpoint string
+ APIConn *dnsController
+ ResyncPeriod time.Duration
NameTemplate *nametemplate.NameTemplate
Namespaces []string
}
+func (g *Kubernetes) StartKubeCache() error {
+ // For a custom api server or running outside a k8s cluster
+ // set URL in env.KUBERNETES_MASTER
+ loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
+ overrides := &clientcmd.ConfigOverrides{}
+ if len(g.APIEndpoint) > 0 {
+ overrides.ClusterInfo = clientcmdapi.Cluster{Server: g.APIEndpoint}
+ }
+ clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides)
+ config, err := clientConfig.ClientConfig()
+ if err != nil {
+ log.Printf("[debug] error connecting to the client: %v", err)
+ return err
+ }
+ kubeClient, err := unversioned.New(config)
+
+ if err != nil {
+ log.Printf("[ERROR] Failed to create kubernetes notification controller: %v", err)
+ return err
+ }
+ g.APIConn = newdnsController(kubeClient, g.ResyncPeriod)
+
+ go g.APIConn.Run()
+
+ return err
+}
+
// getZoneForName returns the zone string that matches the name and a
// list of the DNS labels from name that are within the zone.
// For example, if "coredns.local" is a zone configured for the
// Kubernetes middleware, then getZoneForName("a.b.coredns.local")
// will return ("coredns.local", ["a", "b"]).
-func (g Kubernetes) getZoneForName(name string) (string, []string) {
+func (g *Kubernetes) getZoneForName(name string) (string, []string) {
var zone string
var serviceSegments []string
@@ -51,7 +88,14 @@ func (g Kubernetes) getZoneForName(name string) (string, []string) {
// If exact is true, it will lookup just
// this name. This is used when find matches when completing SRV lookups
// for instance.
-func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
+func (g *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
+ // TODO: refector this.
+ // Right now GetNamespaceFromSegmentArray do not supports PRE queries
+ if strings.HasSuffix(name, arpaSuffix) {
+ ip, _ := extractIP(name)
+ records := g.getServiceRecordForIP(ip, name)
+ return records, nil
+ }
var (
serviceName string
namespace string
@@ -99,6 +143,7 @@ func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
return nil, nil
}
+ log.Printf("before g.Get(namespace, nsWildcard, serviceName, serviceWildcard): %v %v %v %v", namespace, nsWildcard, serviceName, serviceWildcard)
k8sItems, err := g.Get(namespace, nsWildcard, serviceName, serviceWildcard)
log.Printf("[debug] k8s items: %v\n", k8sItems)
if err != nil {
@@ -115,7 +160,7 @@ func (g Kubernetes) Records(name string, exact bool) ([]msg.Service, error) {
}
// TODO: assemble name from parts found in k8s data based on name template rather than reusing query string
-func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, values nametemplate.NameValues) []msg.Service {
+func (g *Kubernetes) getRecordsForServiceItems(serviceItems []api.Service, values nametemplate.NameValues) []msg.Service {
var records []msg.Service
for _, item := range serviceItems {
@@ -131,7 +176,7 @@ func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, v
// Create records for each exposed port...
for _, p := range item.Spec.Ports {
log.Printf("[debug] port: %v\n", p.Port)
- s := msg.Service{Host: clusterIP, Port: p.Port}
+ s := msg.Service{Host: clusterIP, Port: int(p.Port)}
records = append(records, s)
}
}
@@ -141,22 +186,24 @@ func (g Kubernetes) getRecordsForServiceItems(serviceItems []k8sc.ServiceItem, v
}
// Get performs the call to the Kubernetes http API.
-func (g Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]k8sc.ServiceItem, error) {
- serviceList, err := g.APIConn.GetServiceList()
+func (g *Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]api.Service, error) {
+ serviceList := g.APIConn.GetServiceList()
+ /* TODO: Remove?
if err != nil {
log.Printf("[ERROR] Getting service list produced error: %v", err)
return nil, err
}
+ */
- var resultItems []k8sc.ServiceItem
+ var resultItems []api.Service
for _, item := range serviceList.Items {
- if symbolMatches(namespace, item.Metadata.Namespace, nsWildcard) && symbolMatches(servicename, item.Metadata.Name, serviceWildcard) {
+ if symbolMatches(namespace, item.Namespace, nsWildcard) && symbolMatches(servicename, item.Name, serviceWildcard) {
// If namespace has a wildcard, filter results against Corefile namespace list.
// (Namespaces without a wildcard were filtered before the call to this function.)
- if nsWildcard && (len(g.Namespaces) > 0) && (!util.StringInSlice(item.Metadata.Namespace, g.Namespaces)) {
- log.Printf("[debug] Namespace '%v' is not published by Corefile\n", item.Metadata.Namespace)
+ if nsWildcard && (len(g.Namespaces) > 0) && (!util.StringInSlice(item.Namespace, g.Namespaces)) {
+ log.Printf("[debug] Namespace '%v' is not published by Corefile\n", item.Namespace)
continue
}
resultItems = append(resultItems, item)
@@ -179,102 +226,24 @@ func symbolMatches(queryString string, candidateString string, wildcard bool) bo
return result
}
-// TODO: Remove these unused functions. One is related to Ttl calculation
-// Implement Ttl and priority calculation based on service count before
-// removing this code.
-/*
-// splitDNSName separates the name into DNS segments and reverses the segments.
-func (g Kubernetes) splitDNSName(name string) []string {
- l := dns.SplitDomainName(name)
-
- for i, j := 0, len(l)-1; i < j; i, j = i+1, j-1 {
- l[i], l[j] = l[j], l[i]
- }
-
- return l
+// kubernetesNameError checks if the error is ErrorCodeKeyNotFound from kubernetes.
+func isKubernetesNameError(err error) bool {
+ return false
}
-*/
-// skydns/local/skydns/east/staging/web
-// skydns/local/skydns/west/production/web
-//
-// skydns/local/skydns/*/*/web
-// skydns/local/skydns/*/web
-/*
-// loopNodes recursively loops through the nodes and returns all the values. The nodes' keyname
-// will be match against any wildcards when star is true.
-func (g Kubernetes) loopNodes(ns []*etcdc.Node, nameParts []string, star bool, bx map[msg.Service]bool) (sx []msg.Service, err error) {
- if bx == nil {
- bx = make(map[msg.Service]bool)
- }
-Nodes:
- for _, n := range ns {
- if n.Dir {
- nodes, err := g.loopNodes(n.Nodes, nameParts, star, bx)
- if err != nil {
- return nil, err
- }
- sx = append(sx, nodes...)
- continue
- }
- if star {
- keyParts := strings.Split(n.Key, "/")
- for i, n := range nameParts {
- if i > len(keyParts)-1 {
- // name is longer than key
- continue Nodes
- }
- if n == "*" || n == "any" {
- continue
- }
- if keyParts[i] != n {
- continue Nodes
- }
- }
- }
- serv := new(msg.Service)
- if err := json.Unmarshal([]byte(n.Value), serv); err != nil {
- return nil, err
- }
- b := msg.Service{Host: serv.Host, Port: serv.Port, Priority: serv.Priority, Weight: serv.Weight, Text: serv.Text, Key: n.Key}
- if _, ok := bx[b]; ok {
- continue
- }
- bx[b] = true
- serv.Key = n.Key
- serv.Ttl = g.Ttl(n, serv)
- if serv.Priority == 0 {
- serv.Priority = priority
- }
- sx = append(sx, *serv)
+func (g *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service {
+ svcList, err := g.APIConn.svcLister.List()
+ if err != nil {
+ return nil
}
- return sx, nil
-}
-// Ttl returns the smaller of the kubernetes TTL and the service's
-// TTL. If neither of these are set (have a zero value), a default is used.
-func (g Kubernetes) Ttl(node *etcdc.Node, serv *msg.Service) uint32 {
- kubernetesTtl := uint32(node.TTL)
-
- if kubernetesTtl == 0 && serv.Ttl == 0 {
- return ttl
- }
- if kubernetesTtl == 0 {
- return serv.Ttl
- }
- if serv.Ttl == 0 {
- return kubernetesTtl
- }
- if kubernetesTtl < serv.Ttl {
- return kubernetesTtl
+ for _, service := range svcList.Items {
+ if service.Spec.ClusterIP == ip {
+ return []msg.Service{msg.Service{Host: ip}}
+ }
}
- return serv.Ttl
-}
-*/
-// kubernetesNameError checks if the error is ErrorCodeKeyNotFound from kubernetes.
-func isKubernetesNameError(err error) bool {
- return false
+ return nil
}
const (