diff options
-rw-r--r-- | middleware/kubernetes/kubernetes.go | 253 | ||||
-rw-r--r-- | test/kubernetes_test.go | 55 |
2 files changed, 231 insertions, 77 deletions
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go index f673fe472..68086702e 100644 --- a/middleware/kubernetes/kubernetes.go +++ b/middleware/kubernetes/kubernetes.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "log" - "strconv" "strings" "time" @@ -45,11 +44,28 @@ type Kubernetes struct { Selector *labels.Selector } +type endpoint struct { + addr api.EndpointAddress + port api.EndpointPort +} + +type service struct { + name string + namespace string + addr string + ports []api.ServicePort + endpoints []endpoint +} + var errNoItems = errors.New("no items found") var errNsNotExposed = errors.New("namespace is not exposed") +var errInvalidRequest = errors.New("invalid query name") // Services implements the ServiceBackend interface. func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware.Options) ([]msg.Service, []msg.Service, error) { + if state.Type() == "SRV" && !ValidSRV(state.Name()) { + return nil, nil, errInvalidRequest + } s, e := k.Records(state.Name(), exact) return s, nil, e // Haven't implemented debug queries yet. } @@ -77,7 +93,7 @@ func (k *Kubernetes) Lookup(state request.Request, name string, typ uint16) (*dn // IsNameError implements the ServiceBackend interface. func (k *Kubernetes) IsNameError(err error) bool { - return err == errNoItems || err == errNsNotExposed + return err == errNoItems || err == errNsNotExposed || err == errInvalidRequest } // Debug implements the ServiceBackend interface. @@ -171,6 +187,23 @@ func (k *Kubernetes) getZoneForName(name string) (string, []string) { return zone, serviceSegments } +// stripSRVPrefix separates out the port and protocol segments, if present +// If not present, assume all ports/protocols (e.g. wildcard) +func stripSRVPrefix(name []string) (string, string, []string) { + if name[0][0] == '_' && name[1][0] == '_' { + return name[0][1:], name[1][1:], name[2:] + } + // no srv prefix present + return "*", "*", name +} + +func stripEndpointName(name []string) (endpoint string, nameOut []string) { + if len(name) == 4 { + return strings.ToLower(name[0]), name[1:] + } + return "", name +} + // Records looks up services in kubernetes. If exact is true, it will lookup // just this name. This is used when find matches when completing SRV lookups // for instance. @@ -182,6 +215,8 @@ func (k *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { ) zone, serviceSegments := k.getZoneForName(name) + port, protocol, serviceSegments := stripSRVPrefix(serviceSegments) + endpointname, serviceSegments := stripEndpointName(serviceSegments) if len(serviceSegments) < 3 { return nil, errNoItems } @@ -205,16 +240,13 @@ func (k *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { serviceName = "*" } - nsWildcard := symbolContainsWildcard(namespace) - serviceWildcard := symbolContainsWildcard(serviceName) - // Abort if the namespace does not contain a wildcard, and namespace is not published per CoreFile // Case where namespace contains a wildcard is handled in Get(...) method. - if (!nsWildcard) && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(namespace, k.Namespaces)) { + if (!symbolContainsWildcard(namespace)) && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(namespace, k.Namespaces)) { return nil, errNsNotExposed } - k8sItems, err := k.Get(namespace, nsWildcard, serviceName, serviceWildcard, typeName) + k8sItems, err := k.Get(namespace, serviceName, endpointname, port, protocol, typeName) if err != nil { return nil, err } @@ -227,40 +259,41 @@ func (k *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { return records, nil } -func (k *Kubernetes) getRecordsForServiceItems(serviceItems []*api.Service, zone string) []msg.Service { +func endpointHostname(addr api.EndpointAddress) string { + if addr.Hostname != "" { + return strings.ToLower(addr.Hostname) + } + if strings.Contains(addr.IP, ".") { + return strings.Replace(addr.IP, ".", "-", -1) + } + if strings.Contains(addr.IP, ":") { + return strings.ToLower(strings.Replace(addr.IP, ":", "-", -1)) + } + return "" +} + +func (k *Kubernetes) getRecordsForServiceItems(serviceItems []service, zone string) []msg.Service { var records []msg.Service - for _, item := range serviceItems { - - key := k.NameTemplate.RecordNameFromNameValues(nametemplate.NameValues{TypeName: "svc", ServiceName: item.ObjectMeta.Name, Namespace: item.ObjectMeta.Namespace, Zone: zone}) - - key = strings.Replace(key, ".", "/", -1) - - clusterIP := item.Spec.ClusterIP - if clusterIP == api.ClusterIPNone { - // This is a headless service, create records for each pod - epList, _ := k.APIConn.epLister.List() - for _, ep := range epList.Items { - if ep.ObjectMeta.Name == item.ObjectMeta.Name && ep.ObjectMeta.Namespace == item.ObjectMeta.Namespace { - for _, eps := range ep.Subsets { - for i, port := range eps.Ports { - for j, addr := range eps.Addresses { - refid := strconv.Itoa(j*1024 + i) - s := msg.Service{ - Key: msg.Path(strings.ToLower(refid+"._"+port.Name+"._"+string(port.Protocol)+"."+key), "coredns"), - Host: addr.IP, Port: int(port.Port), - } - records = append(records, s) - } - } - } + for _, svc := range serviceItems { + + key := svc.name + "." + svc.namespace + ".svc." + zone + + if svc.addr == api.ClusterIPNone { + // This is a headless service, create records for each endpoint + for _, ep := range svc.endpoints { + ephostname := endpointHostname(ep.addr) + s := msg.Service{ + Key: msg.Path(strings.ToLower(ephostname+"."+key), "coredns"), + Host: ep.addr.IP, Port: int(ep.port.Port), } + records = append(records, s) + } } else { // Create records for each exposed port... - - for _, p := range item.Spec.Ports { - s := msg.Service{Key: msg.Path(strings.ToLower("_"+p.Name+"._"+string(p.Protocol)+"."+key), "coredns"), Host: clusterIP, Port: int(p.Port)} + for _, p := range svc.ports { + s := msg.Service{Key: msg.Path(strings.ToLower(key), "coredns"), Host: svc.addr, Port: int(p.Port)} records = append(records, s) } } @@ -270,36 +303,75 @@ func (k *Kubernetes) getRecordsForServiceItems(serviceItems []*api.Service, zone } // Get performs the call to the Kubernetes http API. -func (k *Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool, typeName string) ([]*api.Service, error) { +func (k *Kubernetes) Get(namespace, servicename, endpointname, port, protocol, typeName string) (services []service, err error) { switch { case typeName == "pod": - return nil, fmt.Errorf("pod not implemented") + return nil, fmt.Errorf("%v not implemented", typeName) default: - return k.getServices(namespace, nsWildcard, servicename, serviceWildcard) + return k.getServices(namespace, servicename, endpointname, port, protocol) } } -func (k *Kubernetes) getServices(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]*api.Service, error) { +func (k *Kubernetes) getServices(namespace, servicename, endpointname, port, protocol string) ([]service, error) { serviceList := k.APIConn.ServiceList() - var resultItems []*api.Service + var resultItems []service - for _, item := range serviceList { - if symbolMatches(namespace, item.Namespace, nsWildcard) && symbolMatches(servicename, item.Name, serviceWildcard) { - // If namespace has a wildcard, filter results against Corefile namespace list. - // (Namespaces without a wildcard were filtered before the call to this function.) - if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(item.Namespace, k.Namespaces)) { + nsWildcard := symbolContainsWildcard(namespace) + serviceWildcard := symbolContainsWildcard(servicename) + portWildcard := symbolContainsWildcard(port) + protocolWildcard := symbolContainsWildcard(protocol) + + for _, svc := range serviceList { + if !(symbolMatches(namespace, svc.Namespace, nsWildcard) && symbolMatches(servicename, svc.Name, serviceWildcard)) { + continue + } + // If namespace has a wildcard, filter results against Corefile namespace list. + // (Namespaces without a wildcard were filtered before the call to this function.) + if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(svc.Namespace, k.Namespaces)) { + continue + } + s := service{name: svc.Name, namespace: svc.Namespace, addr: svc.Spec.ClusterIP} + if s.addr != api.ClusterIPNone { + for _, p := range svc.Spec.Ports { + if !(symbolMatches(port, strings.ToLower(p.Name), portWildcard) && symbolMatches(protocol, strings.ToLower(string(p.Protocol)), protocolWildcard)) { + continue + } + s.ports = append(s.ports, p) + } + resultItems = append(resultItems, s) + continue + } + // Headless service + endpointsList, err := k.APIConn.epLister.List() + if err != nil { + continue + } + for _, ep := range endpointsList.Items { + if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace { continue } - - resultItems = append(resultItems, item) + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + for _, p := range eps.Ports { + ephostname := endpointHostname(addr) + if endpointname != "" && endpointname != ephostname { + continue + } + if !(symbolMatches(port, strings.ToLower(p.Name), portWildcard) && symbolMatches(protocol, strings.ToLower(string(p.Protocol)), protocolWildcard)) { + continue + } + s.endpoints = append(s.endpoints, endpoint{addr: addr, port: p}) + } + } + } } + resultItems = append(resultItems, s) } - return resultItems, nil } -func symbolMatches(queryString string, candidateString string, wildcard bool) bool { +func symbolMatches(queryString, candidateString string, wildcard bool) bool { result := false switch { case !wildcard: @@ -312,18 +384,41 @@ func symbolMatches(queryString string, candidateString string, wildcard bool) bo return result } +// getServiceRecordForIP: Gets a service record with a cluster ip matching the ip argument +// If a service cluster ip does not match, it checks all endpoints func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service { + // First check services with cluster ips svcList, err := k.APIConn.svcLister.List(labels.Everything()) if err != nil { return nil } for _, service := range svcList { + if !dnsstrings.StringInSlice(service.Namespace, k.Namespaces) { + continue + } if service.Spec.ClusterIP == ip { - name := k.NameTemplate.RecordNameFromNameValues(nametemplate.NameValues{TypeName: "svc", ServiceName: service.ObjectMeta.Name, Namespace: service.ObjectMeta.Namespace, Zone: k.PrimaryZone()}) - return []msg.Service{msg.Service{Host: name}} + domain := service.Name + "." + service.Namespace + ".svc." + k.PrimaryZone() + return []msg.Service{msg.Service{Host: domain}} + } + } + // If no cluster ips match, search endpoints + epList, err := k.APIConn.epLister.List() + if err != nil { + return nil + } + for _, ep := range epList.Items { + if !dnsstrings.StringInSlice(ep.ObjectMeta.Namespace, k.Namespaces) { + continue + } + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + if addr.IP == ip { + domain := endpointHostname(addr) + "." + ep.ObjectMeta.Name + "." + ep.ObjectMeta.Namespace + ".svc." + k.PrimaryZone() + return []msg.Service{msg.Service{Host: domain}} + } + } } } - return nil } @@ -331,3 +426,57 @@ func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service { func symbolContainsWildcard(symbol string) bool { return (strings.Contains(symbol, "*") || (symbol == "any")) } + +// ValidSRV parses a server record validating _port._proto. prefix labels. +// The valid schema is: +// * Fist two segments must start with an "_", +// * Second segment must be one of _tcp|_udp|_*|_any +func ValidSRV(name string) bool { + + // Does it start with a "_" ? + if len(name) > 0 && name[0] != '_' { + return false + } + + // First label + first, end := dns.NextLabel(name, 0) + if end { + return false + } + // Second label + off, end := dns.NextLabel(name, first) + if end { + return false + } + + // first:off has captured _tcp. or _udp. (if present) + second := name[first:off] + if len(second) > 0 && second[0] != '_' { + return false + } + + // A bit convoluted to avoid strings.ToLower + if len(second) == 5 { + // matches _tcp + if (second[1] == 't' || second[1] == 'T') && (second[2] == 'c' || second[2] == 'C') && + (second[3] == 'p' || second[3] == 'P') { + return true + } + // matches _udp + if (second[1] == 'u' || second[1] == 'U') && (second[2] == 'd' || second[2] == 'D') && + (second[3] == 'p' || second[3] == 'P') { + return true + } + // matches _any + if (second[1] == 'a' || second[1] == 'A') && (second[2] == 'n' || second[2] == 'N') && + (second[3] == 'y' || second[3] == 'Y') { + return true + } + } + // matches _* + if len(second) == 3 && second[1] == '*' { + return true + } + + return false +} diff --git a/test/kubernetes_test.go b/test/kubernetes_test.go index 215b8191c..1d5df37a3 100644 --- a/test/kubernetes_test.go +++ b/test/kubernetes_test.go @@ -105,7 +105,7 @@ var dnsTestCases = []test.Case{ }, //TODO: Fix below to all use test.SRV not test.A! { - Qname: "svc-1-a.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._*.svc-1-a.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeSuccess, Answer: []dns.RR{ test.SRV("_http._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-a.test-1.svc.cluster.local."), @@ -113,12 +113,12 @@ var dnsTestCases = []test.Case{ }, }, { - Qname: "bogusservice.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._*.bogusservice.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeNameError, Answer: []dns.RR{}, }, { - Qname: "svc-1-a.*.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._*.svc-1-a.*.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeSuccess, Answer: []dns.RR{ test.SRV("_http._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-a.test-1.svc.cluster.local."), @@ -126,7 +126,7 @@ var dnsTestCases = []test.Case{ }, }, { - Qname: "svc-1-a.any.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._*.svc-1-a.any.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeSuccess, Answer: []dns.RR{ test.SRV("_http._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-a.test-1.svc.cluster.local."), @@ -134,76 +134,81 @@ var dnsTestCases = []test.Case{ }, }, { - Qname: "bogusservice.*.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._*.bogusservice.*.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeNameError, Answer: []dns.RR{}, }, { - Qname: "bogusservice.any.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._*.bogusservice.any.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeNameError, Answer: []dns.RR{}, }, { - Qname: "*.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_c-port._*.*.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeSuccess, Answer: []dns.RR{ - test.SRV("_http._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-a.test-1.svc.cluster.local."), - test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."), - test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."), test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."), - test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), - test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), + test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 172-17-0-5.headless-svc.test-1.svc.cluster.local."), + test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 172-17-0-6.headless-svc.test-1.svc.cluster.local."), }, }, { - Qname: "any.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._tcp.any.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeSuccess, Answer: []dns.RR{ test.SRV("_http._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-a.test-1.svc.cluster.local."), test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."), test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."), - test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."), - test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), - test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), }, }, { - Qname: "any.test-2.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._*.any.test-2.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeNameError, Answer: []dns.RR{}, }, { - Qname: "*.test-2.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_*._*.*.test-2.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeNameError, Answer: []dns.RR{}, }, { - Qname: "*.*.svc.cluster.local.", Qtype: dns.TypeSRV, + Qname: "_http._tcp.*.*.svc.cluster.local.", Qtype: dns.TypeSRV, Rcode: dns.RcodeSuccess, Answer: []dns.RR{ test.SRV("_http._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-a.test-1.svc.cluster.local."), - test.SRV("_https._tcp.svc-1-a.test-1.svc.cluster.local. 303 IN SRV 10 100 443 svc-1-a.test-1.svc.cluster.local."), test.SRV("_http._tcp.svc-1-b.test-1.svc.cluster.local. 303 IN SRV 10 100 80 svc-1-b.test-1.svc.cluster.local."), - test.SRV("_c-port._udp.svc-c.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 svc-c.test-1.svc.cluster.local."), - test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), - test.SRV("_c-port._udp.headless-svc.test-1.svc.cluster.local. 303 IN SRV 10 100 1234 headless-svc.test-1.svc.cluster.local."), }, }, { + Qname: "_*.svc-1-a.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeNameError, + Answer: []dns.RR{}, + }, + { + Qname: "_*._not-udp-or-tcp.svc-1-a.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeNameError, + Answer: []dns.RR{}, + }, + { + Qname: "svc-1-a.test-1.svc.cluster.local.", Qtype: dns.TypeSRV, + Rcode: dns.RcodeNameError, + Answer: []dns.RR{}, + }, + { Qname: "123.0.0.10.in-addr.arpa.", Qtype: dns.TypePTR, Rcode: dns.RcodeSuccess, Answer: []dns.RR{}, }, { Qname: "100.0.0.10.in-addr.arpa.", Qtype: dns.TypePTR, - Rcode: dns.RcodeSuccess, + Rcode: dns.RcodeSuccess, Answer: []dns.RR{ test.PTR("100.0.0.10.in-addr.arpa. 303 IN PTR svc-1-a.test-1.svc.cluster.local."), }, }, { Qname: "115.0.0.10.in-addr.arpa.", Qtype: dns.TypePTR, - Rcode: dns.RcodeSuccess, + Rcode: dns.RcodeSuccess, Answer: []dns.RR{ test.PTR("115.0.0.10.in-addr.arpa. 303 IN PTR svc-c.test-1.svc.cluster.local."), }, |