aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--middleware/kubernetes/controller.go72
-rw-r--r--middleware/kubernetes/kubernetes.go28
-rw-r--r--middleware/kubernetes/setup.go4
-rw-r--r--test/kubernetes_test.go25
4 files changed, 123 insertions, 6 deletions
diff --git a/middleware/kubernetes/controller.go b/middleware/kubernetes/controller.go
index 150cc843f..e6e79366a 100644
--- a/middleware/kubernetes/controller.go
+++ b/middleware/kubernetes/controller.go
@@ -1,6 +1,7 @@
package kubernetes
import (
+ "errors"
"fmt"
"log"
"sync"
@@ -24,6 +25,8 @@ type storeToNamespaceLister struct {
cache.Store
}
+const podIPIndex = "PodIP"
+
// List lists all Namespaces in the store.
func (s *storeToNamespaceLister) List() (ns api.NamespaceList, err error) {
for _, m := range s.Store.List() {
@@ -38,10 +41,12 @@ type dnsController struct {
selector *labels.Selector
svcController *cache.Controller
+ podController *cache.Controller
nsController *cache.Controller
epController *cache.Controller
svcLister cache.StoreToServiceLister
+ podLister cache.StoreToPodLister
nsLister storeToNamespaceLister
epLister cache.StoreToEndpointsLister
@@ -54,7 +59,7 @@ type dnsController struct {
}
// newDNSController creates a controller for CoreDNS.
-func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Duration, lselector *labels.Selector) *dnsController {
+func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Duration, lselector *labels.Selector, initPodCache bool) *dnsController {
dns := dnsController{
client: kubeClient,
selector: lselector,
@@ -71,6 +76,18 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
+ if initPodCache {
+ dns.podLister.Indexer, dns.podController = cache.NewIndexerInformer(
+ &cache.ListWatch{
+ ListFunc: podListFunc(dns.client, namespace, dns.selector),
+ WatchFunc: podWatchFunc(dns.client, namespace, dns.selector),
+ },
+ &api.Pod{}, // TODO replace with a lighter-weight custom struct
+ resyncPeriod,
+ cache.ResourceEventHandlerFuncs{},
+ cache.Indexers{podIPIndex: podIPIndexFunc})
+ }
+
dns.nsLister.Store, dns.nsController = cache.NewInformer(
&cache.ListWatch{
ListFunc: namespaceListFunc(dns.client, dns.selector),
@@ -88,6 +105,14 @@ func newdnsController(kubeClient *kubernetes.Clientset, resyncPeriod time.Durati
return &dns
}
+func podIPIndexFunc(obj interface{}) ([]string, error) {
+ p, ok := obj.(*api.Pod)
+ if !ok {
+ return nil, errors.New("obj was not an *api.Pod")
+ }
+ return []string{p.Status.PodIP}, nil
+}
+
func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
if s != nil {
@@ -107,6 +132,26 @@ func serviceListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fun
}
}
+func podListFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
+ return func(opts api.ListOptions) (runtime.Object, error) {
+ if s != nil {
+ opts.LabelSelector = *s
+ }
+ listV1, err := c.Core().Pods(ns).List(opts)
+
+ if err != nil {
+ return nil, err
+ }
+ var listAPI api.PodList
+ err = v1.Convert_v1_PodList_To_api_PodList(listV1, &listAPI, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ return &listAPI, err
+ }
+}
+
func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) {
if in.Type == watch.Error {
return in, true
@@ -121,6 +166,14 @@ func v1ToAPIFilter(in watch.Event) (out watch.Event, keep bool) {
return in, true
}
return watch.Event{Type: in.Type, Object: &apiObj}, true
+ case *v1.Pod:
+ var apiObj api.Pod
+ err := v1.Convert_v1_Pod_To_api_Pod(v1Obj, &apiObj, nil)
+ if err != nil {
+ log.Printf("[ERROR] Could not convert v1.Pod: %s", err)
+ return in, true
+ }
+ return watch.Event{Type: in.Type, Object: &apiObj}, true
case *v1.Namespace:
var apiObj api.Namespace
err := v1.Convert_v1_Namespace_To_api_Namespace(v1Obj, &apiObj, nil)
@@ -156,6 +209,20 @@ func serviceWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) fu
}
}
+func podWatchFunc(c *kubernetes.Clientset, ns string, s *labels.Selector) func(options api.ListOptions) (watch.Interface, error) {
+ return func(options api.ListOptions) (watch.Interface, error) {
+ if s != nil {
+ options.LabelSelector = *s
+ }
+ w, err := c.Core().Pods(ns).Watch(options)
+
+ if err != nil {
+ return nil, err
+ }
+ return watch.Filter(w, v1ToAPIFilter), nil
+ }
+}
+
func namespaceListFunc(c *kubernetes.Clientset, s *labels.Selector) func(api.ListOptions) (runtime.Object, error) {
return func(opts api.ListOptions) (runtime.Object, error) {
if s != nil {
@@ -244,6 +311,9 @@ func (dns *dnsController) Run() {
go dns.svcController.Run(dns.stopCh)
go dns.nsController.Run(dns.stopCh)
go dns.epController.Run(dns.stopCh)
+ if dns.podController != nil {
+ go dns.podController.Run(dns.stopCh)
+ }
<-dns.stopCh
}
diff --git a/middleware/kubernetes/kubernetes.go b/middleware/kubernetes/kubernetes.go
index 9141cd40a..c59f98835 100644
--- a/middleware/kubernetes/kubernetes.go
+++ b/middleware/kubernetes/kubernetes.go
@@ -45,6 +45,7 @@ type Kubernetes struct {
const (
PodModeDisabled = "disabled" // default. pod requests are ignored
+ PodModeVerified = "verified" // Pod requests are answered only if they exist
PodModeInsecure = "insecure" // ALL pod requests are answered without verfying they exist
DnsSchemaVersion = "1.0.0" // https://github.com/kubernetes/dns/blob/master/docs/specification.md
)
@@ -197,7 +198,7 @@ func (k *Kubernetes) InitKubeCache() error {
log.Printf("[INFO] Kubernetes middleware configured with the label selector '%s'. Only kubernetes objects matching this label selector will be exposed.", unversionedapi.FormatLabelSelector(k.LabelSelector))
}
- k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector)
+ k.APIConn = newdnsController(kubeClient, k.ResyncPeriod, k.Selector, k.PodMode == PodModeVerified)
return err
}
@@ -384,9 +385,30 @@ func (k *Kubernetes) findPods(namespace, podname string) (pods []pod, err error)
return pods, nil
}
- // TODO: implement cache verified pod responses
- return pods, nil
+ // PodModeVerified
+ objList, err := k.APIConn.podLister.Indexer.ByIndex(podIPIndex, ip)
+ if err != nil {
+ return nil, err
+ }
+ nsWildcard := symbolContainsWildcard(namespace)
+ for _, o := range objList {
+ p, ok := o.(*api.Pod)
+ if !ok {
+ return nil, errors.New("expected type *api.Pod")
+ }
+ // If namespace has a wildcard, filter results against Corefile namespace list.
+ if nsWildcard && (len(k.Namespaces) > 0) && (!dnsstrings.StringInSlice(p.Namespace, k.Namespaces)) {
+ continue
+ }
+ // check for matching ip and namespace
+ if ip == p.Status.PodIP && symbolMatches(namespace, p.Namespace, nsWildcard) {
+ s := pod{name: podname, namespace: namespace, addr: ip}
+ pods = append(pods, s)
+ return pods, nil
+ }
+ }
+ return pods, nil
}
// Get retrieves matching data from the cache.
diff --git a/middleware/kubernetes/setup.go b/middleware/kubernetes/setup.go
index 0813ef7b3..7fd9804e0 100644
--- a/middleware/kubernetes/setup.go
+++ b/middleware/kubernetes/setup.go
@@ -88,10 +88,10 @@ func kubernetesParse(c *caddy.Controller) (*Kubernetes, error) {
args := c.RemainingArgs()
if len(args) == 1 {
switch args[0] {
- case PodModeDisabled, PodModeInsecure:
+ case PodModeDisabled, PodModeInsecure, PodModeVerified:
k8s.PodMode = args[0]
default:
- return nil, errors.New("pods must be one of: disabled, insecure")
+ return nil, errors.New("pods must be one of: disabled, verified, insecure")
}
continue
}
diff --git a/test/kubernetes_test.go b/test/kubernetes_test.go
index 69bd6502c..cdb8add2b 100644
--- a/test/kubernetes_test.go
+++ b/test/kubernetes_test.go
@@ -242,6 +242,19 @@ var dnsTestCasesPodsInsecure = []test.Case{
},
}
+var dnsTestCasesPodsVerified = []test.Case{
+ {
+ Qname: "10-20-0-101.test-1.pod.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeNameError,
+ Answer: []dns.RR{},
+ },
+ {
+ Qname: "10-20-0-101.test-X.pod.cluster.local.", Qtype: dns.TypeA,
+ Rcode: dns.RcodeNameError,
+ Answer: []dns.RR{},
+ },
+}
+
func createTestServer(t *testing.T, corefile string) (*caddy.Instance, string) {
server, err := CoreDNSServer(corefile)
if err != nil {
@@ -315,3 +328,15 @@ func TestKubernetesIntegrationPodsInsecure(t *testing.T) {
`
doIntegrationTests(t, corefile, dnsTestCasesPodsInsecure)
}
+
+func TestKubernetesIntegrationPodsVerified(t *testing.T) {
+ corefile :=
+ `.:0 {
+ kubernetes cluster.local 0.0.10.in-addr.arpa {
+ endpoint http://localhost:8080
+ namespaces test-1
+ pods verified
+ }
+`
+ doIntegrationTests(t, corefile, dnsTestCasesPodsVerified)
+}