diff options
Diffstat (limited to 'plugin/dnstap/dnstapio/io.go')
-rw-r--r-- | plugin/dnstap/dnstapio/io.go | 93 |
1 files changed, 45 insertions, 48 deletions
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go index 9a4c26042..c88fc14ab 100644 --- a/plugin/dnstap/dnstapio/io.go +++ b/plugin/dnstap/dnstapio/io.go @@ -20,7 +20,13 @@ const ( queueSize = 10000 ) -type dnstapIO struct { +// Tapper interface is used in testing to mock the Dnstap method. +type Tapper interface { + Dnstap(tap.Dnstap) +} + +// dio implements the Tapper interface. +type dio struct { endpoint string socket bool conn net.Conn @@ -30,9 +36,9 @@ type dnstapIO struct { quit chan struct{} } -// New returns a new and initialized DnstapIO. -func New(endpoint string, socket bool) DnstapIO { - return &dnstapIO{ +// New returns a new and initialized pointer to a dio. +func New(endpoint string, socket bool) *dio { + return &dio{ endpoint: endpoint, socket: socket, enc: newDnstapEncoder(&fs.EncoderOptions{ @@ -44,74 +50,65 @@ func New(endpoint string, socket bool) DnstapIO { } } -// DnstapIO interface -type DnstapIO interface { - Connect() - Dnstap(payload tap.Dnstap) - Close() -} - -func (dio *dnstapIO) newConnect() error { +func (d *dio) newConnect() error { var err error - if dio.socket { - if dio.conn, err = net.Dial("unix", dio.endpoint); err != nil { + if d.socket { + if d.conn, err = net.Dial("unix", d.endpoint); err != nil { return err } } else { - if dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout); err != nil { + if d.conn, err = net.DialTimeout("tcp", d.endpoint, tcpTimeout); err != nil { return err } - if tcpConn, ok := dio.conn.(*net.TCPConn); ok { + if tcpConn, ok := d.conn.(*net.TCPConn); ok { tcpConn.SetWriteBuffer(tcpWriteBufSize) tcpConn.SetNoDelay(false) } } - return dio.enc.resetWriter(dio.conn) + return d.enc.resetWriter(d.conn) } // Connect connects to the dnstap endpoint. -func (dio *dnstapIO) Connect() { - if err := dio.newConnect(); err != nil { +func (d *dio) Connect() { + if err := d.newConnect(); err != nil { log.Error("No connection to dnstap endpoint") } - go dio.serve() + go d.serve() } // Dnstap enqueues the payload for log. -func (dio *dnstapIO) Dnstap(payload tap.Dnstap) { +func (d *dio) Dnstap(payload tap.Dnstap) { select { - case dio.queue <- payload: + case d.queue <- payload: default: - atomic.AddUint32(&dio.dropped, 1) + atomic.AddUint32(&d.dropped, 1) } } -func (dio *dnstapIO) closeConnection() { - dio.enc.close() - if dio.conn != nil { - dio.conn.Close() - dio.conn = nil +func (d *dio) closeConnection() { + d.enc.close() + if d.conn != nil { + d.conn.Close() + d.conn = nil } } // Close waits until the I/O routine is finished to return. -func (dio *dnstapIO) Close() { - close(dio.quit) -} +func (d *dio) Close() { close(d.quit) } -func (dio *dnstapIO) flushBuffer() { - if dio.conn == nil { - if err := dio.newConnect(); err != nil { +func (d *dio) flushBuffer() { + if d.conn == nil { + if err := d.newConnect(); err != nil { return } log.Info("Reconnected to dnstap") } - if err := dio.enc.flushBuffer(); err != nil { + if err := d.enc.flushBuffer(); err != nil { log.Warningf("Connection lost: %s", err) - dio.closeConnection() - if err := dio.newConnect(); err != nil { + d.closeConnection() + if err := d.newConnect(); err != nil { log.Errorf("Cannot connect to dnstap: %s", err) } else { log.Info("Reconnected to dnstap") @@ -119,27 +116,27 @@ func (dio *dnstapIO) flushBuffer() { } } -func (dio *dnstapIO) write(payload *tap.Dnstap) { - if err := dio.enc.writeMsg(payload); err != nil { - atomic.AddUint32(&dio.dropped, 1) +func (d *dio) write(payload *tap.Dnstap) { + if err := d.enc.writeMsg(payload); err != nil { + atomic.AddUint32(&d.dropped, 1) } } -func (dio *dnstapIO) serve() { +func (d *dio) serve() { timeout := time.After(flushTimeout) for { select { - case <-dio.quit: - dio.flushBuffer() - dio.closeConnection() + case <-d.quit: + d.flushBuffer() + d.closeConnection() return - case payload := <-dio.queue: - dio.write(&payload) + case payload := <-d.queue: + d.write(&payload) case <-timeout: - if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 { + if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 { log.Warningf("Dropped dnstap messages: %d", dropped) } - dio.flushBuffer() + d.flushBuffer() timeout = time.After(flushTimeout) } } |