aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugin/dnstap/README.md8
-rw-r--r--plugin/dnstap/dnstapio/dnstap_encoder.go91
-rw-r--r--plugin/dnstap/dnstapio/dnstap_encoder_test.go53
-rw-r--r--plugin/dnstap/dnstapio/encoder.go41
-rw-r--r--plugin/dnstap/dnstapio/io.go120
-rw-r--r--plugin/dnstap/dnstapio/io_test.go45
-rw-r--r--plugin/dnstap/setup.go8
-rw-r--r--plugin/dnstap/setup_test.go20
8 files changed, 130 insertions, 256 deletions
diff --git a/plugin/dnstap/README.md b/plugin/dnstap/README.md
index b2bbd3c0f..8dc9b5674 100644
--- a/plugin/dnstap/README.md
+++ b/plugin/dnstap/README.md
@@ -9,8 +9,8 @@
dnstap is a flexible, structured binary log format for DNS software; see https://dnstap.info. With this
plugin you make CoreDNS output dnstap logging.
-Note that there is an internal buffer, so expect at least 13 requests before the server sends its
-dnstap messages to the socket.
+Every message is sent to the socket as soon as it comes in, the *dnstap* plugin has a buffer of
+10000 messages, above that number dnstap messages will be dropped (this is logged).
## Syntax
@@ -100,5 +100,5 @@ func (x RandomPlugin) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns
## See Also
-The website [dnstap.info](https://dnstap.info) has info on the dnstap protocol.
-The *forward* plugin's `dnstap.go` uses dnstap to tap messages sent to an upstream.
+The website [dnstap.info](https://dnstap.info) has info on the dnstap protocol. The *forward*
+plugin's `dnstap.go` uses dnstap to tap messages sent to an upstream.
diff --git a/plugin/dnstap/dnstapio/dnstap_encoder.go b/plugin/dnstap/dnstapio/dnstap_encoder.go
deleted file mode 100644
index 65b15f587..000000000
--- a/plugin/dnstap/dnstapio/dnstap_encoder.go
+++ /dev/null
@@ -1,91 +0,0 @@
-package dnstapio
-
-import (
- "encoding/binary"
- "fmt"
- "io"
-
- tap "github.com/dnstap/golang-dnstap"
- fs "github.com/farsightsec/golang-framestream"
- "github.com/golang/protobuf/proto"
-)
-
-const (
- protobufSize = 1024 * 1024
-)
-
-type dnstapEncoder struct {
- fse *fs.Encoder
- opts *fs.EncoderOptions
- writer io.Writer
- buffer *proto.Buffer
-}
-
-func newDnstapEncoder(o *fs.EncoderOptions) *dnstapEncoder {
- return &dnstapEncoder{
- opts: o,
- buffer: proto.NewBuffer(make([]byte, 0, protobufSize)),
- }
-}
-
-func (enc *dnstapEncoder) resetWriter(w io.Writer) error {
- fse, err := fs.NewEncoder(w, enc.opts)
- if err != nil {
- return err
- }
- if err = fse.Flush(); err != nil {
- return err
- }
- enc.fse = fse
- enc.writer = w
- return nil
-}
-
-func (enc *dnstapEncoder) writeMsg(msg *tap.Dnstap) error {
- if len(enc.buffer.Bytes()) >= protobufSize {
- if err := enc.flushBuffer(); err != nil {
- return err
- }
- }
- bufLen := len(enc.buffer.Bytes())
- // add placeholder for frame length
- if err := enc.buffer.EncodeFixed32(0); err != nil {
- enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen])
- return err
- }
- if err := enc.buffer.Marshal(msg); err != nil {
- enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen])
- return err
- }
- enc.encodeFrameLen(enc.buffer.Bytes()[bufLen:])
- return nil
-}
-
-func (enc *dnstapEncoder) flushBuffer() error {
- if enc.fse == nil || enc.writer == nil {
- return fmt.Errorf("no writer")
- }
-
- buf := enc.buffer.Bytes()
- written := 0
- for written < len(buf) {
- n, err := enc.writer.Write(buf[written:])
- written += n
- if err != nil {
- return err
- }
- }
- enc.buffer.Reset()
- return nil
-}
-
-func (enc *dnstapEncoder) encodeFrameLen(buf []byte) {
- binary.BigEndian.PutUint32(buf, uint32(len(buf)-4))
-}
-
-func (enc *dnstapEncoder) close() error {
- if enc.fse != nil {
- return enc.fse.Close()
- }
- return nil
-}
diff --git a/plugin/dnstap/dnstapio/dnstap_encoder_test.go b/plugin/dnstap/dnstapio/dnstap_encoder_test.go
deleted file mode 100644
index a7fe23d2b..000000000
--- a/plugin/dnstap/dnstapio/dnstap_encoder_test.go
+++ /dev/null
@@ -1,53 +0,0 @@
-package dnstapio
-
-import (
- "bytes"
- "testing"
-
- tap "github.com/dnstap/golang-dnstap"
- fs "github.com/farsightsec/golang-framestream"
- "github.com/golang/protobuf/proto"
-)
-
-func dnstapMsg() *tap.Dnstap {
- t := tap.Dnstap_MESSAGE
- mt := tap.Message_CLIENT_RESPONSE
- msg := &tap.Message{Type: &mt}
- return &tap.Dnstap{Type: &t, Message: msg}
-}
-
-func TestEncoderCompatibility(t *testing.T) {
- opts := &fs.EncoderOptions{
- ContentType: []byte("protobuf:dnstap.DnstapTest"),
- Bidirectional: false,
- }
- msg := dnstapMsg()
-
- //framestream encoder
- fsW := new(bytes.Buffer)
- fsEnc, err := fs.NewEncoder(fsW, opts)
- if err != nil {
- t.Fatal(err)
- }
- data, err := proto.Marshal(msg)
- if err != nil {
- t.Fatal(err)
- }
- fsEnc.Write(data)
- fsEnc.Close()
-
- //dnstap encoder
- dnstapW := new(bytes.Buffer)
- dnstapEnc := newDnstapEncoder(opts)
- if err := dnstapEnc.resetWriter(dnstapW); err != nil {
- t.Fatal(err)
- }
- dnstapEnc.writeMsg(msg)
- dnstapEnc.flushBuffer()
- dnstapEnc.close()
-
- //compare results
- if !bytes.Equal(fsW.Bytes(), dnstapW.Bytes()) {
- t.Fatal("DnstapEncoder is not compatible with framestream Encoder")
- }
-}
diff --git a/plugin/dnstap/dnstapio/encoder.go b/plugin/dnstap/dnstapio/encoder.go
new file mode 100644
index 000000000..2b4a76cd5
--- /dev/null
+++ b/plugin/dnstap/dnstapio/encoder.go
@@ -0,0 +1,41 @@
+// Package dnstapio is a small wrapper around golang-framestream
+package dnstapio
+
+import (
+ "io"
+ "time"
+
+ tap "github.com/dnstap/golang-dnstap"
+ fs "github.com/farsightsec/golang-framestream"
+ "github.com/golang/protobuf/proto"
+)
+
+// Encoder wraps a fs.Encoder.
+type Encoder struct {
+ fs *fs.Encoder
+}
+
+func newEncoder(w io.Writer, timeout time.Duration) (*Encoder, error) {
+ fs, err := fs.NewEncoder(w, &fs.EncoderOptions{
+ ContentType: []byte("protobuf:dnstap.Dnstap"),
+ Bidirectional: true,
+ Timeout: timeout,
+ })
+ if err != nil {
+ return nil, err
+ }
+ return &Encoder{fs}, nil
+}
+
+func (e *Encoder) writeMsg(msg *tap.Dnstap) error {
+ buf, err := proto.Marshal(msg)
+ if err != nil {
+ return err
+ }
+
+ _, err = e.fs.Write(buf) // n < len(buf) should return an error
+ return err
+}
+
+func (e *Encoder) flush() error { return e.fs.Flush() }
+func (e *Encoder) close() error { return e.fs.Close() }
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go
index c88fc14ab..d85196cc8 100644
--- a/plugin/dnstap/dnstapio/io.go
+++ b/plugin/dnstap/dnstapio/io.go
@@ -8,16 +8,15 @@ import (
clog "github.com/coredns/coredns/plugin/pkg/log"
tap "github.com/dnstap/golang-dnstap"
- fs "github.com/farsightsec/golang-framestream"
)
var log = clog.NewWithPlugin("dnstap")
const (
- tcpWriteBufSize = 1024 * 1024
+ tcpWriteBufSize = 1024 * 1024 // there is no good explanation for why this number (see #xxx)
+ queueSize = 10000 // see #xxxx
tcpTimeout = 4 * time.Second
flushTimeout = 1 * time.Second
- queueSize = 10000
)
// Tapper interface is used in testing to mock the Dnstap method.
@@ -27,52 +26,47 @@ type Tapper interface {
// dio implements the Tapper interface.
type dio struct {
- endpoint string
- socket bool
- conn net.Conn
- enc *dnstapEncoder
- queue chan tap.Dnstap
- dropped uint32
- quit chan struct{}
+ endpoint string
+ proto string
+ conn net.Conn
+ enc *Encoder
+ queue chan tap.Dnstap
+ dropped uint32
+ quit chan struct{}
+ flushTimeout time.Duration
+ tcpTimeout time.Duration
}
// New returns a new and initialized pointer to a dio.
-func New(endpoint string, socket bool) *dio {
+func New(proto, endpoint string) *dio {
return &dio{
- endpoint: endpoint,
- socket: socket,
- enc: newDnstapEncoder(&fs.EncoderOptions{
- ContentType: []byte("protobuf:dnstap.Dnstap"),
- Bidirectional: true,
- }),
- queue: make(chan tap.Dnstap, queueSize),
- quit: make(chan struct{}),
+ endpoint: endpoint,
+ proto: proto,
+ queue: make(chan tap.Dnstap, queueSize),
+ quit: make(chan struct{}),
+ flushTimeout: flushTimeout,
+ tcpTimeout: tcpTimeout,
}
}
-func (d *dio) newConnect() error {
- var err error
- if d.socket {
- if d.conn, err = net.Dial("unix", d.endpoint); err != nil {
- return err
- }
- } else {
- if d.conn, err = net.DialTimeout("tcp", d.endpoint, tcpTimeout); err != nil {
- return err
- }
- if tcpConn, ok := d.conn.(*net.TCPConn); ok {
- tcpConn.SetWriteBuffer(tcpWriteBufSize)
- tcpConn.SetNoDelay(false)
- }
+func (d *dio) dial() error {
+ conn, err := net.DialTimeout(d.proto, d.endpoint, d.tcpTimeout)
+ if err != nil {
+ return err
+ }
+ if tcpConn, ok := conn.(*net.TCPConn); ok {
+ tcpConn.SetWriteBuffer(tcpWriteBufSize)
+ tcpConn.SetNoDelay(false)
}
- return d.enc.resetWriter(d.conn)
+ d.enc, err = newEncoder(conn, d.tcpTimeout)
+ return err
}
// Connect connects to the dnstap endpoint.
func (d *dio) Connect() {
- if err := d.newConnect(); err != nil {
- log.Error("No connection to dnstap endpoint")
+ if err := d.dial(); err != nil {
+ log.Errorf("No connection to dnstap endpoint: %s", err)
}
go d.serve()
}
@@ -86,58 +80,46 @@ func (d *dio) Dnstap(payload tap.Dnstap) {
}
}
-func (d *dio) closeConnection() {
- d.enc.close()
- if d.conn != nil {
- d.conn.Close()
- d.conn = nil
- }
-}
-
// Close waits until the I/O routine is finished to return.
func (d *dio) Close() { close(d.quit) }
-func (d *dio) flushBuffer() {
- if d.conn == nil {
- if err := d.newConnect(); err != nil {
- return
- }
- log.Info("Reconnected to dnstap")
- }
-
- if err := d.enc.flushBuffer(); err != nil {
- log.Warningf("Connection lost: %s", err)
- d.closeConnection()
- if err := d.newConnect(); err != nil {
- log.Errorf("Cannot connect to dnstap: %s", err)
- } else {
- log.Info("Reconnected to dnstap")
- }
+func (d *dio) write(payload *tap.Dnstap) error {
+ if d.enc == nil {
+ atomic.AddUint32(&d.dropped, 1)
+ return nil
}
-}
-
-func (d *dio) write(payload *tap.Dnstap) {
if err := d.enc.writeMsg(payload); err != nil {
atomic.AddUint32(&d.dropped, 1)
+ return err
}
+ return nil
}
func (d *dio) serve() {
- timeout := time.After(flushTimeout)
+ timeout := time.After(d.flushTimeout)
for {
select {
case <-d.quit:
- d.flushBuffer()
- d.closeConnection()
+ if d.enc == nil {
+ return
+ }
+ d.enc.flush()
+ d.enc.close()
return
case payload := <-d.queue:
- d.write(&payload)
+ if err := d.write(&payload); err != nil {
+ d.dial()
+ }
case <-timeout:
if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 {
log.Warningf("Dropped dnstap messages: %d", dropped)
}
- d.flushBuffer()
- timeout = time.After(flushTimeout)
+ if d.enc == nil {
+ d.dial()
+ } else {
+ d.enc.flush()
+ }
+ timeout = time.After(d.flushTimeout)
}
}
}
diff --git a/plugin/dnstap/dnstapio/io_test.go b/plugin/dnstap/dnstapio/io_test.go
index f26f50095..6870734e4 100644
--- a/plugin/dnstap/dnstapio/io_test.go
+++ b/plugin/dnstap/dnstapio/io_test.go
@@ -12,11 +12,6 @@ import (
fs "github.com/farsightsec/golang-framestream"
)
-const (
- endpointTCP = "localhost:0"
- endpointSocket = "dnstap.sock"
-)
-
var (
msgType = tap.Dnstap_MESSAGE
msg = tap.Dnstap{Type: &msgType}
@@ -27,7 +22,6 @@ func accept(t *testing.T, l net.Listener, count int) {
if err != nil {
t.Fatalf("Server accepted: %s", err)
}
-
dec, err := fs.NewDecoder(server, &fs.DecoderOptions{
ContentType: []byte("protobuf:dnstap.Dnstap"),
Bidirectional: true,
@@ -48,9 +42,10 @@ func accept(t *testing.T, l net.Listener, count int) {
}
func TestTransport(t *testing.T) {
- transport := [2][3]string{
- {"tcp", endpointTCP, "false"},
- {"unix", endpointSocket, "true"},
+
+ transport := [2][2]string{
+ {"tcp", ":0"},
+ {"unix", "dnstap.sock"},
}
for _, param := range transport {
@@ -67,7 +62,9 @@ func TestTransport(t *testing.T) {
wg.Done()
}()
- dio := New(l.Addr().String(), param[2] == "true")
+ dio := New(param[0], l.Addr().String())
+ dio.tcpTimeout = 10 * time.Millisecond
+ dio.flushTimeout = 30 * time.Millisecond
dio.Connect()
dio.Dnstap(msg)
@@ -81,8 +78,7 @@ func TestTransport(t *testing.T) {
func TestRace(t *testing.T) {
count := 10
- // Start TCP listener
- l, err := reuseport.Listen("tcp", endpointTCP)
+ l, err := reuseport.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot start listener: %s", err)
}
@@ -95,27 +91,27 @@ func TestRace(t *testing.T) {
wg.Done()
}()
- dio := New(l.Addr().String(), false)
+ dio := New("tcp", l.Addr().String())
+ dio.tcpTimeout = 10 * time.Millisecond
+ dio.flushTimeout = 30 * time.Millisecond
dio.Connect()
defer dio.Close()
wg.Add(count)
for i := 0; i < count; i++ {
go func() {
- time.Sleep(50 * time.Millisecond)
- dio.Dnstap(msg)
+ msg := tap.Dnstap_MESSAGE
+ dio.Dnstap(tap.Dnstap{Type: &msg})
wg.Done()
}()
}
-
wg.Wait()
}
func TestReconnect(t *testing.T) {
count := 5
- // Start TCP listener
- l, err := reuseport.Listen("tcp", endpointTCP)
+ l, err := reuseport.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Cannot start listener: %s", err)
}
@@ -128,18 +124,18 @@ func TestReconnect(t *testing.T) {
}()
addr := l.Addr().String()
- dio := New(addr, false)
+ dio := New("tcp", addr)
+ dio.tcpTimeout = 10 * time.Millisecond
+ dio.flushTimeout = 30 * time.Millisecond
dio.Connect()
defer dio.Close()
- msg := tap.Dnstap_MESSAGE
- dio.Dnstap(tap.Dnstap{Type: &msg})
+ dio.Dnstap(msg)
wg.Wait()
// Close listener
l.Close()
-
// And start TCP listener again on the same port
l, err = reuseport.Listen("tcp", addr)
if err != nil {
@@ -154,9 +150,8 @@ func TestReconnect(t *testing.T) {
}()
for i := 0; i < count; i++ {
- time.Sleep(time.Second)
- dio.Dnstap(tap.Dnstap{Type: &msg})
+ time.Sleep(100 * time.Millisecond)
+ dio.Dnstap(msg)
}
-
wg.Wait()
}
diff --git a/plugin/dnstap/setup.go b/plugin/dnstap/setup.go
index ab5488686..a863639ad 100644
--- a/plugin/dnstap/setup.go
+++ b/plugin/dnstap/setup.go
@@ -13,8 +13,8 @@ import (
func init() { plugin.Register("dnstap", setup) }
type config struct {
+ proto string
target string
- socket bool
full bool
}
@@ -32,10 +32,10 @@ func parseConfig(d *caddy.Controller) (c config, err error) {
return c, d.ArgErr()
}
c.target = servers[0]
+ c.proto = "tcp"
} else {
- // default to UNIX socket
c.target = strings.TrimPrefix(c.target, "unix://")
- c.socket = true
+ c.proto = "unix"
}
c.full = d.NextArg() && d.Val() == "full"
@@ -49,7 +49,7 @@ func setup(c *caddy.Controller) error {
return plugin.Error("dnstap", err)
}
- dio := dnstapio.New(conf.target, conf.socket)
+ dio := dnstapio.New(conf.proto, conf.target)
dnstap := Dnstap{io: dio, IncludeRawMessage: conf.full}
c.OnStartup(func() error {
diff --git a/plugin/dnstap/setup_test.go b/plugin/dnstap/setup_test.go
index 8fad9cd39..129107efd 100644
--- a/plugin/dnstap/setup_test.go
+++ b/plugin/dnstap/setup_test.go
@@ -8,16 +8,16 @@ import (
func TestConfig(t *testing.T) {
tests := []struct {
- file string
- path string
- full bool
- socket bool
- fail bool
+ file string
+ path string
+ full bool
+ proto string
+ fail bool
}{
- {"dnstap dnstap.sock full", "dnstap.sock", true, true, false},
- {"dnstap unix://dnstap.sock", "dnstap.sock", false, true, false},
- {"dnstap tcp://127.0.0.1:6000", "127.0.0.1:6000", false, false, false},
- {"dnstap", "fail", false, true, true},
+ {"dnstap dnstap.sock full", "dnstap.sock", true, "unix", false},
+ {"dnstap unix://dnstap.sock", "dnstap.sock", false, "unix", false},
+ {"dnstap tcp://127.0.0.1:6000", "127.0.0.1:6000", false, "tcp", false},
+ {"dnstap", "fail", false, "tcp", true},
}
for _, c := range tests {
cad := caddy.NewTestController("dns", c.file)
@@ -26,7 +26,7 @@ func TestConfig(t *testing.T) {
if err == nil {
t.Errorf("%s: %s", c.file, err)
}
- } else if err != nil || conf.target != c.path || conf.full != c.full || conf.socket != c.socket {
+ } else if err != nil || conf.target != c.path || conf.full != c.full || conf.proto != c.proto {
t.Errorf("Expected: %+v\nhave: %+v\nerror: %s", c, conf, err)
}
}