aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugin/dnstap/dnstapio/dnstap_encoder.go92
-rw-r--r--plugin/dnstap/dnstapio/dnstap_encoder_test.go53
-rw-r--r--plugin/dnstap/dnstapio/io.go93
3 files changed, 196 insertions, 42 deletions
diff --git a/plugin/dnstap/dnstapio/dnstap_encoder.go b/plugin/dnstap/dnstapio/dnstap_encoder.go
new file mode 100644
index 000000000..07dfc8413
--- /dev/null
+++ b/plugin/dnstap/dnstapio/dnstap_encoder.go
@@ -0,0 +1,92 @@
+package dnstapio
+
+import (
+ "encoding/binary"
+ "fmt"
+ "io"
+
+ tap "github.com/dnstap/golang-dnstap"
+ fs "github.com/farsightsec/golang-framestream"
+ "github.com/golang/protobuf/proto"
+)
+
+const (
+ frameLenSize = 4
+ protobufSize = 1024 * 1024
+)
+
+type dnstapEncoder struct {
+ fse *fs.Encoder
+ opts *fs.EncoderOptions
+ writer io.Writer
+ buffer *proto.Buffer
+}
+
+func newDnstapEncoder(o *fs.EncoderOptions) *dnstapEncoder {
+ return &dnstapEncoder{
+ opts: o,
+ buffer: proto.NewBuffer(make([]byte, 0, protobufSize)),
+ }
+}
+
+func (enc *dnstapEncoder) resetWriter(w io.Writer) error {
+ fse, err := fs.NewEncoder(w, enc.opts)
+ if err != nil {
+ return err
+ }
+ if err = fse.Flush(); err != nil {
+ return err
+ }
+ enc.fse = fse
+ enc.writer = w
+ return nil
+}
+
+func (enc *dnstapEncoder) writeMsg(msg *tap.Dnstap) error {
+ if len(enc.buffer.Bytes()) >= protobufSize {
+ if err := enc.flushBuffer(); err != nil {
+ return err
+ }
+ }
+ bufLen := len(enc.buffer.Bytes())
+ // add placeholder for frame length
+ if err := enc.buffer.EncodeFixed32(0); err != nil {
+ enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen])
+ return err
+ }
+ if err := enc.buffer.Marshal(msg); err != nil {
+ enc.buffer.SetBuf(enc.buffer.Bytes()[:bufLen])
+ return err
+ }
+ enc.encodeFrameLen(enc.buffer.Bytes()[bufLen:])
+ return nil
+}
+
+func (enc *dnstapEncoder) flushBuffer() error {
+ if enc.fse == nil || enc.writer == nil {
+ return fmt.Errorf("no writer")
+ }
+
+ buf := enc.buffer.Bytes()
+ written := 0
+ for written < len(buf) {
+ n, err := enc.writer.Write(buf[written:])
+ written += n
+ if err != nil {
+ return err
+ }
+ }
+ enc.buffer.Reset()
+ return nil
+}
+
+func (enc *dnstapEncoder) encodeFrameLen(buf []byte) {
+ binary.BigEndian.PutUint32(buf, uint32(len(buf)-4))
+}
+
+func (enc *dnstapEncoder) close() error {
+ if enc.fse != nil {
+ return enc.fse.Close()
+ }
+ return nil
+}
diff --git a/plugin/dnstap/dnstapio/dnstap_encoder_test.go b/plugin/dnstap/dnstapio/dnstap_encoder_test.go
new file mode 100644
index 000000000..7ddb08776
--- /dev/null
+++ b/plugin/dnstap/dnstapio/dnstap_encoder_test.go
@@ -0,0 +1,53 @@
+package dnstapio
+
+import (
+ "bytes"
+ "testing"
+
+ tap "github.com/dnstap/golang-dnstap"
+ fs "github.com/farsightsec/golang-framestream"
+ "github.com/golang/protobuf/proto"
+)
+
+func dnstapMsg() *tap.Dnstap {
+ t := tap.Dnstap_MESSAGE
+ mt := tap.Message_CLIENT_RESPONSE
+ msg := &tap.Message{Type: &mt}
+ return &tap.Dnstap{Type: &t, Message: msg}
+}
+
+func TestEncoderCompatibility(t *testing.T) {
+ opts := &fs.EncoderOptions{
+ ContentType: []byte("protobuf:dnstap.DnstapTest"),
+ Bidirectional: false,
+ }
+ msg := dnstapMsg()
+
+ //framestream encoder
+ fsW := new(bytes.Buffer)
+ fsEnc, err := fs.NewEncoder(fsW, opts)
+ if err != nil {
+ t.Fatal(err)
+ }
+ data, err := proto.Marshal(msg)
+ if err != nil {
+ t.Fatal(err)
+ }
+ fsEnc.Write(data)
+ fsEnc.Close()
+
+ //dnstap encoder
+ dnstapW := new(bytes.Buffer)
+ dnstapEnc := newDnstapEncoder(opts)
+ if err := dnstapEnc.resetWriter(dnstapW); err != nil {
+ t.Fatal(err)
+ }
+ dnstapEnc.writeMsg(msg)
+ dnstapEnc.flushBuffer()
+ dnstapEnc.close()
+
+ //compare results
+ if !bytes.Equal(fsW.Bytes(), dnstapW.Bytes()) {
+ t.Fatal("dnstapEncoder is not compatible with framestream Encoder")
+ }
+}
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go
index 08d33aa0c..d2f54679b 100644
--- a/plugin/dnstap/dnstapio/io.go
+++ b/plugin/dnstap/dnstapio/io.go
@@ -3,25 +3,27 @@ package dnstapio
import (
"log"
"net"
+ "sync/atomic"
"time"
tap "github.com/dnstap/golang-dnstap"
fs "github.com/farsightsec/golang-framestream"
- "github.com/golang/protobuf/proto"
)
const (
- tcpTimeout = 4 * time.Second
- flushTimeout = 1 * time.Second
- queueSize = 10000
+ tcpWriteBufSize = 1024 * 1024
+ tcpTimeout = 4 * time.Second
+ flushTimeout = 1 * time.Second
+ queueSize = 10000
)
type dnstapIO struct {
endpoint string
socket bool
conn net.Conn
- enc *fs.Encoder
+ enc *dnstapEncoder
queue chan tap.Dnstap
+ dropped uint32
}
// New returns a new and initialized DnstapIO.
@@ -29,7 +31,11 @@ func New(endpoint string, socket bool) DnstapIO {
return &dnstapIO{
endpoint: endpoint,
socket: socket,
- queue: make(chan tap.Dnstap, queueSize),
+ enc: newDnstapEncoder(&fs.EncoderOptions{
+ ContentType: []byte("protobuf:dnstap.Dnstap"),
+ Bidirectional: true,
+ }),
+ queue: make(chan tap.Dnstap, queueSize),
}
}
@@ -43,18 +49,20 @@ type DnstapIO interface {
func (dio *dnstapIO) newConnect() error {
var err error
if dio.socket {
- dio.conn, err = net.Dial("unix", dio.endpoint)
+ if dio.conn, err = net.Dial("unix", dio.endpoint); err != nil {
+ return err
+ }
} else {
- dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout)
- }
- if err != nil {
- return err
+ if dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout); err != nil {
+ return err
+ }
+ if tcpConn, ok := dio.conn.(*net.TCPConn); ok {
+ tcpConn.SetWriteBuffer(tcpWriteBufSize)
+ tcpConn.SetNoDelay(false)
+ }
}
- dio.enc, err = fs.NewEncoder(dio.conn, &fs.EncoderOptions{
- ContentType: []byte("protobuf:dnstap.Dnstap"),
- Bidirectional: true,
- })
- if err != nil {
+
+ if err = dio.enc.resetWriter(dio.conn); err != nil {
return err
}
return nil
@@ -73,15 +81,16 @@ func (dio *dnstapIO) Dnstap(payload tap.Dnstap) {
select {
case dio.queue <- payload:
default:
- log.Printf("[ERROR] Dnstap payload dropped")
+ atomic.AddUint32(&dio.dropped, 1)
}
}
func (dio *dnstapIO) closeConnection() {
- dio.enc.Close()
- dio.conn.Close()
- dio.enc = nil
- dio.conn = nil
+ dio.enc.close()
+ if dio.conn != nil {
+ dio.conn.Close()
+ dio.conn = nil
+ }
}
// Close waits until the I/O routine is finished to return.
@@ -89,32 +98,28 @@ func (dio *dnstapIO) Close() {
close(dio.queue)
}
-func (dio *dnstapIO) write(payload *tap.Dnstap) {
- if dio.enc == nil {
+func (dio *dnstapIO) flushBuffer() {
+ if dio.conn == nil {
if err := dio.newConnect(); err != nil {
return
}
+ log.Printf("[INFO] Reconnected to dnstap")
}
- var err error
- if payload != nil {
- frame, e := proto.Marshal(payload)
- if err != nil {
- log.Printf("[ERROR] Invalid dnstap payload dropped: %s", e)
- return
+
+ if err := dio.enc.flushBuffer(); err != nil {
+ log.Printf("[WARN] Connection lost: %s", err)
+ dio.closeConnection()
+ if err := dio.newConnect(); err != nil {
+ log.Printf("[ERROR] Cannot connect to dnstap: %s", err)
+ } else {
+ log.Printf("[INFO] Reconnected to dnstap")
}
- _, err = dio.enc.Write(frame)
- } else {
- err = dio.enc.Flush()
}
- if err == nil {
- return
- }
- log.Printf("[WARN] Connection lost: %s", err)
- dio.closeConnection()
- if err := dio.newConnect(); err != nil {
- log.Printf("[ERROR] Cannot write dnstap payload: %s", err)
- } else {
- log.Printf("[INFO] Reconnect to dnstap done")
+}
+
+func (dio *dnstapIO) write(payload *tap.Dnstap) {
+ if err := dio.enc.writeMsg(payload); err != nil {
+ atomic.AddUint32(&dio.dropped, 1)
}
}
@@ -124,12 +129,16 @@ func (dio *dnstapIO) serve() {
select {
case payload, ok := <-dio.queue:
if !ok {
+ dio.flushBuffer()
dio.closeConnection()
return
}
dio.write(&payload)
case <-timeout:
- dio.write(nil)
+ if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 {
+ log.Printf("[WARN] Dropped dnstap messages: %d", dropped)
+ }
+ dio.flushBuffer()
timeout = time.After(flushTimeout)
}
}