diff options
Diffstat (limited to 'middleware/kubernetes/kubernetes.go')
-rw-r--r-- | middleware/kubernetes/kubernetes.go | 77 |
1 files changed, 66 insertions, 11 deletions
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go index 921014af6..c22afb5ba 100644 --- a/middleware/kubernetes/kubernetes.go +++ b/middleware/kubernetes/kubernetes.go @@ -39,6 +39,7 @@ type Kubernetes struct { APIConn dnsController ResyncPeriod time.Duration Namespaces []string + Federations []Federation LabelSelector *unversionedapi.LabelSelector Selector *labels.Selector PodMode string @@ -78,9 +79,18 @@ type pod struct { } type recordRequest struct { - port, protocol, endpoint, service, namespace, typeName, zone string + port string + protocol string + endpoint string + service string + namespace string + typeName string + zone string + federation string } +var localPodIP net.IP + var errNoItems = errors.New("no items found") var errNsNotExposed = errors.New("namespace is not exposed") var errInvalidRequest = errors.New("invalid query name") @@ -236,16 +246,20 @@ func (k *Kubernetes) InitKubeCache() (err error) { log.Printf("[INFO] Kubernetes middleware configured with the label selector '%s'. Only kubernetes objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector)) } - k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, k.PodMode == PodModeVerified) + opts := dnsControlOpts{ + initPodCache: k.PodMode == PodModeVerified, + } + k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, opts) return err } func (k *Kubernetes) parseRequest(lowerCasedName string, qtype uint16) (r recordRequest, err error) { // 3 Possible cases - // SRV Request: _port._protocol.service.namespace.type.zone - // A Request (endpoint): endpoint.service.namespace.type.zone - // A Request (service): service.namespace.type.zone + // SRV Request: _port._protocol.service.namespace.[federation.]type.zone + // A Request (endpoint): endpoint.service.namespace.[federation.]type.zone + // A Request (service): service.namespace.[federation.]type.zone + // separate zone from rest of lowerCasedName var segs []string for _, z := range k.Zones { @@ -261,6 +275,8 @@ func (k *Kubernetes) parseRequest(lowerCasedName string, qtype uint16) (r record return r, errZoneNotFound } + r.federation, segs = k.stripFederation(segs) + if qtype == dns.TypeNS { return r, nil } @@ -339,11 +355,17 @@ func (k *Kubernetes) Records(r recordRequest) ([]msg.Service, error) { return nil, err } if len(services) == 0 && len(pods) == 0 { - // Did not find item in k8s + // Did not find item in k8s, try federated + if r.federation != "" { + fedCNAME := k.federationCNAMERecord(r) + if fedCNAME.Key != "" { + return []msg.Service{fedCNAME}, nil + } + } return nil, errNoItems } - records := k.getRecordsForK8sItems(services, pods, r.zone) + records := k.getRecordsForK8sItems(services, pods, r) return records, nil } @@ -360,27 +382,37 @@ func endpointHostname(addr api.EndpointAddress) string { return "" } -func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, zone string) (records []msg.Service) { - zonePath := msg.Path(zone, "coredns") +func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, r recordRequest) (records []msg.Service) { + zonePath := msg.Path(r.zone, "coredns") for _, svc := range services { if svc.addr == api.ClusterIPNone { // This is a headless service, create records for each endpoint for _, ep := range svc.endpoints { s := msg.Service{ - Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name, endpointHostname(ep.addr)}, "/"), Host: ep.addr.IP, Port: int(ep.port.Port), } + if r.federation != "" { + s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name, endpointHostname(ep.addr)}, "/") + } else { + s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name, endpointHostname(ep.addr)}, "/") + } records = append(records, s) } } else { // Create records for each exposed port... for _, p := range svc.ports { s := msg.Service{ - Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/"), Host: svc.addr, Port: int(p.Port)} + + if r.federation != "" { + s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name}, "/") + } else { + s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/") + } + records = append(records, s) } // If the addr is not an IP (i.e. an external service), add the record ... @@ -388,6 +420,11 @@ func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, zone Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/"), Host: svc.addr} if t, _ := s.HostType(); t == dns.TypeCNAME { + if r.federation != "" { + s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name}, "/") + } else { + s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/") + } records = append(records, s) } @@ -575,3 +612,21 @@ func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service { func symbolContainsWildcard(symbol string) bool { return (symbol == "*" || symbol == "any") } + +func (k *Kubernetes) localPodIP() net.IP { + if localPodIP != nil { + return localPodIP + } + addrs, _ := k.interfaceAddrs.interfaceAddrs() + + for _, addr := range addrs { + ip, _, _ := net.ParseCIDR(addr.String()) + ip = ip.To4() + if ip == nil || ip.IsLoopback() { + continue + } + localPodIP = ip + return localPodIP + } + return nil +} |