aboutsummaryrefslogtreecommitdiff
path: root/plugin/dnstap/dnstapio/io.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/dnstap/dnstapio/io.go')
-rw-r--r--plugin/dnstap/dnstapio/io.go120
1 files changed, 51 insertions, 69 deletions
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go
index c88fc14ab..d85196cc8 100644
--- a/plugin/dnstap/dnstapio/io.go
+++ b/plugin/dnstap/dnstapio/io.go
@@ -8,16 +8,15 @@ import (
clog "github.com/coredns/coredns/plugin/pkg/log"
tap "github.com/dnstap/golang-dnstap"
- fs "github.com/farsightsec/golang-framestream"
)
var log = clog.NewWithPlugin("dnstap")
const (
- tcpWriteBufSize = 1024 * 1024
+ tcpWriteBufSize = 1024 * 1024 // there is no good explanation for why this number (see #xxx)
+ queueSize = 10000 // see #xxxx
tcpTimeout = 4 * time.Second
flushTimeout = 1 * time.Second
- queueSize = 10000
)
// Tapper interface is used in testing to mock the Dnstap method.
@@ -27,52 +26,47 @@ type Tapper interface {
// dio implements the Tapper interface.
type dio struct {
- endpoint string
- socket bool
- conn net.Conn
- enc *dnstapEncoder
- queue chan tap.Dnstap
- dropped uint32
- quit chan struct{}
+ endpoint string
+ proto string
+ conn net.Conn
+ enc *Encoder
+ queue chan tap.Dnstap
+ dropped uint32
+ quit chan struct{}
+ flushTimeout time.Duration
+ tcpTimeout time.Duration
}
// New returns a new and initialized pointer to a dio.
-func New(endpoint string, socket bool) *dio {
+func New(proto, endpoint string) *dio {
return &dio{
- endpoint: endpoint,
- socket: socket,
- enc: newDnstapEncoder(&fs.EncoderOptions{
- ContentType: []byte("protobuf:dnstap.Dnstap"),
- Bidirectional: true,
- }),
- queue: make(chan tap.Dnstap, queueSize),
- quit: make(chan struct{}),
+ endpoint: endpoint,
+ proto: proto,
+ queue: make(chan tap.Dnstap, queueSize),
+ quit: make(chan struct{}),
+ flushTimeout: flushTimeout,
+ tcpTimeout: tcpTimeout,
}
}
-func (d *dio) newConnect() error {
- var err error
- if d.socket {
- if d.conn, err = net.Dial("unix", d.endpoint); err != nil {
- return err
- }
- } else {
- if d.conn, err = net.DialTimeout("tcp", d.endpoint, tcpTimeout); err != nil {
- return err
- }
- if tcpConn, ok := d.conn.(*net.TCPConn); ok {
- tcpConn.SetWriteBuffer(tcpWriteBufSize)
- tcpConn.SetNoDelay(false)
- }
+func (d *dio) dial() error {
+ conn, err := net.DialTimeout(d.proto, d.endpoint, d.tcpTimeout)
+ if err != nil {
+ return err
+ }
+ if tcpConn, ok := conn.(*net.TCPConn); ok {
+ tcpConn.SetWriteBuffer(tcpWriteBufSize)
+ tcpConn.SetNoDelay(false)
}
- return d.enc.resetWriter(d.conn)
+ d.enc, err = newEncoder(conn, d.tcpTimeout)
+ return err
}
// Connect connects to the dnstap endpoint.
func (d *dio) Connect() {
- if err := d.newConnect(); err != nil {
- log.Error("No connection to dnstap endpoint")
+ if err := d.dial(); err != nil {
+ log.Errorf("No connection to dnstap endpoint: %s", err)
}
go d.serve()
}
@@ -86,58 +80,46 @@ func (d *dio) Dnstap(payload tap.Dnstap) {
}
}
-func (d *dio) closeConnection() {
- d.enc.close()
- if d.conn != nil {
- d.conn.Close()
- d.conn = nil
- }
-}
-
// Close waits until the I/O routine is finished to return.
func (d *dio) Close() { close(d.quit) }
-func (d *dio) flushBuffer() {
- if d.conn == nil {
- if err := d.newConnect(); err != nil {
- return
- }
- log.Info("Reconnected to dnstap")
- }
-
- if err := d.enc.flushBuffer(); err != nil {
- log.Warningf("Connection lost: %s", err)
- d.closeConnection()
- if err := d.newConnect(); err != nil {
- log.Errorf("Cannot connect to dnstap: %s", err)
- } else {
- log.Info("Reconnected to dnstap")
- }
+func (d *dio) write(payload *tap.Dnstap) error {
+ if d.enc == nil {
+ atomic.AddUint32(&d.dropped, 1)
+ return nil
}
-}
-
-func (d *dio) write(payload *tap.Dnstap) {
if err := d.enc.writeMsg(payload); err != nil {
atomic.AddUint32(&d.dropped, 1)
+ return err
}
+ return nil
}
func (d *dio) serve() {
- timeout := time.After(flushTimeout)
+ timeout := time.After(d.flushTimeout)
for {
select {
case <-d.quit:
- d.flushBuffer()
- d.closeConnection()
+ if d.enc == nil {
+ return
+ }
+ d.enc.flush()
+ d.enc.close()
return
case payload := <-d.queue:
- d.write(&payload)
+ if err := d.write(&payload); err != nil {
+ d.dial()
+ }
case <-timeout:
if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 {
log.Warningf("Dropped dnstap messages: %d", dropped)
}
- d.flushBuffer()
- timeout = time.After(flushTimeout)
+ if d.enc == nil {
+ d.dial()
+ } else {
+ d.enc.flush()
+ }
+ timeout = time.After(d.flushTimeout)
}
}
}