diff options
author | 2017-11-28 00:36:14 +0300 | |
---|---|---|
committer | 2017-11-27 21:36:14 +0000 | |
commit | 6d6e1357b964ec36cfcc0d08aea4253471bfa7fa (patch) | |
tree | e256b6df981f66a98c5c8438ad541e4424257d47 /plugin | |
parent | 06006fac569364855fe808eece2f8b5b194c9f0a (diff) | |
download | coredns-6d6e1357b964ec36cfcc0d08aea4253471bfa7fa.tar.gz coredns-6d6e1357b964ec36cfcc0d08aea4253471bfa7fa.tar.zst coredns-6d6e1357b964ec36cfcc0d08aea4253471bfa7fa.zip |
Dnstap plugin refactoring (#1257)
Diffstat (limited to 'plugin')
-rw-r--r-- | plugin/dnstap/dnstapio/io.go | 107 | ||||
-rw-r--r-- | plugin/dnstap/dnstapio/io_test.go | 185 | ||||
-rw-r--r-- | plugin/dnstap/out/socket.go | 82 | ||||
-rw-r--r-- | plugin/dnstap/out/socket_test.go | 94 | ||||
-rw-r--r-- | plugin/dnstap/out/tcp.go | 59 | ||||
-rw-r--r-- | plugin/dnstap/out/tcp_test.go | 64 | ||||
-rw-r--r-- | plugin/dnstap/setup.go | 32 |
7 files changed, 215 insertions, 408 deletions
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go index 0c409d6fb..3bc44f85e 100644 --- a/plugin/dnstap/dnstapio/io.go +++ b/plugin/dnstap/dnstapio/io.go @@ -2,67 +2,100 @@ package dnstapio import ( "log" + "net" + "time" tap "github.com/dnstap/golang-dnstap" + fs "github.com/farsightsec/golang-framestream" "github.com/golang/protobuf/proto" ) -// DnstapIO wraps the dnstap I/O routine. -type DnstapIO struct { - protocol Protocol - queue chan tap.Dnstap - stop chan bool +const ( + tcpTimeout = 4 * time.Second + flushTimeout = 1 * time.Second + queueSize = 1000 +) + +type dnstapIO struct { + enc *fs.Encoder + conn net.Conn + queue chan tap.Dnstap } -// Protocol is either `out.TCP` or `out.Socket`. -type Protocol interface { - // Write takes a single frame at once. - Write([]byte) (int, error) +// New returns a new and initialized DnstapIO. +func New() DnstapIO { + return &dnstapIO{queue: make(chan tap.Dnstap, queueSize)} +} - Close() error +// DnstapIO interface +type DnstapIO interface { + Connect(endpoint string, socket bool) error + Dnstap(payload tap.Dnstap) + Close() } -// New dnstap I/O routine from Protocol. -func New(w Protocol) *DnstapIO { - dio := DnstapIO{} - dio.protocol = w - dio.queue = make(chan tap.Dnstap, 10) - dio.stop = make(chan bool) +// Connect connects to the dnstop endpoint. +func (dio *dnstapIO) Connect(endpoint string, socket bool) error { + var err error + if socket { + dio.conn, err = net.Dial("unix", endpoint) + } else { + dio.conn, err = net.DialTimeout("tcp", endpoint, tcpTimeout) + } + if err != nil { + return err + } + dio.enc, err = fs.NewEncoder(dio.conn, &fs.EncoderOptions{ + ContentType: []byte("protobuf:dnstap.Dnstap"), + Bidirectional: true, + }) + if err != nil { + return err + } go dio.serve() - return &dio + return nil } // Dnstap enqueues the payload for log. -func (dio *DnstapIO) Dnstap(payload tap.Dnstap) { +func (dio *dnstapIO) Dnstap(payload tap.Dnstap) { select { case dio.queue <- payload: default: - log.Println("[WARN] Dnstap payload dropped.") + log.Printf("[ERROR] Dnstap payload dropped") } } -func (dio *DnstapIO) serve() { +// Close waits until the I/O routine is finished to return. +func (dio *dnstapIO) Close() { + close(dio.queue) +} + +func (dio *dnstapIO) serve() { + timeout := time.After(flushTimeout) for { select { - case payload := <-dio.queue: + case payload, ok := <-dio.queue: + if !ok { + dio.enc.Close() + dio.conn.Close() + return + } frame, err := proto.Marshal(&payload) - if err == nil { - dio.protocol.Write(frame) - } else { - log.Printf("[ERROR] Invalid dnstap payload dropped: %s\n", err) + if err != nil { + log.Printf("[ERROR] Invalid dnstap payload dropped: %s", err) + continue + } + _, err = dio.enc.Write(frame) + if err != nil { + log.Printf("[ERROR] Cannot write dnstap payload: %s", err) + continue + } + case <-timeout: + err := dio.enc.Flush() + if err != nil { + log.Printf("[ERROR] Cannot flush dnstap payloads: %s", err) } - case <-dio.stop: - close(dio.queue) - dio.stop <- true - return + timeout = time.After(flushTimeout) } } } - -// Close waits until the I/O routine is finished to return. -func (dio DnstapIO) Close() error { - dio.stop <- true - <-dio.stop - close(dio.stop) - return dio.protocol.Close() -} diff --git a/plugin/dnstap/dnstapio/io_test.go b/plugin/dnstap/dnstapio/io_test.go index 17e7758c3..bfeeb4289 100644 --- a/plugin/dnstap/dnstapio/io_test.go +++ b/plugin/dnstap/dnstapio/io_test.go @@ -1,78 +1,155 @@ package dnstapio import ( - "bytes" - "io/ioutil" - "log" + "net" "sync" "testing" "time" tap "github.com/dnstap/golang-dnstap" + fs "github.com/farsightsec/golang-framestream" ) -func init() { - log.SetOutput(ioutil.Discard) -} +func accept(t *testing.T, l net.Listener, count int) { + server, err := l.Accept() + if err != nil { + t.Fatalf("server accept: %s", err) + return + } -type buf struct { - *bytes.Buffer - cost time.Duration -} + dec, err := fs.NewDecoder(server, &fs.DecoderOptions{ + ContentType: []byte("protobuf:dnstap.Dnstap"), + Bidirectional: true, + }) + if err != nil { + t.Fatalf("server decoder: %s", err) + return + } -func (b buf) Write(frame []byte) (int, error) { - time.Sleep(b.cost) - return b.Buffer.Write(frame) -} + for i := 0; i < count; i++ { + if _, err := dec.Decode(); err != nil { + t.Errorf("server decode: %s", err) + } + } -func (b buf) Close() error { - return nil + if err := server.Close(); err != nil { + t.Error(err) + } } -func TestRace(t *testing.T) { - b := buf{&bytes.Buffer{}, 100 * time.Millisecond} - dio := New(b) - wg := &sync.WaitGroup{} - wg.Add(10) - for i := 0; i < 10; i++ { - timeout := time.After(time.Second) - go func() { - for { - select { - case <-timeout: - wg.Done() - return - default: - time.Sleep(50 * time.Millisecond) - t := tap.Dnstap_MESSAGE - dio.Dnstap(tap.Dnstap{Type: &t}) - } - } - }() +const endpointTCP = "localhost:0" + +func TestTCP(t *testing.T) { + dio := New() + + err := dio.Connect(endpointTCP, false) + if err == nil { + t.Fatal("Not listening but no error") + } + + // Start TCP listener + l, err := net.Listen("tcp", endpointTCP) + if err != nil { + t.Fatalf("Cannot start listener: %s", err) } + defer l.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + accept(t, l, 1) + wg.Done() + }() + + err = dio.Connect(l.Addr().String(), false) + if err != nil { + t.Fatalf("Cannot connect to listener: %s", err) + } + + msg := tap.Dnstap_MESSAGE + dio.Dnstap(tap.Dnstap{Type: &msg}) + wg.Wait() + + dio.Close() } -func TestClose(t *testing.T) { - done := make(chan bool) - var dio *DnstapIO +const endpointSocket = "dnstap.sock" + +func TestSocket(t *testing.T) { + dio := New() + + err := dio.Connect(endpointSocket, true) + if err == nil { + t.Fatal("Not listening but no error") + } + + // Start Socket listener + l, err := net.Listen("unix", endpointSocket) + if err != nil { + t.Fatalf("Cannot start listener: %s", err) + } + defer l.Close() + + var wg sync.WaitGroup + wg.Add(1) go func() { - b := buf{&bytes.Buffer{}, 0} - dio = New(b) - dio.Close() - close(done) + accept(t, l, 1) + wg.Done() }() - select { - case <-done: - case <-time.After(time.Second): - t.Fatal("Not closing.") + + err = dio.Connect(endpointSocket, true) + if err != nil { + t.Fatalf("Cannot connect to listener: %s", err) + } + + msg := tap.Dnstap_MESSAGE + dio.Dnstap(tap.Dnstap{Type: &msg}) + + wg.Wait() + + dio.Close() +} + +func TestRace(t *testing.T) { + count := 10 + dio := New() + + err := dio.Connect(endpointTCP, false) + if err == nil { + t.Fatal("Not listening but no error") } - func() { - defer func() { - if err := recover(); err == nil { - t.Fatal("Send on closed channel.") - } - }() - dio.Dnstap(tap.Dnstap{}) + + // Start TCP listener + l, err := net.Listen("tcp", endpointTCP) + if err != nil { + t.Fatalf("Cannot start listener: %s", err) + } + defer l.Close() + + var wg sync.WaitGroup + wg.Add(1) + go func() { + accept(t, l, count) + wg.Done() }() + + err = dio.Connect(l.Addr().String(), false) + if err != nil { + t.Fatalf("Cannot connect to listener: %s", err) + } + + msg := tap.Dnstap_MESSAGE + wg.Add(count) + for i := 0; i < count; i++ { + go func(i byte) { + time.Sleep(50 * time.Millisecond) + dio.Dnstap(tap.Dnstap{Type: &msg, Extra: []byte{i}}) + wg.Done() + }(byte(i)) + } + + wg.Wait() + + dio.Close() } diff --git a/plugin/dnstap/out/socket.go b/plugin/dnstap/out/socket.go deleted file mode 100644 index 58285f25c..000000000 --- a/plugin/dnstap/out/socket.go +++ /dev/null @@ -1,82 +0,0 @@ -package out - -import ( - "fmt" - "net" - - fs "github.com/farsightsec/golang-framestream" -) - -// Socket is a Frame Streams encoder over a UNIX socket. -type Socket struct { - path string - enc *fs.Encoder - conn net.Conn - err error -} - -func openSocket(s *Socket) error { - conn, err := net.Dial("unix", s.path) - if err != nil { - return err - } - s.conn = conn - - enc, err := fs.NewEncoder(conn, &fs.EncoderOptions{ - ContentType: []byte("protobuf:dnstap.Dnstap"), - Bidirectional: true, - }) - if err != nil { - return err - } - s.enc = enc - - s.err = nil - return nil -} - -// NewSocket will always return a new Socket. -// err if nothing is listening to it, it will attempt to reconnect on the next Write. -func NewSocket(path string) (s *Socket, err error) { - s = &Socket{path: path} - if err = openSocket(s); err != nil { - err = fmt.Errorf("open socket: %s", err) - s.err = err - return - } - return -} - -// Write a single Frame Streams frame. -func (s *Socket) Write(frame []byte) (int, error) { - if s.err != nil { - // is the dnstap tool listening? - if err := openSocket(s); err != nil { - return 0, fmt.Errorf("open socket: %s", err) - } - } - n, err := s.enc.Write(frame) - if err != nil { - // the dnstap command line tool is down - s.conn.Close() - s.err = err - return 0, err - } - return n, nil - -} - -// Close the socket and flush the remaining frames. -func (s *Socket) Close() error { - if s.err != nil { - // nothing to close - return nil - } - - defer s.conn.Close() - - if err := s.enc.Flush(); err != nil { - return fmt.Errorf("flush: %s", err) - } - return s.enc.Close() -} diff --git a/plugin/dnstap/out/socket_test.go b/plugin/dnstap/out/socket_test.go deleted file mode 100644 index 050a38d36..000000000 --- a/plugin/dnstap/out/socket_test.go +++ /dev/null @@ -1,94 +0,0 @@ -package out - -import ( - "net" - "testing" - - fs "github.com/farsightsec/golang-framestream" -) - -func acceptOne(t *testing.T, l net.Listener) { - server, err := l.Accept() - if err != nil { - t.Fatalf("server accept: %s", err) - return - } - - dec, err := fs.NewDecoder(server, &fs.DecoderOptions{ - ContentType: []byte("protobuf:dnstap.Dnstap"), - Bidirectional: true, - }) - if err != nil { - t.Fatalf("server decoder: %s", err) - return - } - - if _, err := dec.Decode(); err != nil { - t.Errorf("server decode: %s", err) - } - - if err := server.Close(); err != nil { - t.Error(err) - } -} -func sendOne(socket *Socket) error { - if _, err := socket.Write([]byte("frame")); err != nil { - return err - } - if err := socket.enc.Flush(); err != nil { - // Would happen during Write in real life. - socket.conn.Close() - socket.err = err - return err - } - return nil -} -func TestSocket(t *testing.T) { - socket, err := NewSocket("dnstap.sock") - if err == nil { - t.Fatal("new socket: not listening but no error") - return - } - - if err := sendOne(socket); err == nil { - t.Fatal("not listening but no error") - return - } - - l, err := net.Listen("unix", "dnstap.sock") - if err != nil { - t.Fatal(err) - return - } - - wait := make(chan bool) - go func() { - acceptOne(t, l) - wait <- true - }() - - if err := sendOne(socket); err != nil { - t.Fatalf("send one: %s", err) - return - } - - <-wait - if err := sendOne(socket); err == nil { - panic("must fail") - } - - go func() { - acceptOne(t, l) - wait <- true - }() - - if err := sendOne(socket); err != nil { - t.Fatalf("send one: %s", err) - return - } - - <-wait - if err := l.Close(); err != nil { - t.Error(err) - } -} diff --git a/plugin/dnstap/out/tcp.go b/plugin/dnstap/out/tcp.go deleted file mode 100644 index 715c3024a..000000000 --- a/plugin/dnstap/out/tcp.go +++ /dev/null @@ -1,59 +0,0 @@ -package out - -import ( - "net" - "time" - - fs "github.com/farsightsec/golang-framestream" -) - -// TCP is a Frame Streams encoder over TCP. -type TCP struct { - address string - frames [][]byte -} - -// NewTCP returns a TCP writer. -func NewTCP(address string) *TCP { - s := &TCP{address: address} - s.frames = make([][]byte, 0, 13) // 13 messages buffer - return s -} - -// Write a single Frame Streams frame. -func (s *TCP) Write(frame []byte) (n int, err error) { - s.frames = append(s.frames, frame) - if len(s.frames) == cap(s.frames) { - return len(frame), s.Flush() - } - return len(frame), nil -} - -// Flush the remaining frames. -func (s *TCP) Flush() error { - defer func() { - s.frames = s.frames[:0] - }() - c, err := net.DialTimeout("tcp", s.address, time.Second) - if err != nil { - return err - } - enc, err := fs.NewEncoder(c, &fs.EncoderOptions{ - ContentType: []byte("protobuf:dnstap.Dnstap"), - Bidirectional: true, - }) - if err != nil { - return err - } - for _, frame := range s.frames { - if _, err = enc.Write(frame); err != nil { - return err - } - } - return enc.Flush() -} - -// Close is an alias to Flush to satisfy io.WriteCloser similarly to type Socket. -func (s *TCP) Close() error { - return s.Flush() -} diff --git a/plugin/dnstap/out/tcp_test.go b/plugin/dnstap/out/tcp_test.go deleted file mode 100644 index 45ac98bf7..000000000 --- a/plugin/dnstap/out/tcp_test.go +++ /dev/null @@ -1,64 +0,0 @@ -package out - -import ( - "net" - "testing" -) - -func sendOneTCP(tcp *TCP) error { - if _, err := tcp.Write([]byte("frame")); err != nil { - return err - } - return tcp.Flush() -} - -func TestTCP(t *testing.T) { - tcp := NewTCP("localhost:14000") - - if err := sendOneTCP(tcp); err == nil { - t.Fatal("Not listening but no error.") - return - } - - l, err := net.Listen("tcp", "localhost:14000") - if err != nil { - t.Fatal(err) - return - } - - wait := make(chan bool) - go func() { - acceptOne(t, l) - wait <- true - }() - - if err := sendOneTCP(tcp); err != nil { - t.Fatalf("send one: %s", err) - return - } - - <-wait - - // TODO: When the server isn't responding according to the framestream protocol - // the thread is blocked. - /* - if err := sendOneTCP(tcp); err == nil { - panic("must fail") - } - */ - - go func() { - acceptOne(t, l) - wait <- true - }() - - if err := sendOneTCP(tcp); err != nil { - t.Fatalf("send one: %s", err) - return - } - - <-wait - if err := l.Close(); err != nil { - t.Error(err) - } -} diff --git a/plugin/dnstap/setup.go b/plugin/dnstap/setup.go index c1a8956a1..342f14e88 100644 --- a/plugin/dnstap/setup.go +++ b/plugin/dnstap/setup.go @@ -1,15 +1,11 @@ package dnstap import ( - "fmt" - "io" - "log" "strings" "github.com/coredns/coredns/core/dnsserver" "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/dnstap/dnstapio" - "github.com/coredns/coredns/plugin/dnstap/out" "github.com/coredns/coredns/plugin/pkg/dnsutil" "github.com/mholt/caddy" @@ -69,24 +65,24 @@ func setup(c *caddy.Controller) error { return err } - dnstap := Dnstap{Pack: conf.full} + dio := dnstapio.New() + dnstap := Dnstap{IO: dio, Pack: conf.full} - var o io.WriteCloser - if conf.socket { - o, err = out.NewSocket(conf.target) + c.OnStartup(func() error { + err := dio.Connect(conf.target, conf.socket) if err != nil { - log.Printf("[WARN] Can't connect to %s at the moment: %s", conf.target, err) + return plugin.Error("dnstap", err) } - } else { - o = out.NewTCP(conf.target) - } - dio := dnstapio.New(o) - dnstap.IO = dio + return nil + }) - c.OnShutdown(func() error { - if err := dio.Close(); err != nil { - return fmt.Errorf("dnstap io routine: %s", err) - } + c.OnRestart(func() error { + dio.Close() + return nil + }) + + c.OnFinalShutdown(func() error { + dio.Close() return nil }) |