aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugin/dnstap/dnstapio/io.go15
1 files changed, 8 insertions, 7 deletions
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go
index 9390568f9..9a0013ba5 100644
--- a/plugin/dnstap/dnstapio/io.go
+++ b/plugin/dnstap/dnstapio/io.go
@@ -24,6 +24,7 @@ type dnstapIO struct {
enc *dnstapEncoder
queue chan tap.Dnstap
dropped uint32
+ quit chan struct{}
}
// New returns a new and initialized DnstapIO.
@@ -36,6 +37,7 @@ func New(endpoint string, socket bool) DnstapIO {
Bidirectional: true,
}),
queue: make(chan tap.Dnstap, queueSize),
+ quit: make(chan struct{}),
}
}
@@ -92,7 +94,7 @@ func (dio *dnstapIO) closeConnection() {
// Close waits until the I/O routine is finished to return.
func (dio *dnstapIO) Close() {
- close(dio.queue)
+ close(dio.quit)
}
func (dio *dnstapIO) flushBuffer() {
@@ -124,12 +126,11 @@ func (dio *dnstapIO) serve() {
timeout := time.After(flushTimeout)
for {
select {
- case payload, ok := <-dio.queue:
- if !ok {
- dio.flushBuffer()
- dio.closeConnection()
- return
- }
+ case <-dio.quit:
+ dio.flushBuffer()
+ dio.closeConnection()
+ return
+ case payload := <-dio.queue:
dio.write(&payload)
case <-timeout:
if dropped := atomic.SwapUint32(&dio.dropped, 0); dropped > 0 {