aboutsummaryrefslogtreecommitdiff
path: root/plugin/dnstap/io.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/dnstap/io.go')
-rw-r--r--plugin/dnstap/io.go121
1 files changed, 121 insertions, 0 deletions
diff --git a/plugin/dnstap/io.go b/plugin/dnstap/io.go
new file mode 100644
index 000000000..6823fa8a6
--- /dev/null
+++ b/plugin/dnstap/io.go
@@ -0,0 +1,121 @@
+package dnstap
+
+import (
+ "net"
+ "sync/atomic"
+ "time"
+
+ tap "github.com/dnstap/golang-dnstap"
+)
+
+const (
+ tcpWriteBufSize = 1024 * 1024 // there is no good explanation for why this number has this value.
+ queueSize = 10000 // idem.
+
+ tcpTimeout = 4 * time.Second
+ flushTimeout = 1 * time.Second
+)
+
+// tapper interface is used in testing to mock the Dnstap method.
+type tapper interface {
+ Dnstap(tap.Dnstap)
+}
+
+// dio implements the Tapper interface.
+type dio struct {
+ endpoint string
+ proto string
+ conn net.Conn
+ enc *encoder
+ queue chan tap.Dnstap
+ dropped uint32
+ quit chan struct{}
+ flushTimeout time.Duration
+ tcpTimeout time.Duration
+}
+
+// newIO returns a new and initialized pointer to a dio.
+func newIO(proto, endpoint string) *dio {
+ return &dio{
+ endpoint: endpoint,
+ proto: proto,
+ queue: make(chan tap.Dnstap, queueSize),
+ quit: make(chan struct{}),
+ flushTimeout: flushTimeout,
+ tcpTimeout: tcpTimeout,
+ }
+}
+
+func (d *dio) dial() error {
+ conn, err := net.DialTimeout(d.proto, d.endpoint, d.tcpTimeout)
+ if err != nil {
+ return err
+ }
+ if tcpConn, ok := conn.(*net.TCPConn); ok {
+ tcpConn.SetWriteBuffer(tcpWriteBufSize)
+ tcpConn.SetNoDelay(false)
+ }
+
+ d.enc, err = newEncoder(conn, d.tcpTimeout)
+ return err
+}
+
+// Connect connects to the dnstap endpoint.
+func (d *dio) connect() error {
+ err := d.dial()
+ go d.serve()
+ return err
+}
+
+// Dnstap enqueues the payload for log.
+func (d *dio) Dnstap(payload tap.Dnstap) {
+ select {
+ case d.queue <- payload:
+ default:
+ atomic.AddUint32(&d.dropped, 1)
+ }
+}
+
+// close waits until the I/O routine is finished to return.
+func (d *dio) close() { close(d.quit) }
+
+func (d *dio) write(payload *tap.Dnstap) error {
+ if d.enc == nil {
+ atomic.AddUint32(&d.dropped, 1)
+ return nil
+ }
+ if err := d.enc.writeMsg(payload); err != nil {
+ atomic.AddUint32(&d.dropped, 1)
+ return err
+ }
+ return nil
+}
+
+func (d *dio) serve() {
+ timeout := time.After(d.flushTimeout)
+ for {
+ select {
+ case <-d.quit:
+ if d.enc == nil {
+ return
+ }
+ d.enc.flush()
+ d.enc.close()
+ return
+ case payload := <-d.queue:
+ if err := d.write(&payload); err != nil {
+ d.dial()
+ }
+ case <-timeout:
+ if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 {
+ log.Warningf("Dropped dnstap messages: %d", dropped)
+ }
+ if d.enc == nil {
+ d.dial()
+ } else {
+ d.enc.flush()
+ }
+ timeout = time.After(d.flushTimeout)
+ }
+ }
+}