aboutsummaryrefslogtreecommitdiff
path: root/middleware/kubernetes/kubernetes.go
diff options
context:
space:
mode:
Diffstat (limited to 'middleware/kubernetes/kubernetes.go')
-rw-r--r--middleware/kubernetes/kubernetes.go77
1 files changed, 66 insertions, 11 deletions
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go
index 921014af6..c22afb5ba 100644
--- a/middleware/kubernetes/kubernetes.go
+++ b/middleware/kubernetes/kubernetes.go
@@ -39,6 +39,7 @@ type Kubernetes struct {
APIConn dnsController
ResyncPeriod time.Duration
Namespaces []string
+ Federations []Federation
LabelSelector *unversionedapi.LabelSelector
Selector *labels.Selector
PodMode string
@@ -78,9 +79,18 @@ type pod struct {
}
type recordRequest struct {
- port, protocol, endpoint, service, namespace, typeName, zone string
+ port string
+ protocol string
+ endpoint string
+ service string
+ namespace string
+ typeName string
+ zone string
+ federation string
}
+var localPodIP net.IP
+
var errNoItems = errors.New("no items found")
var errNsNotExposed = errors.New("namespace is not exposed")
var errInvalidRequest = errors.New("invalid query name")
@@ -236,16 +246,20 @@ func (k *Kubernetes) InitKubeCache() (err error) {
log.Printf("[INFO] Kubernetes middleware configured with the label selector '%s'. Only kubernetes objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector))
}
- k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, k.PodMode == PodModeVerified)
+ opts := dnsControlOpts{
+ initPodCache: k.PodMode == PodModeVerified,
+ }
+ k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, opts)
return err
}
func (k *Kubernetes) parseRequest(lowerCasedName string, qtype uint16) (r recordRequest, err error) {
// 3 Possible cases
- // SRV Request: _port._protocol.service.namespace.type.zone
- // A Request (endpoint): endpoint.service.namespace.type.zone
- // A Request (service): service.namespace.type.zone
+ // SRV Request: _port._protocol.service.namespace.[federation.]type.zone
+ // A Request (endpoint): endpoint.service.namespace.[federation.]type.zone
+ // A Request (service): service.namespace.[federation.]type.zone
+
// separate zone from rest of lowerCasedName
var segs []string
for _, z := range k.Zones {
@@ -261,6 +275,8 @@ func (k *Kubernetes) parseRequest(lowerCasedName string, qtype uint16) (r record
return r, errZoneNotFound
}
+ r.federation, segs = k.stripFederation(segs)
+
if qtype == dns.TypeNS {
return r, nil
}
@@ -339,11 +355,17 @@ func (k *Kubernetes) Records(r recordRequest) ([]msg.Service, error) {
return nil, err
}
if len(services) == 0 && len(pods) == 0 {
- // Did not find item in k8s
+ // Did not find item in k8s, try federated
+ if r.federation != "" {
+ fedCNAME := k.federationCNAMERecord(r)
+ if fedCNAME.Key != "" {
+ return []msg.Service{fedCNAME}, nil
+ }
+ }
return nil, errNoItems
}
- records := k.getRecordsForK8sItems(services, pods, r.zone)
+ records := k.getRecordsForK8sItems(services, pods, r)
return records, nil
}
@@ -360,27 +382,37 @@ func endpointHostname(addr api.EndpointAddress) string {
return ""
}
-func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, zone string) (records []msg.Service) {
- zonePath := msg.Path(zone, "coredns")
+func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, r recordRequest) (records []msg.Service) {
+ zonePath := msg.Path(r.zone, "coredns")
for _, svc := range services {
if svc.addr == api.ClusterIPNone {
// This is a headless service, create records for each endpoint
for _, ep := range svc.endpoints {
s := msg.Service{
- Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name, endpointHostname(ep.addr)}, "/"),
Host: ep.addr.IP,
Port: int(ep.port.Port),
}
+ if r.federation != "" {
+ s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name, endpointHostname(ep.addr)}, "/")
+ } else {
+ s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name, endpointHostname(ep.addr)}, "/")
+ }
records = append(records, s)
}
} else {
// Create records for each exposed port...
for _, p := range svc.ports {
s := msg.Service{
- Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/"),
Host: svc.addr,
Port: int(p.Port)}
+
+ if r.federation != "" {
+ s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name}, "/")
+ } else {
+ s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/")
+ }
+
records = append(records, s)
}
// If the addr is not an IP (i.e. an external service), add the record ...
@@ -388,6 +420,11 @@ func (k *Kubernetes) getRecordsForK8sItems(services []service, pods []pod, zone
Key: strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/"),
Host: svc.addr}
if t, _ := s.HostType(); t == dns.TypeCNAME {
+ if r.federation != "" {
+ s.Key = strings.Join([]string{zonePath, "svc", r.federation, svc.namespace, svc.name}, "/")
+ } else {
+ s.Key = strings.Join([]string{zonePath, "svc", svc.namespace, svc.name}, "/")
+ }
records = append(records, s)
}
@@ -575,3 +612,21 @@ func (k *Kubernetes) getServiceRecordForIP(ip, name string) []msg.Service {
func symbolContainsWildcard(symbol string) bool {
return (symbol == "*" || symbol == "any")
}
+
+func (k *Kubernetes) localPodIP() net.IP {
+ if localPodIP != nil {
+ return localPodIP
+ }
+ addrs, _ := k.interfaceAddrs.interfaceAddrs()
+
+ for _, addr := range addrs {
+ ip, _, _ := net.ParseCIDR(addr.String())
+ ip = ip.To4()
+ if ip == nil || ip.IsLoopback() {
+ continue
+ }
+ localPodIP = ip
+ return localPodIP
+ }
+ return nil
+}