aboutsummaryrefslogtreecommitdiff
path: root/plugin
diff options
context:
space:
mode:
authorGravatar Uladzimir Trehubenka <utrehubenka@infoblox.com> 2017-11-28 00:36:14 +0300
committerGravatar Miek Gieben <miek@miek.nl> 2017-11-27 21:36:14 +0000
commit6d6e1357b964ec36cfcc0d08aea4253471bfa7fa (patch)
treee256b6df981f66a98c5c8438ad541e4424257d47 /plugin
parent06006fac569364855fe808eece2f8b5b194c9f0a (diff)
downloadcoredns-6d6e1357b964ec36cfcc0d08aea4253471bfa7fa.tar.gz
coredns-6d6e1357b964ec36cfcc0d08aea4253471bfa7fa.tar.zst
coredns-6d6e1357b964ec36cfcc0d08aea4253471bfa7fa.zip
Dnstap plugin refactoring (#1257)
Diffstat (limited to 'plugin')
-rw-r--r--plugin/dnstap/dnstapio/io.go107
-rw-r--r--plugin/dnstap/dnstapio/io_test.go185
-rw-r--r--plugin/dnstap/out/socket.go82
-rw-r--r--plugin/dnstap/out/socket_test.go94
-rw-r--r--plugin/dnstap/out/tcp.go59
-rw-r--r--plugin/dnstap/out/tcp_test.go64
-rw-r--r--plugin/dnstap/setup.go32
7 files changed, 215 insertions, 408 deletions
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go
index 0c409d6fb..3bc44f85e 100644
--- a/plugin/dnstap/dnstapio/io.go
+++ b/plugin/dnstap/dnstapio/io.go
@@ -2,67 +2,100 @@ package dnstapio
import (
"log"
+ "net"
+ "time"
tap "github.com/dnstap/golang-dnstap"
+ fs "github.com/farsightsec/golang-framestream"
"github.com/golang/protobuf/proto"
)
-// DnstapIO wraps the dnstap I/O routine.
-type DnstapIO struct {
- protocol Protocol
- queue chan tap.Dnstap
- stop chan bool
+const (
+ tcpTimeout = 4 * time.Second
+ flushTimeout = 1 * time.Second
+ queueSize = 1000
+)
+
+type dnstapIO struct {
+ enc *fs.Encoder
+ conn net.Conn
+ queue chan tap.Dnstap
}
-// Protocol is either `out.TCP` or `out.Socket`.
-type Protocol interface {
- // Write takes a single frame at once.
- Write([]byte) (int, error)
+// New returns a new and initialized DnstapIO.
+func New() DnstapIO {
+ return &dnstapIO{queue: make(chan tap.Dnstap, queueSize)}
+}
- Close() error
+// DnstapIO interface
+type DnstapIO interface {
+ Connect(endpoint string, socket bool) error
+ Dnstap(payload tap.Dnstap)
+ Close()
}
-// New dnstap I/O routine from Protocol.
-func New(w Protocol) *DnstapIO {
- dio := DnstapIO{}
- dio.protocol = w
- dio.queue = make(chan tap.Dnstap, 10)
- dio.stop = make(chan bool)
+// Connect connects to the dnstop endpoint.
+func (dio *dnstapIO) Connect(endpoint string, socket bool) error {
+ var err error
+ if socket {
+ dio.conn, err = net.Dial("unix", endpoint)
+ } else {
+ dio.conn, err = net.DialTimeout("tcp", endpoint, tcpTimeout)
+ }
+ if err != nil {
+ return err
+ }
+ dio.enc, err = fs.NewEncoder(dio.conn, &fs.EncoderOptions{
+ ContentType: []byte("protobuf:dnstap.Dnstap"),
+ Bidirectional: true,
+ })
+ if err != nil {
+ return err
+ }
go dio.serve()
- return &dio
+ return nil
}
// Dnstap enqueues the payload for log.
-func (dio *DnstapIO) Dnstap(payload tap.Dnstap) {
+func (dio *dnstapIO) Dnstap(payload tap.Dnstap) {
select {
case dio.queue <- payload:
default:
- log.Println("[WARN] Dnstap payload dropped.")
+ log.Printf("[ERROR] Dnstap payload dropped")
}
}
-func (dio *DnstapIO) serve() {
+// Close waits until the I/O routine is finished to return.
+func (dio *dnstapIO) Close() {
+ close(dio.queue)
+}
+
+func (dio *dnstapIO) serve() {
+ timeout := time.After(flushTimeout)
for {
select {
- case payload := <-dio.queue:
+ case payload, ok := <-dio.queue:
+ if !ok {
+ dio.enc.Close()
+ dio.conn.Close()
+ return
+ }
frame, err := proto.Marshal(&payload)
- if err == nil {
- dio.protocol.Write(frame)
- } else {
- log.Printf("[ERROR] Invalid dnstap payload dropped: %s\n", err)
+ if err != nil {
+ log.Printf("[ERROR] Invalid dnstap payload dropped: %s", err)
+ continue
+ }
+ _, err = dio.enc.Write(frame)
+ if err != nil {
+ log.Printf("[ERROR] Cannot write dnstap payload: %s", err)
+ continue
+ }
+ case <-timeout:
+ err := dio.enc.Flush()
+ if err != nil {
+ log.Printf("[ERROR] Cannot flush dnstap payloads: %s", err)
}
- case <-dio.stop:
- close(dio.queue)
- dio.stop <- true
- return
+ timeout = time.After(flushTimeout)
}
}
}
-
-// Close waits until the I/O routine is finished to return.
-func (dio DnstapIO) Close() error {
- dio.stop <- true
- <-dio.stop
- close(dio.stop)
- return dio.protocol.Close()
-}
diff --git a/plugin/dnstap/dnstapio/io_test.go b/plugin/dnstap/dnstapio/io_test.go
index 17e7758c3..bfeeb4289 100644
--- a/plugin/dnstap/dnstapio/io_test.go
+++ b/plugin/dnstap/dnstapio/io_test.go
@@ -1,78 +1,155 @@
package dnstapio
import (
- "bytes"
- "io/ioutil"
- "log"
+ "net"
"sync"
"testing"
"time"
tap "github.com/dnstap/golang-dnstap"
+ fs "github.com/farsightsec/golang-framestream"
)
-func init() {
- log.SetOutput(ioutil.Discard)
-}
+func accept(t *testing.T, l net.Listener, count int) {
+ server, err := l.Accept()
+ if err != nil {
+ t.Fatalf("server accept: %s", err)
+ return
+ }
-type buf struct {
- *bytes.Buffer
- cost time.Duration
-}
+ dec, err := fs.NewDecoder(server, &fs.DecoderOptions{
+ ContentType: []byte("protobuf:dnstap.Dnstap"),
+ Bidirectional: true,
+ })
+ if err != nil {
+ t.Fatalf("server decoder: %s", err)
+ return
+ }
-func (b buf) Write(frame []byte) (int, error) {
- time.Sleep(b.cost)
- return b.Buffer.Write(frame)
-}
+ for i := 0; i < count; i++ {
+ if _, err := dec.Decode(); err != nil {
+ t.Errorf("server decode: %s", err)
+ }
+ }
-func (b buf) Close() error {
- return nil
+ if err := server.Close(); err != nil {
+ t.Error(err)
+ }
}
-func TestRace(t *testing.T) {
- b := buf{&bytes.Buffer{}, 100 * time.Millisecond}
- dio := New(b)
- wg := &sync.WaitGroup{}
- wg.Add(10)
- for i := 0; i < 10; i++ {
- timeout := time.After(time.Second)
- go func() {
- for {
- select {
- case <-timeout:
- wg.Done()
- return
- default:
- time.Sleep(50 * time.Millisecond)
- t := tap.Dnstap_MESSAGE
- dio.Dnstap(tap.Dnstap{Type: &t})
- }
- }
- }()
+const endpointTCP = "localhost:0"
+
+func TestTCP(t *testing.T) {
+ dio := New()
+
+ err := dio.Connect(endpointTCP, false)
+ if err == nil {
+ t.Fatal("Not listening but no error")
+ }
+
+ // Start TCP listener
+ l, err := net.Listen("tcp", endpointTCP)
+ if err != nil {
+ t.Fatalf("Cannot start listener: %s", err)
}
+ defer l.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ accept(t, l, 1)
+ wg.Done()
+ }()
+
+ err = dio.Connect(l.Addr().String(), false)
+ if err != nil {
+ t.Fatalf("Cannot connect to listener: %s", err)
+ }
+
+ msg := tap.Dnstap_MESSAGE
+ dio.Dnstap(tap.Dnstap{Type: &msg})
+
wg.Wait()
+
+ dio.Close()
}
-func TestClose(t *testing.T) {
- done := make(chan bool)
- var dio *DnstapIO
+const endpointSocket = "dnstap.sock"
+
+func TestSocket(t *testing.T) {
+ dio := New()
+
+ err := dio.Connect(endpointSocket, true)
+ if err == nil {
+ t.Fatal("Not listening but no error")
+ }
+
+ // Start Socket listener
+ l, err := net.Listen("unix", endpointSocket)
+ if err != nil {
+ t.Fatalf("Cannot start listener: %s", err)
+ }
+ defer l.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
- b := buf{&bytes.Buffer{}, 0}
- dio = New(b)
- dio.Close()
- close(done)
+ accept(t, l, 1)
+ wg.Done()
}()
- select {
- case <-done:
- case <-time.After(time.Second):
- t.Fatal("Not closing.")
+
+ err = dio.Connect(endpointSocket, true)
+ if err != nil {
+ t.Fatalf("Cannot connect to listener: %s", err)
+ }
+
+ msg := tap.Dnstap_MESSAGE
+ dio.Dnstap(tap.Dnstap{Type: &msg})
+
+ wg.Wait()
+
+ dio.Close()
+}
+
+func TestRace(t *testing.T) {
+ count := 10
+ dio := New()
+
+ err := dio.Connect(endpointTCP, false)
+ if err == nil {
+ t.Fatal("Not listening but no error")
}
- func() {
- defer func() {
- if err := recover(); err == nil {
- t.Fatal("Send on closed channel.")
- }
- }()
- dio.Dnstap(tap.Dnstap{})
+
+ // Start TCP listener
+ l, err := net.Listen("tcp", endpointTCP)
+ if err != nil {
+ t.Fatalf("Cannot start listener: %s", err)
+ }
+ defer l.Close()
+
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ accept(t, l, count)
+ wg.Done()
}()
+
+ err = dio.Connect(l.Addr().String(), false)
+ if err != nil {
+ t.Fatalf("Cannot connect to listener: %s", err)
+ }
+
+ msg := tap.Dnstap_MESSAGE
+ wg.Add(count)
+ for i := 0; i < count; i++ {
+ go func(i byte) {
+ time.Sleep(50 * time.Millisecond)
+ dio.Dnstap(tap.Dnstap{Type: &msg, Extra: []byte{i}})
+ wg.Done()
+ }(byte(i))
+ }
+
+ wg.Wait()
+
+ dio.Close()
}
diff --git a/plugin/dnstap/out/socket.go b/plugin/dnstap/out/socket.go
deleted file mode 100644
index 58285f25c..000000000
--- a/plugin/dnstap/out/socket.go
+++ /dev/null
@@ -1,82 +0,0 @@
-package out
-
-import (
- "fmt"
- "net"
-
- fs "github.com/farsightsec/golang-framestream"
-)
-
-// Socket is a Frame Streams encoder over a UNIX socket.
-type Socket struct {
- path string
- enc *fs.Encoder
- conn net.Conn
- err error
-}
-
-func openSocket(s *Socket) error {
- conn, err := net.Dial("unix", s.path)
- if err != nil {
- return err
- }
- s.conn = conn
-
- enc, err := fs.NewEncoder(conn, &fs.EncoderOptions{
- ContentType: []byte("protobuf:dnstap.Dnstap"),
- Bidirectional: true,
- })
- if err != nil {
- return err
- }
- s.enc = enc
-
- s.err = nil
- return nil
-}
-
-// NewSocket will always return a new Socket.
-// err if nothing is listening to it, it will attempt to reconnect on the next Write.
-func NewSocket(path string) (s *Socket, err error) {
- s = &Socket{path: path}
- if err = openSocket(s); err != nil {
- err = fmt.Errorf("open socket: %s", err)
- s.err = err
- return
- }
- return
-}
-
-// Write a single Frame Streams frame.
-func (s *Socket) Write(frame []byte) (int, error) {
- if s.err != nil {
- // is the dnstap tool listening?
- if err := openSocket(s); err != nil {
- return 0, fmt.Errorf("open socket: %s", err)
- }
- }
- n, err := s.enc.Write(frame)
- if err != nil {
- // the dnstap command line tool is down
- s.conn.Close()
- s.err = err
- return 0, err
- }
- return n, nil
-
-}
-
-// Close the socket and flush the remaining frames.
-func (s *Socket) Close() error {
- if s.err != nil {
- // nothing to close
- return nil
- }
-
- defer s.conn.Close()
-
- if err := s.enc.Flush(); err != nil {
- return fmt.Errorf("flush: %s", err)
- }
- return s.enc.Close()
-}
diff --git a/plugin/dnstap/out/socket_test.go b/plugin/dnstap/out/socket_test.go
deleted file mode 100644
index 050a38d36..000000000
--- a/plugin/dnstap/out/socket_test.go
+++ /dev/null
@@ -1,94 +0,0 @@
-package out
-
-import (
- "net"
- "testing"
-
- fs "github.com/farsightsec/golang-framestream"
-)
-
-func acceptOne(t *testing.T, l net.Listener) {
- server, err := l.Accept()
- if err != nil {
- t.Fatalf("server accept: %s", err)
- return
- }
-
- dec, err := fs.NewDecoder(server, &fs.DecoderOptions{
- ContentType: []byte("protobuf:dnstap.Dnstap"),
- Bidirectional: true,
- })
- if err != nil {
- t.Fatalf("server decoder: %s", err)
- return
- }
-
- if _, err := dec.Decode(); err != nil {
- t.Errorf("server decode: %s", err)
- }
-
- if err := server.Close(); err != nil {
- t.Error(err)
- }
-}
-func sendOne(socket *Socket) error {
- if _, err := socket.Write([]byte("frame")); err != nil {
- return err
- }
- if err := socket.enc.Flush(); err != nil {
- // Would happen during Write in real life.
- socket.conn.Close()
- socket.err = err
- return err
- }
- return nil
-}
-func TestSocket(t *testing.T) {
- socket, err := NewSocket("dnstap.sock")
- if err == nil {
- t.Fatal("new socket: not listening but no error")
- return
- }
-
- if err := sendOne(socket); err == nil {
- t.Fatal("not listening but no error")
- return
- }
-
- l, err := net.Listen("unix", "dnstap.sock")
- if err != nil {
- t.Fatal(err)
- return
- }
-
- wait := make(chan bool)
- go func() {
- acceptOne(t, l)
- wait <- true
- }()
-
- if err := sendOne(socket); err != nil {
- t.Fatalf("send one: %s", err)
- return
- }
-
- <-wait
- if err := sendOne(socket); err == nil {
- panic("must fail")
- }
-
- go func() {
- acceptOne(t, l)
- wait <- true
- }()
-
- if err := sendOne(socket); err != nil {
- t.Fatalf("send one: %s", err)
- return
- }
-
- <-wait
- if err := l.Close(); err != nil {
- t.Error(err)
- }
-}
diff --git a/plugin/dnstap/out/tcp.go b/plugin/dnstap/out/tcp.go
deleted file mode 100644
index 715c3024a..000000000
--- a/plugin/dnstap/out/tcp.go
+++ /dev/null
@@ -1,59 +0,0 @@
-package out
-
-import (
- "net"
- "time"
-
- fs "github.com/farsightsec/golang-framestream"
-)
-
-// TCP is a Frame Streams encoder over TCP.
-type TCP struct {
- address string
- frames [][]byte
-}
-
-// NewTCP returns a TCP writer.
-func NewTCP(address string) *TCP {
- s := &TCP{address: address}
- s.frames = make([][]byte, 0, 13) // 13 messages buffer
- return s
-}
-
-// Write a single Frame Streams frame.
-func (s *TCP) Write(frame []byte) (n int, err error) {
- s.frames = append(s.frames, frame)
- if len(s.frames) == cap(s.frames) {
- return len(frame), s.Flush()
- }
- return len(frame), nil
-}
-
-// Flush the remaining frames.
-func (s *TCP) Flush() error {
- defer func() {
- s.frames = s.frames[:0]
- }()
- c, err := net.DialTimeout("tcp", s.address, time.Second)
- if err != nil {
- return err
- }
- enc, err := fs.NewEncoder(c, &fs.EncoderOptions{
- ContentType: []byte("protobuf:dnstap.Dnstap"),
- Bidirectional: true,
- })
- if err != nil {
- return err
- }
- for _, frame := range s.frames {
- if _, err = enc.Write(frame); err != nil {
- return err
- }
- }
- return enc.Flush()
-}
-
-// Close is an alias to Flush to satisfy io.WriteCloser similarly to type Socket.
-func (s *TCP) Close() error {
- return s.Flush()
-}
diff --git a/plugin/dnstap/out/tcp_test.go b/plugin/dnstap/out/tcp_test.go
deleted file mode 100644
index 45ac98bf7..000000000
--- a/plugin/dnstap/out/tcp_test.go
+++ /dev/null
@@ -1,64 +0,0 @@
-package out
-
-import (
- "net"
- "testing"
-)
-
-func sendOneTCP(tcp *TCP) error {
- if _, err := tcp.Write([]byte("frame")); err != nil {
- return err
- }
- return tcp.Flush()
-}
-
-func TestTCP(t *testing.T) {
- tcp := NewTCP("localhost:14000")
-
- if err := sendOneTCP(tcp); err == nil {
- t.Fatal("Not listening but no error.")
- return
- }
-
- l, err := net.Listen("tcp", "localhost:14000")
- if err != nil {
- t.Fatal(err)
- return
- }
-
- wait := make(chan bool)
- go func() {
- acceptOne(t, l)
- wait <- true
- }()
-
- if err := sendOneTCP(tcp); err != nil {
- t.Fatalf("send one: %s", err)
- return
- }
-
- <-wait
-
- // TODO: When the server isn't responding according to the framestream protocol
- // the thread is blocked.
- /*
- if err := sendOneTCP(tcp); err == nil {
- panic("must fail")
- }
- */
-
- go func() {
- acceptOne(t, l)
- wait <- true
- }()
-
- if err := sendOneTCP(tcp); err != nil {
- t.Fatalf("send one: %s", err)
- return
- }
-
- <-wait
- if err := l.Close(); err != nil {
- t.Error(err)
- }
-}
diff --git a/plugin/dnstap/setup.go b/plugin/dnstap/setup.go
index c1a8956a1..342f14e88 100644
--- a/plugin/dnstap/setup.go
+++ b/plugin/dnstap/setup.go
@@ -1,15 +1,11 @@
package dnstap
import (
- "fmt"
- "io"
- "log"
"strings"
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/dnstap/dnstapio"
- "github.com/coredns/coredns/plugin/dnstap/out"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
"github.com/mholt/caddy"
@@ -69,24 +65,24 @@ func setup(c *caddy.Controller) error {
return err
}
- dnstap := Dnstap{Pack: conf.full}
+ dio := dnstapio.New()
+ dnstap := Dnstap{IO: dio, Pack: conf.full}
- var o io.WriteCloser
- if conf.socket {
- o, err = out.NewSocket(conf.target)
+ c.OnStartup(func() error {
+ err := dio.Connect(conf.target, conf.socket)
if err != nil {
- log.Printf("[WARN] Can't connect to %s at the moment: %s", conf.target, err)
+ return plugin.Error("dnstap", err)
}
- } else {
- o = out.NewTCP(conf.target)
- }
- dio := dnstapio.New(o)
- dnstap.IO = dio
+ return nil
+ })
- c.OnShutdown(func() error {
- if err := dio.Close(); err != nil {
- return fmt.Errorf("dnstap io routine: %s", err)
- }
+ c.OnRestart(func() error {
+ dio.Close()
+ return nil
+ })
+
+ c.OnFinalShutdown(func() error {
+ dio.Close()
return nil
})