aboutsummaryrefslogtreecommitdiff
path: root/plugin/forwardcrd/controller.go
diff options
context:
space:
mode:
authorGravatar Christian Ang <christian.ang@outlook.com> 2021-11-12 08:22:34 -0800
committerGravatar GitHub <noreply@github.com> 2021-11-12 11:22:34 -0500
commit2e6953c7dbd1d6b359911e1ce92e2567df07ca8c (patch)
treed91514ca867bb5b000bec3ea219e6a2ab0a0c244 /plugin/forwardcrd/controller.go
parent6953ab2b4f23f916a08b68ba51b9a26e41e9a748 (diff)
downloadcoredns-2e6953c7dbd1d6b359911e1ce92e2567df07ca8c.tar.gz
coredns-2e6953c7dbd1d6b359911e1ce92e2567df07ca8c.tar.zst
coredns-2e6953c7dbd1d6b359911e1ce92e2567df07ca8c.zip
Initial implementation of ForwardCRD plugin (#4512)
* Add forwardcrd plugin README.md Co-authored-by: Aidan Obley <aobley@vmware.com> Signed-off-by: Christian Ang <angc@vmware.com> * Create forwardcrd plugin - Place forwardcrd before forward plugin in plugin list. This will avoid forward from preventing the forwardcrd plugin from handling any queries in the case of having a default upstream forwarder in a server block (as is the case in the default kubernetes Corefile). Co-authored-by: Aidan Obley <aobley@vmware.com> Signed-off-by: Christian Ang <angc@vmware.com> * Add Forward CRD Signed-off-by: Christian Ang <angc@vmware.com> * Add NewWithConfig to forward plugin - allows external packages to instanciate forward plugins Co-authored-by: Aidan Obley <aobley@vmware.com> Signed-off-by: Christian Ang <angc@vmware.com> * ForwardCRD plugin handles requests for Forward CRs - add a Kubernetes controller that can read Forward CRs - instances of the forward plugin are created based on Forward CRs from the Kubernetes controller - DNS requests are handled by calling matching Forward plugin instances based on zone name - Defaults to the kube-system namespace to align with Corefile RBAC Signed-off-by: Christian Ang <angc@vmware.com> Use klog v2 in forwardcrd plugin * Refactor forward setup to use NewWithConfig Co-authored-by: Christian Ang <angc@vmware.com> Signed-off-by: Edwin Xie <exie@vmware.com> * Use ParseInt instead of Atoi - to ensure that the bitsize is 32 for later casting to uint32 Signed-off-by: Christian Ang <angc@vmware.com> * Add @christianang to CODEOWNERS for forwardcrd Signed-off-by: Christian Ang <angc@vmware.com> Co-authored-by: Edwin Xie <exie@vmware.com>
Diffstat (limited to 'plugin/forwardcrd/controller.go')
-rw-r--r--plugin/forwardcrd/controller.go238
1 files changed, 238 insertions, 0 deletions
diff --git a/plugin/forwardcrd/controller.go b/plugin/forwardcrd/controller.go
new file mode 100644
index 000000000..b35e77b62
--- /dev/null
+++ b/plugin/forwardcrd/controller.go
@@ -0,0 +1,238 @@
+package forwardcrd
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/dnstap"
+ "github.com/coredns/coredns/plugin/forward"
+ corednsv1alpha1 "github.com/coredns/coredns/plugin/forwardcrd/apis/coredns/v1alpha1"
+
+ metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+ "k8s.io/apimachinery/pkg/runtime"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+ "k8s.io/apimachinery/pkg/util/wait"
+ "k8s.io/apimachinery/pkg/watch"
+ "k8s.io/client-go/dynamic"
+ "k8s.io/client-go/tools/cache"
+ "k8s.io/client-go/util/workqueue"
+)
+
+const defaultResyncPeriod = 0
+
+type forwardCRDController interface {
+ Run(threads int)
+ HasSynced() bool
+ Stop() error
+}
+
+type forwardCRDControl struct {
+ client dynamic.Interface
+ scheme *runtime.Scheme
+ forwardController cache.Controller
+ forwardLister cache.Store
+ workqueue workqueue.RateLimitingInterface
+ pluginMap *PluginInstanceMap
+ instancer pluginInstancer
+ tapPlugin *dnstap.Dnstap
+ namespace string
+
+ // stopLock is used to enforce only a single call to Stop is active.
+ // Needed because we allow stopping through an http endpoint and
+ // allowing concurrent stoppers leads to stack traces.
+ stopLock sync.Mutex
+ shutdown bool
+ stopCh chan struct{}
+}
+
+type lifecyclePluginHandler interface {
+ plugin.Handler
+ OnStartup() error
+ OnShutdown() error
+}
+
+type pluginInstancer func(forward.ForwardConfig) (lifecyclePluginHandler, error)
+
+func newForwardCRDController(ctx context.Context, client dynamic.Interface, scheme *runtime.Scheme, namespace string, pluginMap *PluginInstanceMap, instancer pluginInstancer) forwardCRDController {
+ controller := forwardCRDControl{
+ client: client,
+ scheme: scheme,
+ stopCh: make(chan struct{}),
+ namespace: namespace,
+ pluginMap: pluginMap,
+ instancer: instancer,
+ workqueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ForwardCRD"),
+ }
+
+ controller.forwardLister, controller.forwardController = cache.NewInformer(
+ &cache.ListWatch{
+ ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+ if namespace != "" {
+ return controller.client.Resource(corednsv1alpha1.GroupVersion.WithResource("forwards")).Namespace(namespace).List(ctx, options)
+ }
+ return controller.client.Resource(corednsv1alpha1.GroupVersion.WithResource("forwards")).List(ctx, options)
+ },
+ WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+ if namespace != "" {
+ return controller.client.Resource(corednsv1alpha1.GroupVersion.WithResource("forwards")).Namespace(namespace).Watch(ctx, options)
+ }
+ return controller.client.Resource(corednsv1alpha1.GroupVersion.WithResource("forwards")).Watch(ctx, options)
+ },
+ },
+ &unstructured.Unstructured{},
+ defaultResyncPeriod,
+ cache.ResourceEventHandlerFuncs{
+ AddFunc: func(obj interface{}) {
+ key, err := cache.MetaNamespaceKeyFunc(obj)
+ if err == nil {
+ controller.workqueue.Add(key)
+ }
+ },
+ UpdateFunc: func(oldObj, newObj interface{}) {
+ key, err := cache.MetaNamespaceKeyFunc(newObj)
+ if err == nil {
+ controller.workqueue.Add(key)
+ }
+ },
+ DeleteFunc: func(obj interface{}) {
+ key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
+ if err == nil {
+ controller.workqueue.Add(key)
+ }
+ },
+ },
+ )
+
+ return &controller
+}
+
+// Run starts the controller. Threads is the number of workers that can process
+// work on the workqueue in parallel.
+func (d *forwardCRDControl) Run(threads int) {
+ defer utilruntime.HandleCrash()
+ defer d.workqueue.ShutDown()
+
+ go d.forwardController.Run(d.stopCh)
+
+ if !cache.WaitForCacheSync(d.stopCh, d.forwardController.HasSynced) {
+ utilruntime.HandleError(errors.New("Timed out waiting for caches to sync"))
+ return
+ }
+
+ for i := 0; i < threads; i++ {
+ go wait.Until(d.runWorker, time.Second, d.stopCh)
+ }
+
+ <-d.stopCh
+
+ // Shutdown all plugins
+ for _, plugin := range d.pluginMap.List() {
+ plugin.OnShutdown()
+ }
+}
+
+// HasSynced returns true once the controller has completed an initial resource
+// listing.
+func (d *forwardCRDControl) HasSynced() bool {
+ return d.forwardController.HasSynced()
+}
+
+// Stop stops the controller.
+func (d *forwardCRDControl) Stop() error {
+ d.stopLock.Lock()
+ defer d.stopLock.Unlock()
+
+ // Only try draining the workqueue if we haven't already.
+ if !d.shutdown {
+ close(d.stopCh)
+ d.shutdown = true
+
+ return nil
+ }
+
+ return fmt.Errorf("shutdown already in progress")
+}
+
+func (d *forwardCRDControl) runWorker() {
+ for d.processNextItem() {
+ }
+}
+
+func (d *forwardCRDControl) processNextItem() bool {
+ key, quit := d.workqueue.Get()
+ if quit {
+ return false
+ }
+
+ defer d.workqueue.Done(key)
+
+ err := d.sync(key.(string))
+ if err != nil {
+ log.Errorf("Error syncing Forward %v: %v", key, err)
+ d.workqueue.AddRateLimited(key)
+ return true
+ }
+
+ d.workqueue.Forget(key)
+
+ return true
+}
+
+func (d *forwardCRDControl) sync(key string) error {
+ obj, exists, err := d.forwardLister.GetByKey(key)
+ if err != nil {
+ return err
+ }
+
+ if !exists {
+ plugin := d.pluginMap.Delete(key)
+ if plugin != nil {
+ plugin.OnShutdown()
+ }
+ } else {
+ f, err := d.convertToForward(obj.(runtime.Object))
+ if err != nil {
+ return err
+ }
+ forwardConfig := forward.ForwardConfig{
+ From: f.Spec.From,
+ To: f.Spec.To,
+ TapPlugin: d.tapPlugin,
+ }
+ plugin, err := d.instancer(forwardConfig)
+ if err != nil {
+ return err
+ }
+ err = plugin.OnStartup()
+ if err != nil {
+ return err
+ }
+ oldPlugin, updated := d.pluginMap.Upsert(key, f.Spec.From, plugin)
+ if updated {
+ oldPlugin.OnShutdown()
+ }
+ }
+
+ return nil
+}
+
+func (d *forwardCRDControl) convertToForward(obj runtime.Object) (*corednsv1alpha1.Forward, error) {
+ unstructured, ok := obj.(*unstructured.Unstructured)
+ if !ok {
+ return nil, fmt.Errorf("object was not Unstructured")
+ }
+
+ switch unstructured.GetKind() {
+ case "Forward":
+ forward := &corednsv1alpha1.Forward{}
+ err := d.scheme.Convert(unstructured, forward, nil)
+ return forward, err
+ default:
+ return nil, fmt.Errorf("unsupported object type: %T", unstructured)
+ }
+}