aboutsummaryrefslogtreecommitdiff
path: root/plugin/pkg
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/pkg')
-rw-r--r--plugin/pkg/watch/watch.go23
-rw-r--r--plugin/pkg/watch/watcher.go178
2 files changed, 201 insertions, 0 deletions
diff --git a/plugin/pkg/watch/watch.go b/plugin/pkg/watch/watch.go
new file mode 100644
index 000000000..7e77bb7b3
--- /dev/null
+++ b/plugin/pkg/watch/watch.go
@@ -0,0 +1,23 @@
+package watch
+
+// Chan is used to inform the server of a change. Whenever
+// a watched FQDN has a change in data, that FQDN should be
+// sent down this channel.
+type Chan chan string
+
+// Watchable is the interface watchable plugins should implement
+type Watchable interface {
+ // Name returns the plugin name.
+ Name() string
+
+ // SetWatchChan is called when the watch channel is created.
+ SetWatchChan(Chan)
+
+ // Watch is called whenever a watch is created for a FQDN. Plugins
+ // should send the FQDN down the watch channel when its data may have
+ // changed. This is an exact match only.
+ Watch(qname string) error
+
+ // StopWatching is called whenever all watches are canceled for a FQDN.
+ StopWatching(qname string)
+}
diff --git a/plugin/pkg/watch/watcher.go b/plugin/pkg/watch/watcher.go
new file mode 100644
index 000000000..59474a7bc
--- /dev/null
+++ b/plugin/pkg/watch/watcher.go
@@ -0,0 +1,178 @@
+package watch
+
+import (
+ "fmt"
+ "io"
+ "sync"
+
+ "github.com/miekg/dns"
+
+ "github.com/coredns/coredns/pb"
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/pkg/log"
+ "github.com/coredns/coredns/request"
+)
+
+// Watcher handles watch creation, cancellation, and processing.
+type Watcher interface {
+ // Watch monitors a client stream and creates and cancels watches.
+ Watch(pb.DnsService_WatchServer) error
+
+ // Stop cancels open watches and stops the watch processing go routine.
+ Stop()
+}
+
+// Manager contains all the data needed to manage watches
+type Manager struct {
+ changes Chan
+ stopper chan bool
+ counter int64
+ watches map[string]watchlist
+ plugins []Watchable
+ mutex sync.Mutex
+}
+
+type watchlist map[int64]pb.DnsService_WatchServer
+
+// NewWatcher creates a Watcher, which is used to manage watched names.
+func NewWatcher(plugins []Watchable) *Manager {
+ w := &Manager{changes: make(Chan), stopper: make(chan bool), watches: make(map[string]watchlist), plugins: plugins}
+
+ for _, p := range plugins {
+ p.SetWatchChan(w.changes)
+ }
+
+ go w.process()
+ return w
+}
+
+func (w *Manager) nextID() int64 {
+ w.mutex.Lock()
+
+ w.counter++
+ id := w.counter
+
+ w.mutex.Unlock()
+ return id
+}
+
+// Watch monitors a client stream and creates and cancels watches.
+func (w *Manager) Watch(stream pb.DnsService_WatchServer) error {
+ for {
+ in, err := stream.Recv()
+ if err == io.EOF {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+ create := in.GetCreateRequest()
+ if create != nil {
+ msg := new(dns.Msg)
+ err := msg.Unpack(create.Query.Msg)
+ if err != nil {
+ log.Warningf("Could not decode watch request: %s\n", err)
+ stream.Send(&pb.WatchResponse{Err: "could not decode request"})
+ continue
+ }
+ id := w.nextID()
+ if err := stream.Send(&pb.WatchResponse{WatchId: id, Created: true}); err != nil {
+ // if we fail to notify client of watch creation, don't create the watch
+ continue
+ }
+
+ // Normalize qname
+ qname := (&request.Request{Req: msg}).Name()
+
+ w.mutex.Lock()
+ if _, ok := w.watches[qname]; !ok {
+ w.watches[qname] = make(watchlist)
+ }
+ w.watches[qname][id] = stream
+ w.mutex.Unlock()
+
+ for _, p := range w.plugins {
+ err := p.Watch(qname)
+ if err != nil {
+ log.Warningf("Failed to start watch for %s in plugin %s: %s\n", qname, p.Name(), err)
+ stream.Send(&pb.WatchResponse{Err: fmt.Sprintf("failed to start watch for %s in plugin %s", qname, p.Name())})
+ }
+ }
+ continue
+ }
+
+ cancel := in.GetCancelRequest()
+ if cancel != nil {
+ w.mutex.Lock()
+ for qname, wl := range w.watches {
+ ws, ok := wl[cancel.WatchId]
+ if !ok {
+ continue
+ }
+
+ // only allow cancels from the client that started it
+ // TODO: test what happens if a stream tries to cancel a watchID that it doesn't own
+ if ws != stream {
+ continue
+ }
+
+ delete(wl, cancel.WatchId)
+
+ // if there are no more watches for this qname, we should tell the plugins
+ if len(wl) == 0 {
+ for _, p := range w.plugins {
+ p.StopWatching(qname)
+ }
+ delete(w.watches, qname)
+ }
+
+ // let the client know we canceled the watch
+ stream.Send(&pb.WatchResponse{WatchId: cancel.WatchId, Canceled: true})
+ }
+ w.mutex.Unlock()
+ continue
+ }
+ }
+}
+
+func (w *Manager) process() {
+ for {
+ select {
+ case <-w.stopper:
+ return
+ case changed := <-w.changes:
+ w.mutex.Lock()
+ for qname, wl := range w.watches {
+ if plugin.Zones([]string{changed}).Matches(qname) == "" {
+ continue
+ }
+ for id, stream := range wl {
+ wr := pb.WatchResponse{WatchId: id, Qname: qname}
+ err := stream.Send(&wr)
+ if err != nil {
+ log.Warningf("Error sending change for %s to watch %d: %s. Removing watch.\n", qname, id, err)
+ delete(w.watches[qname], id)
+ }
+ }
+ }
+ w.mutex.Unlock()
+ }
+ }
+}
+
+// Stop cancels open watches and stops the watch processing go routine.
+func (w *Manager) Stop() {
+ w.stopper <- true
+ w.mutex.Lock()
+ for wn, wl := range w.watches {
+ for id, stream := range wl {
+ wr := pb.WatchResponse{WatchId: id, Canceled: true}
+ err := stream.Send(&wr)
+ if err != nil {
+ log.Warningf("Error notifiying client of cancellation: %s\n", err)
+ }
+ }
+ delete(w.watches, wn)
+ }
+ w.mutex.Unlock()
+}