diff options
Diffstat (limited to 'plugin/dnstap/dnstapio/io.go')
-rw-r--r-- | plugin/dnstap/dnstapio/io.go | 120 |
1 files changed, 51 insertions, 69 deletions
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go index c88fc14ab..d85196cc8 100644 --- a/plugin/dnstap/dnstapio/io.go +++ b/plugin/dnstap/dnstapio/io.go @@ -8,16 +8,15 @@ import ( clog "github.com/coredns/coredns/plugin/pkg/log" tap "github.com/dnstap/golang-dnstap" - fs "github.com/farsightsec/golang-framestream" ) var log = clog.NewWithPlugin("dnstap") const ( - tcpWriteBufSize = 1024 * 1024 + tcpWriteBufSize = 1024 * 1024 // there is no good explanation for why this number (see #xxx) + queueSize = 10000 // see #xxxx tcpTimeout = 4 * time.Second flushTimeout = 1 * time.Second - queueSize = 10000 ) // Tapper interface is used in testing to mock the Dnstap method. @@ -27,52 +26,47 @@ type Tapper interface { // dio implements the Tapper interface. type dio struct { - endpoint string - socket bool - conn net.Conn - enc *dnstapEncoder - queue chan tap.Dnstap - dropped uint32 - quit chan struct{} + endpoint string + proto string + conn net.Conn + enc *Encoder + queue chan tap.Dnstap + dropped uint32 + quit chan struct{} + flushTimeout time.Duration + tcpTimeout time.Duration } // New returns a new and initialized pointer to a dio. -func New(endpoint string, socket bool) *dio { +func New(proto, endpoint string) *dio { return &dio{ - endpoint: endpoint, - socket: socket, - enc: newDnstapEncoder(&fs.EncoderOptions{ - ContentType: []byte("protobuf:dnstap.Dnstap"), - Bidirectional: true, - }), - queue: make(chan tap.Dnstap, queueSize), - quit: make(chan struct{}), + endpoint: endpoint, + proto: proto, + queue: make(chan tap.Dnstap, queueSize), + quit: make(chan struct{}), + flushTimeout: flushTimeout, + tcpTimeout: tcpTimeout, } } -func (d *dio) newConnect() error { - var err error - if d.socket { - if d.conn, err = net.Dial("unix", d.endpoint); err != nil { - return err - } - } else { - if d.conn, err = net.DialTimeout("tcp", d.endpoint, tcpTimeout); err != nil { - return err - } - if tcpConn, ok := d.conn.(*net.TCPConn); ok { - tcpConn.SetWriteBuffer(tcpWriteBufSize) - tcpConn.SetNoDelay(false) - } +func (d *dio) dial() error { + conn, err := net.DialTimeout(d.proto, d.endpoint, d.tcpTimeout) + if err != nil { + return err + } + if tcpConn, ok := conn.(*net.TCPConn); ok { + tcpConn.SetWriteBuffer(tcpWriteBufSize) + tcpConn.SetNoDelay(false) } - return d.enc.resetWriter(d.conn) + d.enc, err = newEncoder(conn, d.tcpTimeout) + return err } // Connect connects to the dnstap endpoint. func (d *dio) Connect() { - if err := d.newConnect(); err != nil { - log.Error("No connection to dnstap endpoint") + if err := d.dial(); err != nil { + log.Errorf("No connection to dnstap endpoint: %s", err) } go d.serve() } @@ -86,58 +80,46 @@ func (d *dio) Dnstap(payload tap.Dnstap) { } } -func (d *dio) closeConnection() { - d.enc.close() - if d.conn != nil { - d.conn.Close() - d.conn = nil - } -} - // Close waits until the I/O routine is finished to return. func (d *dio) Close() { close(d.quit) } -func (d *dio) flushBuffer() { - if d.conn == nil { - if err := d.newConnect(); err != nil { - return - } - log.Info("Reconnected to dnstap") - } - - if err := d.enc.flushBuffer(); err != nil { - log.Warningf("Connection lost: %s", err) - d.closeConnection() - if err := d.newConnect(); err != nil { - log.Errorf("Cannot connect to dnstap: %s", err) - } else { - log.Info("Reconnected to dnstap") - } +func (d *dio) write(payload *tap.Dnstap) error { + if d.enc == nil { + atomic.AddUint32(&d.dropped, 1) + return nil } -} - -func (d *dio) write(payload *tap.Dnstap) { if err := d.enc.writeMsg(payload); err != nil { atomic.AddUint32(&d.dropped, 1) + return err } + return nil } func (d *dio) serve() { - timeout := time.After(flushTimeout) + timeout := time.After(d.flushTimeout) for { select { case <-d.quit: - d.flushBuffer() - d.closeConnection() + if d.enc == nil { + return + } + d.enc.flush() + d.enc.close() return case payload := <-d.queue: - d.write(&payload) + if err := d.write(&payload); err != nil { + d.dial() + } case <-timeout: if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 { log.Warningf("Dropped dnstap messages: %d", dropped) } - d.flushBuffer() - timeout = time.After(flushTimeout) + if d.enc == nil { + d.dial() + } else { + d.enc.flush() + } + timeout = time.After(d.flushTimeout) } } } |