aboutsummaryrefslogtreecommitdiff
path: root/plugin/dnstap/dnstapio/io.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/dnstap/dnstapio/io.go')
-rw-r--r--plugin/dnstap/dnstapio/io.go93
1 files changed, 45 insertions, 48 deletions
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go
index 9a4c26042..c88fc14ab 100644
--- a/plugin/dnstap/dnstapio/io.go
+++ b/plugin/dnstap/dnstapio/io.go
@@ -20,7 +20,13 @@ const (
queueSize = 10000
)
-type dnstapIO struct {
+// Tapper interface is used in testing to mock the Dnstap method.
+type Tapper interface {
+ Dnstap(tap.Dnstap)
+}
+
+// dio implements the Tapper interface.
+type dio struct {
endpoint string
socket bool
conn net.Conn
@@ -30,9 +36,9 @@ type dnstapIO struct {
quit chan struct{}
}
-// New returns a new and initialized DnstapIO.
-func New(endpoint string, socket bool) DnstapIO {
- return &dnstapIO{
+// New returns a new and initialized pointer to a dio.
+func New(endpoint string, socket bool) *dio {
+ return &dio{
endpoint: endpoint,
socket: socket,
enc: newDnstapEncoder(&fs.EncoderOptions{
@@ -44,74 +50,65 @@ func New(endpoint string, socket bool) DnstapIO {
}
}
-// DnstapIO interface
-type DnstapIO interface {
- Connect()
- Dnstap(payload tap.Dnstap)
- Close()
-}
-
-func (dio *dnstapIO) newConnect() error {
+func (d *dio) newConnect() error {
var err error
- if dio.socket {
- if dio.conn, err = net.Dial("unix", dio.endpoint); err != nil {
+ if d.socket {
+ if d.conn, err = net.Dial("unix", d.endpoint); err != nil {
return err
}
} else {
- if dio.conn, err = net.DialTimeout("tcp", dio.endpoint, tcpTimeout); err != nil {
+ if d.conn, err = net.DialTimeout("tcp", d.endpoint, tcpTimeout); err != nil {
return err
}
- if tcpConn, ok := dio.conn.(*net.TCPConn); ok {
+ if tcpConn, ok := d.conn.(*net.TCPConn); ok {
tcpConn.SetWriteBuffer(tcpWriteBufSize)
tcpConn.SetNoDelay(false)
}
}
- return dio.enc.resetWriter(dio.conn)
+ return d.enc.resetWriter(d.conn)
}
// Connect connects to the dnstap endpoint.
-func (dio *dnstapIO) Connect() {
- if err := dio.newConnect(); err != nil {
+func (d *dio) Connect() {
+ if err := d.newConnect(); err != nil {
log.Error("No connection to dnstap endpoint")
}
- go dio.serve()
+ go d.serve()
}
// Dnstap enqueues the payload for log.
-func (dio *dnstapIO) Dnstap(payload tap.Dnstap) {
+func (d *dio) Dnstap(payload tap.Dnstap) {
select {
- case dio.queue <- payload:
+ case d.queue <- payload:
default:
- atomic.AddUint32(&dio.dropped, 1)
+ atomic.AddUint32(&d.dropped, 1)
}
}
-func (dio *dnstapIO) closeConnection() {
- dio.enc.close()
- if dio.conn != nil {
- dio.conn.Close()
- dio.conn = nil
+func (d *dio) closeConnection() {
+ d.enc.close()
+ if d.conn != nil {
+ d.conn.Close()
+ d.conn = nil
}
}
// Close waits until the I/O routine is finished to return.
-func (dio *dnstapIO) Close() {
- close(dio.quit)
-}
+func (d *dio) Close() { close(d.quit) }
-func (dio *dnstapIO) flushBuffer() {
- if dio.conn == nil {
- if err := dio.newConnect(); err != nil {
+func (d *dio) flushBuffer() {
+ if d.conn == nil {
+ if err := d.newConnect(); err != nil {
return
}
log.Info("Reconnected to dnstap")
}
- if err := dio.enc.flushBuffer(); err != nil {
+ if err := d.enc.flushBuffer(); err != nil {
log.Warningf("Connection lost: %s", err)
- dio.closeConnection()
- if err := dio.newConnect(); err != nil {
+ d.closeConnection()
+ if err := d.newConnect(); err != nil {
log.Errorf("Cannot connect to dnstap: %s", err)
} else {
log.Info("Reconnected to dnstap")
@@ -119,27 +116,27 @@ func (dio *dnstapIO) flushBuffer() {
}
}
-func (dio *dnstapIO) write(payload *tap.Dnstap) {
- if err := dio.enc.writeMsg(payload); err != nil {
- atomic.AddUint32(&dio.dropped, 1)
+func (d *dio) write(payload *tap.Dnstap) {
+ if err := d.enc.writeMsg(payload); err != nil {
+ atomic.AddUint32(&d.dropped, 1)
}
}
-func (dio *dnstapIO) serve() {
+func (d *dio) serve() {
timeout := time.After(flushTimeout)
for {
select {
- case <-dio.quit:
- dio.flushBuffer()
- dio.closeConnection()
+ case <-d.quit:
+ d.flushBuffer()
+ d.closeConnection()
return
- case payload := <-dio.queue:
- dio.write(&payload)
+ case payload := <-d.queue:
+ d.write(&payload)
case <-timeout:
- if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 {
+ if dropped := atomic.SwapUint32(&d.dropped, 0); dropped > 0 {
log.Warningf("Dropped dnstap messages: %d", dropped)
}
- dio.flushBuffer()
+ d.flushBuffer()
timeout = time.After(flushTimeout)
}
}