aboutsummaryrefslogtreecommitdiff
path: root/plugin/pkg/watch/watcher.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/pkg/watch/watcher.go')
-rw-r--r--plugin/pkg/watch/watcher.go178
1 files changed, 0 insertions, 178 deletions
diff --git a/plugin/pkg/watch/watcher.go b/plugin/pkg/watch/watcher.go
deleted file mode 100644
index 86a952db2..000000000
--- a/plugin/pkg/watch/watcher.go
+++ /dev/null
@@ -1,178 +0,0 @@
-package watch
-
-import (
- "fmt"
- "io"
- "sync"
-
- "github.com/miekg/dns"
-
- "github.com/coredns/coredns/pb"
- "github.com/coredns/coredns/plugin"
- "github.com/coredns/coredns/plugin/pkg/log"
- "github.com/coredns/coredns/request"
-)
-
-// Watcher handles watch creation, cancellation, and processing.
-type Watcher interface {
- // Watch monitors a client stream and creates and cancels watches.
- Watch(pb.DnsService_WatchServer) error
-
- // Stop cancels open watches and stops the watch processing go routine.
- Stop()
-}
-
-// Manager contains all the data needed to manage watches
-type Manager struct {
- changes Chan
- stopper chan bool
- counter int64
- watches map[string]watchlist
- plugins []Watchable
- mutex sync.Mutex
-}
-
-type watchlist map[int64]pb.DnsService_WatchServer
-
-// NewWatcher creates a Watcher, which is used to manage watched names.
-func NewWatcher(plugins []Watchable) *Manager {
- w := &Manager{changes: make(Chan), stopper: make(chan bool), watches: make(map[string]watchlist), plugins: plugins}
-
- for _, p := range plugins {
- p.SetWatchChan(w.changes)
- }
-
- go w.process()
- return w
-}
-
-func (w *Manager) nextID() int64 {
- w.mutex.Lock()
-
- w.counter++
- id := w.counter
-
- w.mutex.Unlock()
- return id
-}
-
-// Watch monitors a client stream and creates and cancels watches.
-func (w *Manager) Watch(stream pb.DnsService_WatchServer) error {
- for {
- in, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
- if err != nil {
- return err
- }
- create := in.GetCreateRequest()
- if create != nil {
- msg := new(dns.Msg)
- err := msg.Unpack(create.Query.Msg)
- if err != nil {
- log.Warningf("Could not decode watch request: %s\n", err)
- stream.Send(&pb.WatchResponse{Err: "could not decode request"})
- continue
- }
- id := w.nextID()
- if err := stream.Send(&pb.WatchResponse{WatchId: id, Created: true}); err != nil {
- // if we fail to notify client of watch creation, don't create the watch
- continue
- }
-
- // Normalize qname
- qname := (&request.Request{Req: msg}).Name()
-
- w.mutex.Lock()
- if _, ok := w.watches[qname]; !ok {
- w.watches[qname] = make(watchlist)
- }
- w.watches[qname][id] = stream
- w.mutex.Unlock()
-
- for _, p := range w.plugins {
- err := p.Watch(qname)
- if err != nil {
- log.Warningf("Failed to start watch for %s in plugin %s: %s\n", qname, p.Name(), err)
- stream.Send(&pb.WatchResponse{Err: fmt.Sprintf("failed to start watch for %s in plugin %s", qname, p.Name())})
- }
- }
- continue
- }
-
- cancel := in.GetCancelRequest()
- if cancel != nil {
- w.mutex.Lock()
- for qname, wl := range w.watches {
- ws, ok := wl[cancel.WatchId]
- if !ok {
- continue
- }
-
- // only allow cancels from the client that started it
- // TODO: test what happens if a stream tries to cancel a watchID that it doesn't own
- if ws != stream {
- continue
- }
-
- delete(wl, cancel.WatchId)
-
- // if there are no more watches for this qname, we should tell the plugins
- if len(wl) == 0 {
- for _, p := range w.plugins {
- p.StopWatching(qname)
- }
- delete(w.watches, qname)
- }
-
- // let the client know we canceled the watch
- stream.Send(&pb.WatchResponse{WatchId: cancel.WatchId, Canceled: true})
- }
- w.mutex.Unlock()
- continue
- }
- }
-}
-
-func (w *Manager) process() {
- for {
- select {
- case <-w.stopper:
- return
- case changed := <-w.changes:
- w.mutex.Lock()
- for qname, wl := range w.watches {
- if plugin.Zones([]string{changed}).Matches(qname) == "" {
- continue
- }
- for id, stream := range wl {
- wr := pb.WatchResponse{WatchId: id, Qname: qname}
- err := stream.Send(&wr)
- if err != nil {
- log.Warningf("Error sending change for %s to watch %d: %s. Removing watch.\n", qname, id, err)
- delete(w.watches[qname], id)
- }
- }
- }
- w.mutex.Unlock()
- }
- }
-}
-
-// Stop cancels open watches and stops the watch processing go routine.
-func (w *Manager) Stop() {
- w.stopper <- true
- w.mutex.Lock()
- for wn, wl := range w.watches {
- for id, stream := range wl {
- wr := pb.WatchResponse{WatchId: id, Canceled: true}
- err := stream.Send(&wr)
- if err != nil {
- log.Warningf("Error notifying client of cancellation: %s\n", err)
- }
- }
- delete(w.watches, wn)
- }
- w.mutex.Unlock()
-}