diff options
author | 2017-01-05 10:09:59 -0500 | |
---|---|---|
committer | 2017-01-05 15:09:59 +0000 | |
commit | 9a5e0c64fdbefc401f7476014c21700a5dbed454 (patch) | |
tree | a3c93d451482e2912a7d1d22b296ad4ddfcafa30 /middleware/kubernetes/kubernetes.go | |
parent | 39af7e40762db62386f4a7564ad52815ca829f8b (diff) | |
download | coredns-9a5e0c64fdbefc401f7476014c21700a5dbed454.tar.gz coredns-9a5e0c64fdbefc401f7476014c21700a5dbed454.tar.zst coredns-9a5e0c64fdbefc401f7476014c21700a5dbed454.zip |
handle A/PTR/SRV for headless services/endpoints (#464)
* handle A/PTR/SRV for headless services/endpoints
* error early if _proto will produce nothing
* remove wc params + various style tweaks
* Release 004
* handle A/PTR/SRV for headless services/endpoints
* error early if _proto will produce nothing
* remove wc params + various style tweaks
* optimize srv prefix validation
* poking travis
* reduce response sizes, clean func params
Diffstat (limited to 'middleware/kubernetes/kubernetes.go')
-rw-r--r-- | middleware/kubernetes/kubernetes.go | 253 |
1 files changed, 201 insertions, 52 deletions
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go index f673fe472..68086702e 100644 --- a/middleware/kubernetes/kubernetes.go +++ b/middleware/kubernetes/kubernetes.go @@ -5,7 +5,6 @@ import ( "errors" "fmt" "log" - "strconv" "strings" "time" @@ -45,11 +44,28 @@ type Kubernetes struct { Selector *labels.Selector } +type endpoint struct { + addr api.EndpointAddress + port api.EndpointPort +} + +type service struct { + name string + namespace string + addr string + ports []api.ServicePort + endpoints []endpoint +} + var errNoItems = errors.New("no items found") var errNsNotExposed = errors.New("namespace is not exposed") +var errInvalidRequest = errors.New("invalid query name") // Services implements the ServiceBackend interface. func (k *Kubernetes) Services(state request.Request, exact bool, opt middleware.Options) ([]msg.Service, []msg.Service, error) { + if state.Type() == "SRV" && !ValidSRV(state.Name()) { + return nil, nil, errInvalidRequest + } s, e := k.Records(state.Name(), exact) return s, nil, e // Haven't implemented debug queries yet. } @@ -77,7 +93,7 @@ func (k *Kubernetes) Lookup(state request.Request, name string, typ uint16) (*dn // IsNameError implements the ServiceBackend interface. func (k *Kubernetes) IsNameError(err error) bool { - return err == errNoItems || err == errNsNotExposed + return err == errNoItems || err == errNsNotExposed || err == errInvalidRequest } // Debug implements the ServiceBackend interface. @@ -171,6 +187,23 @@ func (k *Kubernetes) getZoneForName(name string) (string, []string) { return zone, serviceSegments } +// stripSRVPrefix separates out the port and protocol segments, if present +// If not present, assume all ports/protocols (e.g. wildcard) +func stripSRVPrefix(name []string) (string, string, []string) { + if name[0][0] == '_' && name[1][0] == '_' { + return name[0][1:], name[1][1:], name[2:] + } + // no srv prefix present + return "*", "*", name +} + +func stripEndpointName(name []string) (endpoint string, nameOut []string) { + if len(name) == 4 { + return strings.ToLower(name[0]), name[1:] + } + return "", name +} + // Records looks up services in kubernetes. If exact is true, it will lookup // just this name. This is used when find matches when completing SRV lookups // for instance. @@ -182,6 +215,8 @@ func (k *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { ) zone, serviceSegments := k.getZoneForName(name) + port, protocol, serviceSegments := stripSRVPrefix(serviceSegments) + endpointname, serviceSegments := stripEndpointName(serviceSegments) if len(serviceSegments) < 3 { return nil, errNoItems } @@ -205,16 +240,13 @@ func (k *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { serviceName = "*" } - nsWildcard := symbolContainsWildcard(namespace) - serviceWildcard := symbolContainsWildcard(serviceName) - // Abort if the namespace does not contain a wildcard, and namespace is not published per CoreFile // Case where namespace contains a wildcard is handled in Get(...) method. - if (!nsWildcard) && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(namespace, k.Namespaces)) { + if (!symbolContainsWildcard(namespace)) && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(namespace, k.Namespaces)) { return nil, errNsNotExposed } - k8sItems, err := k.Get(namespace, nsWildcard, serviceName, serviceWildcard, typeName) + k8sItems, err := k.Get(namespace, serviceName, endpointname, port, protocol, typeName) if err != nil { return nil, err } @@ -227,40 +259,41 @@ func (k *Kubernetes) Records(name string, exact bool) ([]msg.Service, error) { return records, nil } -func (k *Kubernetes) getRecordsForServiceItems(serviceItems []*api.Service, zone string) []msg.Service { +func endpointHostname(addr api.EndpointAddress) string { + if addr.Hostname != "" { + return strings.ToLower(addr.Hostname) + } + if strings.Contains(addr.IP, ".") { + return strings.Replace(addr.IP, ".", "-", -1) + } + if strings.Contains(addr.IP, ":") { + return strings.ToLower(strings.Replace(addr.IP, ":", "-", -1)) + } + return "" +} + +func (k *Kubernetes) getRecordsForServiceItems(serviceItems []service, zone string) []msg.Service { var records []msg.Service - for _, item := range serviceItems { - - key := k.NameTemplate.RecordNameFromNameValues(nametemplate.NameValues{TypeName: "svc", ServiceName: item.ObjectMeta.Name, Namespace: item.ObjectMeta.Namespace, Zone: zone}) - - key = strings.Replace(key, ".", "/", -1) - - clusterIP := item.Spec.ClusterIP - if clusterIP == api.ClusterIPNone { - // This is a headless service, create records for each pod - epList, _ := k.APIConn.epLister.List() - for _, ep := range epList.Items { - if ep.ObjectMeta.Name == item.ObjectMeta.Name && ep.ObjectMeta.Namespace == item.ObjectMeta.Namespace { - for _, eps := range ep.Subsets { - for i, port := range eps.Ports { - for j, addr := range eps.Addresses { - refid := strconv.Itoa(j*1024 + i) - s := msg.Service{ - Key: msg.Path(strings.ToLower(refid+"._"+port.Name+"._"+string(port.Protocol)+"."+key), "coredns"), - Host: addr.IP, Port: int(port.Port), - } - records = append(records, s) - } - } - } + for _, svc := range serviceItems { + + key := svc.name + "." + svc.namespace + ".svc." + zone + + if svc.addr == api.ClusterIPNone { + // This is a headless service, create records for each endpoint + for _, ep := range svc.endpoints { + ephostname := endpointHostname(ep.addr) + s := msg.Service{ + Key: msg.Path(strings.ToLower(ephostname+"."+key), "coredns"), + Host: ep.addr.IP, Port: int(ep.port.Port), } + records = append(records, s) + } } else { // Create records for each exposed port... - - for _, p := range item.Spec.Ports { - s := msg.Service{Key: msg.Path(strings.ToLower("_"+p.Name+"._"+string(p.Protocol)+"."+key), "coredns"), Host: clusterIP, Port: int(p.Port)} + for _, p := range svc.ports { + s := msg.Service{Key: msg.Path(strings.ToLower(key), "coredns"), Host: svc.addr, Port: int(p.Port)} records = append(records, s) } } @@ -270,36 +303,75 @@ func (k *Kubernetes) getRecordsForServiceItems(serviceItems []*api.Service, zone } // Get performs the call to the Kubernetes http API. -func (k *Kubernetes) Get(namespace string, nsWildcard bool, servicename string, serviceWildcard bool, typeName string) ([]*api.Service, error) { +func (k *Kubernetes) Get(namespace, servicename, endpointname, port, protocol, typeName string) (services []service, err error) { switch { case typeName == "pod": - return nil, fmt.Errorf("pod not implemented") + return nil, fmt.Errorf("%v not implemented", typeName) default: - return k.getServices(namespace, nsWildcard, servicename, serviceWildcard) + return k.getServices(namespace, servicename, endpointname, port, protocol) } } -func (k *Kubernetes) getServices(namespace string, nsWildcard bool, servicename string, serviceWildcard bool) ([]*api.Service, error) { +func (k *Kubernetes) getServices(namespace, servicename, endpointname, port, protocol string) ([]service, error) { serviceList := k.APIConn.ServiceList() - var resultItems []*api.Service + var resultItems []service - for _, item := range serviceList { - if symbolMatches(namespace, item.Namespace, nsWildcard) && symbolMatches(servicename, item.Name, serviceWildcard) { - // If namespace has a wildcard, filter results against Corefile namespace list. - // (Namespaces without a wildcard were filtered before the call to this function.) - if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(item.Namespace, k.Namespaces)) { + nsWildcard := symbolContainsWildcard(namespace) + serviceWildcard := symbolContainsWildcard(servicename) + portWildcard := symbolContainsWildcard(port) + protocolWildcard := symbolContainsWildcard(protocol) + + for _, svc := range serviceList { + if !(symbolMatches(namespace, svc.Namespace, nsWildcard) && symbolMatches(servicename, svc.Name, serviceWildcard)) { + continue + } + // If namespace has a wildcard, filter results against Corefile namespace list. + // (Namespaces without a wildcard were filtered before the call to this function.) + if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(svc.Namespace, k.Namespaces)) { + continue + } + s := service{name: svc.Name, namespace: svc.Namespace, addr: svc.Spec.ClusterIP} + if s.addr != api.ClusterIPNone { + for _, p := range svc.Spec.Ports { + if !(symbolMatches(port, strings.ToLower(p.Name), portWildcard) && symbolMatches(protocol, strings.ToLower(string(p.Protocol)), protocolWildcard)) { + continue + } + s.ports = append(s.ports, p) + } + resultItems = append(resultItems, s) + continue + } + // Headless service + endpointsList, err := k.APIConn.epLister.List() + if err != nil { + continue + } + for _, ep := range endpointsList.Items { + if ep.ObjectMeta.Name != svc.Name || ep.ObjectMeta.Namespace != svc.Namespace { continue } - - resultItems = append(resultItems, item) + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + for _, p := range eps.Ports { + ephostname := endpointHostname(addr) + if endpointname != "" && endpointname != ephostname { + continue + } + if !(symbolMatches(port, strings.ToLower(p.Name), portWildcard) && symbolMatches(protocol, strings.ToLower(string(p.Protocol)), protocolWildcard)) { + continue + } + s.endpoints = append(s.endpoints, endpoint{addr: addr, port: p}) + } + } + } } + resultItems = append(resultItems, s) } - return resultItems, nil } -func symbolMatches(queryString string, candidateString string, wildcard bool) bool { +func symbolMatches(queryString, candidateString string, wildcard bool) bool { result := false switch { case !wildcard: @@ -312,18 +384,41 @@ func symbolMatches(queryString string, candidateString string, wildcard bool) bo return result } +// getServiceRecordForIP: Gets a service record with a cluster ip matching the ip argument +// If a service cluster ip does not match, it checks all endpoints func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service { + // First check services with cluster ips svcList, err := k.APIConn.svcLister.List(labels.Everything()) if err != nil { return nil } for _, service := range svcList { + if !dnsstrings.StringInSlice(service.Namespace, k.Namespaces) { + continue + } if service.Spec.ClusterIP == ip { - name := k.NameTemplate.RecordNameFromNameValues(nametemplate.NameValues{TypeName: "svc", ServiceName: service.ObjectMeta.Name, Namespace: service.ObjectMeta.Namespace, Zone: k.PrimaryZone()}) - return []msg.Service{msg.Service{Host: name}} + domain := service.Name + "." + service.Namespace + ".svc." + k.PrimaryZone() + return []msg.Service{msg.Service{Host: domain}} + } + } + // If no cluster ips match, search endpoints + epList, err := k.APIConn.epLister.List() + if err != nil { + return nil + } + for _, ep := range epList.Items { + if !dnsstrings.StringInSlice(ep.ObjectMeta.Namespace, k.Namespaces) { + continue + } + for _, eps := range ep.Subsets { + for _, addr := range eps.Addresses { + if addr.IP == ip { + domain := endpointHostname(addr) + "." + ep.ObjectMeta.Name + "." + ep.ObjectMeta.Namespace + ".svc." + k.PrimaryZone() + return []msg.Service{msg.Service{Host: domain}} + } + } } } - return nil } @@ -331,3 +426,57 @@ func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service { func symbolContainsWildcard(symbol string) bool { return (strings.Contains(symbol, "*") || (symbol == "any")) } + +// ValidSRV parses a server record validating _port._proto. prefix labels. +// The valid schema is: +// * Fist two segments must start with an "_", +// * Second segment must be one of _tcp|_udp|_*|_any +func ValidSRV(name string) bool { + + // Does it start with a "_" ? + if len(name) > 0 && name[0] != '_' { + return false + } + + // First label + first, end := dns.NextLabel(name, 0) + if end { + return false + } + // Second label + off, end := dns.NextLabel(name, first) + if end { + return false + } + + // first:off has captured _tcp. or _udp. (if present) + second := name[first:off] + if len(second) > 0 && second[0] != '_' { + return false + } + + // A bit convoluted to avoid strings.ToLower + if len(second) == 5 { + // matches _tcp + if (second[1] == 't' || second[1] == 'T') && (second[2] == 'c' || second[2] == 'C') && + (second[3] == 'p' || second[3] == 'P') { + return true + } + // matches _udp + if (second[1] == 'u' || second[1] == 'U') && (second[2] == 'd' || second[2] == 'D') && + (second[3] == 'p' || second[3] == 'P') { + return true + } + // matches _any + if (second[1] == 'a' || second[1] == 'A') && (second[2] == 'n' || second[2] == 'N') && + (second[3] == 'y' || second[3] == 'Y') { + return true + } + } + // matches _* + if len(second) == 3 && second[1] == '*' { + return true + } + + return false +} |