diff options
Diffstat (limited to 'plugin/kubernetes/kubernetes.go')
-rw-r--r-- | plugin/kubernetes/kubernetes.go | 457 |
1 files changed, 457 insertions, 0 deletions
diff --git a/plugin/kubernetes/kubernetes.go b/plugin/kubernetes/kubernetes.go new file mode 100644 index 000000000..90fcd6182 --- /dev/null +++ b/plugin/kubernetes/kubernetes.go @@ -0,0 +1,457 @@ +// Package kubernetes provides the kubernetes backend. +package kubernetes + +import ( + "errors" + "fmt" + "net" + "strings" + "sync/atomic" + "time" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/etcd/msg" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/plugin/pkg/healthcheck" + "github.com/coredns/coredns/plugin/proxy" + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" + "k8s.io/client-go/1.5/kubernetes" + "k8s.io/client-go/1.5/pkg/api" + unversionedapi "k8s.io/client-go/1.5/pkg/api/unversioned" + "k8s.io/client-go/1.5/pkg/labels" + "k8s.io/client-go/1.5/rest" + "k8s.io/client-go/1.5/tools/clientcmd" + clientcmdapi "k8s.io/client-go/1.5/tools/clientcmd/api" +) + +// Kubernetes implements a plugin that connects to a Kubernetes cluster. +type Kubernetes struct { + Next plugin.Handler + Zones []string + Proxy proxy.Proxy // Proxy for looking up names during the resolution process + APIServerList []string + APIProxy *apiProxy + APICertAuth string + APIClientCert string + APIClientKey string + APIConn dnsController + Namespaces map[string]bool + podMode string + Fallthrough bool + ttl uint32 + + primaryZoneIndex int + interfaceAddrsFunc func() net.IP + autoPathSearch []string // Local search path from /etc/resolv.conf. Needed for autopath. +} + +// New returns a intialized Kubernetes. It default interfaceAddrFunc to return 127.0.0.1. All other +// values default to their zero value, primaryZoneIndex will thus point to the first zone. +func New(zones []string) *Kubernetes { + k := new(Kubernetes) + k.Zones = zones + k.Namespaces = make(map[string]bool) + k.interfaceAddrsFunc = func() net.IP { return net.ParseIP("127.0.0.1") } + k.podMode = podModeDisabled + k.Proxy = proxy.Proxy{} + k.ttl = defaultTTL + + return k +} + +const ( + // podModeDisabled is the default value where pod requests are ignored + podModeDisabled = "disabled" + // podModeVerified is where Pod requests are answered only if they exist + podModeVerified = "verified" + // podModeInsecure is where pod requests are answered without verfying they exist + podModeInsecure = "insecure" + // DNSSchemaVersion is the schema version: https://github.com/kubernetes/dns/blob/master/docs/specification.md + DNSSchemaVersion = "1.0.1" +) + +var ( + errNoItems = errors.New("no items found") + errNsNotExposed = errors.New("namespace is not exposed") + errInvalidRequest = errors.New("invalid query name") + errAPIBadPodType = errors.New("expected type *api.Pod") + errPodsDisabled = errors.New("pod records disabled") +) + +// Services implements the ServiceBackend interface. +func (k *Kubernetes) Services(state request.Request, exact bool, opt plugin.Options) (svcs []msg.Service, err error) { + + // We're looking again at types, which we've already done in ServeDNS, but there are some types k8s just can't answer. + switch state.QType() { + + case dns.TypeTXT: + // 1 label + zone, label must be "dns-version". + t, _ := dnsutil.TrimZone(state.Name(), state.Zone) + + segs := dns.SplitDomainName(t) + if len(segs) != 1 { + return nil, fmt.Errorf("kubernetes: TXT query can only be for dns-version: %s", state.QName()) + } + if segs[0] != "dns-version" { + return nil, nil + } + svc := msg.Service{Text: DNSSchemaVersion, TTL: 28800, Key: msg.Path(state.QName(), "coredns")} + return []msg.Service{svc}, nil + + case dns.TypeNS: + // We can only get here if the qname equal the zone, see ServeDNS in handler.go. + ns := k.nsAddr() + svc := msg.Service{Host: ns.A.String(), Key: msg.Path(state.QName(), "coredns")} + return []msg.Service{svc}, nil + } + + if state.QType() == dns.TypeA && isDefaultNS(state.Name(), state.Zone) { + // If this is an A request for "ns.dns", respond with a "fake" record for coredns. + // SOA records always use this hardcoded name + ns := k.nsAddr() + svc := msg.Service{Host: ns.A.String(), Key: msg.Path(state.QName(), "coredns")} + return []msg.Service{svc}, nil + } + + s, e := k.Records(state, false) + + // SRV for external services is not yet implemented, so remove those records. + + if state.QType() != dns.TypeSRV { + return s, e + } + + internal := []msg.Service{} + for _, svc := range s { + if t, _ := svc.HostType(); t != dns.TypeCNAME { + internal = append(internal, svc) + } + } + + return internal, e +} + +// primaryZone will return the first non-reverse zone being handled by this plugin +func (k *Kubernetes) primaryZone() string { return k.Zones[k.primaryZoneIndex] } + +// Lookup implements the ServiceBackend interface. +func (k *Kubernetes) Lookup(state request.Request, name string, typ uint16) (*dns.Msg, error) { + return k.Proxy.Lookup(state, name, typ) +} + +// IsNameError implements the ServiceBackend interface. +func (k *Kubernetes) IsNameError(err error) bool { + return err == errNoItems || err == errNsNotExposed || err == errInvalidRequest +} + +func (k *Kubernetes) getClientConfig() (*rest.Config, error) { + loadingRules := &clientcmd.ClientConfigLoadingRules{} + overrides := &clientcmd.ConfigOverrides{} + clusterinfo := clientcmdapi.Cluster{} + authinfo := clientcmdapi.AuthInfo{} + + if len(k.APIServerList) == 0 { + cc, err := rest.InClusterConfig() + if err != nil { + return nil, err + } + return cc, err + } + + endpoint := k.APIServerList[0] + if len(k.APIServerList) > 1 { + // Use a random port for api proxy, will get the value later through listener.Addr() + listener, err := net.Listen("tcp", "127.0.0.1:0") + if err != nil { + return nil, fmt.Errorf("failed to create kubernetes api proxy: %v", err) + } + k.APIProxy = &apiProxy{ + listener: listener, + handler: proxyHandler{ + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 3 * time.Second, + MaxFails: 1, + Future: 10 * time.Second, + Path: "/", + Interval: 5 * time.Second, + }, + }, + } + k.APIProxy.handler.Hosts = make([]*healthcheck.UpstreamHost, len(k.APIServerList)) + for i, entry := range k.APIServerList { + + uh := &healthcheck.UpstreamHost{ + Name: strings.TrimPrefix(entry, "http://"), + + CheckDown: func(upstream *proxyHandler) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { + + down := false + + uh.CheckMu.Lock() + until := uh.OkUntil + uh.CheckMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true + } + + fails := atomic.LoadInt32(&uh.Fails) + if fails >= upstream.MaxFails && upstream.MaxFails != 0 { + down = true + } + return down + } + }(&k.APIProxy.handler), + } + + k.APIProxy.handler.Hosts[i] = uh + } + k.APIProxy.Handler = &k.APIProxy.handler + + // Find the random port used for api proxy + endpoint = fmt.Sprintf("http://%s", listener.Addr()) + } + clusterinfo.Server = endpoint + + if len(k.APICertAuth) > 0 { + clusterinfo.CertificateAuthority = k.APICertAuth + } + if len(k.APIClientCert) > 0 { + authinfo.ClientCertificate = k.APIClientCert + } + if len(k.APIClientKey) > 0 { + authinfo.ClientKey = k.APIClientKey + } + + overrides.ClusterInfo = clusterinfo + overrides.AuthInfo = authinfo + clientConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(loadingRules, overrides) + + return clientConfig.ClientConfig() +} + +// initKubeCache initializes a new Kubernetes cache. +func (k *Kubernetes) initKubeCache(opts dnsControlOpts) (err error) { + + config, err := k.getClientConfig() + if err != nil { + return err + } + + kubeClient, err := kubernetes.NewForConfig(config) + if err != nil { + return fmt.Errorf("failed to create kubernetes notification controller: %q", err) + } + + if opts.labelSelector != nil { + var selector labels.Selector + selector, err = unversionedapi.LabelSelectorAsSelector(opts.labelSelector) + if err != nil { + return fmt.Errorf("unable to create Selector for LabelSelector '%s': %q", opts.labelSelector, err) + } + opts.selector = &selector + } + + opts.initPodCache = k.podMode == podModeVerified + + k.APIConn = newdnsController(kubeClient, opts) + + return err +} + +// Records looks up services in kubernetes. +func (k *Kubernetes) Records(state request.Request, exact bool) ([]msg.Service, error) { + r, e := parseRequest(state) + if e != nil { + return nil, e + } + + if !wildcard(r.namespace) && !k.namespaceExposed(r.namespace) { + return nil, errNsNotExposed + } + + if r.podOrSvc == Pod { + pods, err := k.findPods(r, state.Zone) + return pods, err + } + + services, err := k.findServices(r, state.Zone) + return services, err +} + +func endpointHostname(addr api.EndpointAddress) string { + if addr.Hostname != "" { + return strings.ToLower(addr.Hostname) + } + if strings.Contains(addr.IP, ".") { + return strings.Replace(addr.IP, ".", "-", -1) + } + if strings.Contains(addr.IP, ":") { + return strings.ToLower(strings.Replace(addr.IP, ":", "-", -1)) + } + return "" +} + +func (k *Kubernetes) findPods(r recordRequest, zone string) (pods []msg.Service, err error) { + if k.podMode == podModeDisabled { + return nil, errPodsDisabled + } + + namespace := r.namespace + podname := r.service + zonePath := msg.Path(zone, "coredns") + ip := "" + err = errNoItems + + if strings.Count(podname, "-") == 3 && !strings.Contains(podname, "--") { + ip = strings.Replace(podname, "-", ".", -1) + } else { + ip = strings.Replace(podname, "-", ":", -1) + } + + if k.podMode == podModeInsecure { + return []msg.Service{{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip}}, nil + } + + // PodModeVerified + objList := k.APIConn.PodIndex(ip) + + for _, o := range objList { + p, ok := o.(*api.Pod) + if !ok { + return nil, errAPIBadPodType + } + // If namespace has a wildcard, filter results against Corefile namespace list. + if wildcard(namespace) && !k.namespaceExposed(p.Namespace) { + continue + } + // check for matching ip and namespace + if ip == p.Status.PodIP && match(namespace, p.Namespace) { + s := msg.Service{Key: strings.Join([]string{zonePath, Pod, namespace, podname}, "/"), Host: ip} + pods = append(pods, s) + + err = nil + } + } + return pods, err +} + +// findServices returns the services matching r from the cache. +func (k *Kubernetes) findServices(r recordRequest, zone string) (services []msg.Service, err error) { + serviceList := k.APIConn.ServiceList() + zonePath := msg.Path(zone, "coredns") + err = errNoItems // Set to errNoItems to signal really nothing found, gets reset when name is matched. + + for _, svc := range serviceList { + if !(match(r.namespace, svc.Namespace) && match(r.service, svc.Name)) { + continue + } + + // If namespace has a wildcard, filter results against Corefile namespace list. + // (Namespaces without a wildcard were filtered before the call to this function.) + if wildcard(r.namespace) && !k.namespaceExposed(svc.Namespace) { + continue + } + + // Endpoint query or headless service + if svc.Spec.ClusterIP == api.ClusterIPNone || r.endpoint != "" { + endpointsList := k.APIConn.EndpointsList() + for _, ep := range endpointsList.Items { + if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace { + continue + } + + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + + // See comments in parse.go parseRequest about the endpoint handling. + + if r.endpoint != "" { + if !match(r.endpoint, endpointHostname(addr)) { + continue + } + } + + for _, p := range eps.Ports { + if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) { + continue + } + s := msg.Service{Host: addr.IP, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name, endpointHostname(addr)}, "/") + + err = nil + + services = append(services, s) + } + } + } + } + continue + } + + // External service + if svc.Spec.ExternalName != "" { + s := msg.Service{Key: strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/"), Host: svc.Spec.ExternalName, TTL: k.ttl} + if t, _ := s.HostType(); t == dns.TypeCNAME { + s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/") + services = append(services, s) + + err = nil + + continue + } + } + + // ClusterIP service + for _, p := range svc.Spec.Ports { + if !(match(r.port, p.Name) && match(r.protocol, string(p.Protocol))) { + continue + } + + err = nil + + s := msg.Service{Host: svc.Spec.ClusterIP, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join([]string{zonePath, Svc, svc.Namespace, svc.Name}, "/") + + services = append(services, s) + } + } + return services, err +} + +// match checks if a and b are equal taking wildcards into account. +func match(a, b string) bool { + if wildcard(a) { + return true + } + if wildcard(b) { + return true + } + return strings.EqualFold(a, b) +} + +// wildcard checks whether s contains a wildcard value defined as "*" or "any". +func wildcard(s string) bool { + return s == "*" || s == "any" +} + +// namespaceExposed returns true when the namespace is exposed. +func (k *Kubernetes) namespaceExposed(namespace string) bool { + _, ok := k.Namespaces[namespace] + if len(k.Namespaces) > 0 && !ok { + return false + } + return true +} + +const ( + // Svc is the DNS schema for kubernetes services + Svc = "svc" + // Pod is the DNS schema for kubernetes pods + Pod = "pod" + // defaultTTL to apply to all answers. + defaultTTL = 5 +) |