aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--plugin/dnstap/dnstapio/io.go69
-rw-r--r--plugin/dnstap/dnstapio/io_test.go71
-rw-r--r--plugin/dnstap/handler.go9
-rw-r--r--plugin/dnstap/handler_test.go19
-rw-r--r--plugin/dnstap/msg/wrapper.go8
-rw-r--r--plugin/dnstap/out/tcp.go2
-rw-r--r--plugin/dnstap/setup.go8
7 files changed, 164 insertions, 22 deletions
diff --git a/plugin/dnstap/dnstapio/io.go b/plugin/dnstap/dnstapio/io.go
new file mode 100644
index 000000000..586def2ac
--- /dev/null
+++ b/plugin/dnstap/dnstapio/io.go
@@ -0,0 +1,69 @@
+package dnstapio
+
+import (
+ "fmt"
+ "io"
+
+ tap "github.com/dnstap/golang-dnstap"
+ "github.com/golang/protobuf/proto"
+)
+
+// DnstapIO wraps the dnstap I/O routine.
+type DnstapIO struct {
+ writer io.WriteCloser
+ queue chan tap.Dnstap
+ stop chan bool
+}
+
+// Protocol is either `out.TCP` or `out.Socket`.
+type Protocol interface {
+ // Write takes a single frame at once.
+ Write([]byte) (int, error)
+
+ Close() error
+}
+
+// New dnstap I/O routine from Protocol.
+func New(w Protocol) *DnstapIO {
+ dio := DnstapIO{}
+ dio.writer = w
+ dio.queue = make(chan tap.Dnstap, 10)
+ dio.stop = make(chan bool)
+ go dio.serve()
+ return &dio
+}
+
+// Dnstap enqueues the payload for log.
+func (dio *DnstapIO) Dnstap(payload tap.Dnstap) {
+ select {
+ case dio.queue <- payload:
+ default:
+ fmt.Println("[WARN] Dnstap payload dropped.")
+ }
+}
+
+func (dio *DnstapIO) serve() {
+ for {
+ select {
+ case payload := <-dio.queue:
+ frame, err := proto.Marshal(&payload)
+ if err == nil {
+ dio.writer.Write(frame)
+ } else {
+ fmt.Println("[ERROR] Invalid dnstap payload dropped.")
+ }
+ case <-dio.stop:
+ close(dio.queue)
+ dio.stop <- true
+ return
+ }
+ }
+}
+
+// Close waits until the I/O routine is finished to return.
+func (dio DnstapIO) Close() error {
+ dio.stop <- true
+ <-dio.stop
+ close(dio.stop)
+ return dio.writer.Close()
+}
diff --git a/plugin/dnstap/dnstapio/io_test.go b/plugin/dnstap/dnstapio/io_test.go
new file mode 100644
index 000000000..80b804752
--- /dev/null
+++ b/plugin/dnstap/dnstapio/io_test.go
@@ -0,0 +1,71 @@
+package dnstapio
+
+import (
+ "bytes"
+ "sync"
+ "testing"
+ "time"
+
+ tap "github.com/dnstap/golang-dnstap"
+)
+
+type buf struct {
+ *bytes.Buffer
+ cost time.Duration
+}
+
+func (b buf) Write(frame []byte) (int, error) {
+ time.Sleep(b.cost)
+ return b.Buffer.Write(frame)
+}
+
+func (b buf) Close() error {
+ return nil
+}
+
+func TestRace(t *testing.T) {
+ b := buf{&bytes.Buffer{}, 100 * time.Millisecond}
+ dio := New(b)
+ wg := &sync.WaitGroup{}
+ wg.Add(10)
+ for i := 0; i < 10; i++ {
+ timeout := time.After(time.Second)
+ go func() {
+ for {
+ select {
+ case <-timeout:
+ wg.Done()
+ return
+ default:
+ time.Sleep(50 * time.Millisecond)
+ dio.Dnstap(tap.Dnstap{})
+ }
+ }
+ }()
+ }
+ wg.Wait()
+}
+
+func TestClose(t *testing.T) {
+ done := make(chan bool)
+ var dio *DnstapIO
+ go func() {
+ b := buf{&bytes.Buffer{}, 0}
+ dio = New(b)
+ dio.Close()
+ close(done)
+ }()
+ select {
+ case <-done:
+ case <-time.After(time.Second):
+ t.Fatal("Not closing.")
+ }
+ func() {
+ defer func() {
+ if err := recover(); err == nil {
+ t.Fatal("Send on closed channel.")
+ }
+ }()
+ dio.Dnstap(tap.Dnstap{})
+ }()
+}
diff --git a/plugin/dnstap/handler.go b/plugin/dnstap/handler.go
index b20bb2ad9..b6a8afbe7 100644
--- a/plugin/dnstap/handler.go
+++ b/plugin/dnstap/handler.go
@@ -16,11 +16,15 @@ import (
// Dnstap is the dnstap handler.
type Dnstap struct {
Next plugin.Handler
- Out io.Writer
+ IO IORoutine
Pack bool
}
type (
+ // IORoutine is the dnstap I/O thread as defined by: <http://dnstap.info/Architecture>.
+ IORoutine interface {
+ Dnstap(tap.Dnstap)
+ }
// Tapper is implemented by the Context passed by the dnstap handler.
Tapper interface {
TapMessage(*tap.Message) error
@@ -49,7 +53,8 @@ func tapMessageTo(w io.Writer, m *tap.Message) error {
// TapMessage implements Tapper.
func (h Dnstap) TapMessage(m *tap.Message) error {
- return tapMessageTo(h.Out, m)
+ h.IO.Dnstap(msg.Wrap(m))
+ return nil
}
// TapBuilder implements Tapper.
diff --git a/plugin/dnstap/handler_test.go b/plugin/dnstap/handler_test.go
index 54509de82..617c8e675 100644
--- a/plugin/dnstap/handler_test.go
+++ b/plugin/dnstap/handler_test.go
@@ -1,21 +1,18 @@
package dnstap
import (
- "errors"
- "fmt"
"testing"
"github.com/coredns/coredns/plugin/dnstap/test"
mwtest "github.com/coredns/coredns/plugin/test"
tap "github.com/dnstap/golang-dnstap"
- "github.com/golang/protobuf/proto"
"github.com/miekg/dns"
"golang.org/x/net/context"
)
func testCase(t *testing.T, tapq, tapr *tap.Message, q, r *dns.Msg) {
- w := writer{}
+ w := writer{t: t}
w.queue = append(w.queue, tapq, tapr)
h := Dnstap{
Next: mwtest.HandlerFunc(func(_ context.Context,
@@ -23,7 +20,7 @@ func testCase(t *testing.T, tapq, tapr *tap.Message, q, r *dns.Msg) {
return 0, w.WriteMsg(r)
}),
- Out: &w,
+ IO: &w,
Pack: false,
}
_, err := h.ServeDNS(context.TODO(), &mwtest.ResponseWriter{}, q)
@@ -33,22 +30,18 @@ func testCase(t *testing.T, tapq, tapr *tap.Message, q, r *dns.Msg) {
}
type writer struct {
+ t *testing.T
queue []*tap.Message
}
-func (w *writer) Write(b []byte) (int, error) {
- e := tap.Dnstap{}
- if err := proto.Unmarshal(b, &e); err != nil {
- return 0, err
- }
+func (w *writer) Dnstap(e tap.Dnstap) {
if len(w.queue) == 0 {
- return 0, errors.New("message not expected")
+ w.t.Error("Message not expected.")
}
if !test.MsgEqual(w.queue[0], e.Message) {
- return 0, fmt.Errorf("want: %v, have: %v", w.queue[0], e.Message)
+ w.t.Errorf("want: %v, have: %v", w.queue[0], e.Message)
}
w.queue = w.queue[1:]
- return len(b), nil
}
func TestDnstap(t *testing.T) {
diff --git a/plugin/dnstap/msg/wrapper.go b/plugin/dnstap/msg/wrapper.go
index a74c604d8..3396b1342 100644
--- a/plugin/dnstap/msg/wrapper.go
+++ b/plugin/dnstap/msg/wrapper.go
@@ -7,9 +7,10 @@ import (
"github.com/golang/protobuf/proto"
)
-func wrap(m *lib.Message) *lib.Dnstap {
+// Wrap a dnstap message in the top-level dnstap type.
+func Wrap(m *lib.Message) lib.Dnstap {
t := lib.Dnstap_MESSAGE
- return &lib.Dnstap{
+ return lib.Dnstap{
Type: &t,
Message: m,
}
@@ -17,7 +18,8 @@ func wrap(m *lib.Message) *lib.Dnstap {
// Marshal encodes the message to a binary dnstap payload.
func Marshal(m *lib.Message) (data []byte, err error) {
- data, err = proto.Marshal(wrap(m))
+ payload := Wrap(m)
+ data, err = proto.Marshal(&payload)
if err != nil {
err = fmt.Errorf("proto: %s", err)
return
diff --git a/plugin/dnstap/out/tcp.go b/plugin/dnstap/out/tcp.go
index 8d2c25270..715c3024a 100644
--- a/plugin/dnstap/out/tcp.go
+++ b/plugin/dnstap/out/tcp.go
@@ -32,7 +32,7 @@ func (s *TCP) Write(frame []byte) (n int, err error) {
// Flush the remaining frames.
func (s *TCP) Flush() error {
defer func() {
- s.frames = s.frames[0:]
+ s.frames = s.frames[:0]
}()
c, err := net.DialTimeout("tcp", s.address, time.Second)
if err != nil {
diff --git a/plugin/dnstap/setup.go b/plugin/dnstap/setup.go
index a57873470..c1a8956a1 100644
--- a/plugin/dnstap/setup.go
+++ b/plugin/dnstap/setup.go
@@ -8,6 +8,7 @@ import (
"github.com/coredns/coredns/core/dnsserver"
"github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/dnstap/dnstapio"
"github.com/coredns/coredns/plugin/dnstap/out"
"github.com/coredns/coredns/plugin/pkg/dnsutil"
@@ -79,11 +80,12 @@ func setup(c *caddy.Controller) error {
} else {
o = out.NewTCP(conf.target)
}
- dnstap.Out = o
+ dio := dnstapio.New(o)
+ dnstap.IO = dio
c.OnShutdown(func() error {
- if err := o.Close(); err != nil {
- return fmt.Errorf("output: %s", err)
+ if err := dio.Close(); err != nil {
+ return fmt.Errorf("dnstap io routine: %s", err)
}
return nil
})
vatar.com/avatar/1c8b274ef6dd37fbd668244859c0c436?s=13&d=retro' width='13' height='13' alt='Gravatar' /> tony-sull 4-17/+29 2022-09-29[@astrojs/image] adding caching support for SSG builds (#4909)Gravatar Tony Sullivan 8-10/+240 2022-09-29[ci] release (#4903)astro@1.4.0@astrojs/vue@1.1.0@astrojs/vercel@2.1.0@astrojs/telemetry@1.0.1@astrojs/tailwind@2.0.2@astrojs/svelte@1.0.1@astrojs/rss@1.0.2@astrojs/preact@1.1.1@astrojs/node@1.1.0@astrojs/netlify@1.1.0@astrojs/mdx@0.11.3@astrojs/markdown-remark@1.1.3@astrojs/image@0.8.1@astrojs/deno@1.1.0@astrojs/cloudflare@2.1.0Gravatar Fred K. Bot 65-235/+512 2022-09-29[ci] update lockfile (#4899)Gravatar Fred K. Bot 1-735/+718 2022-09-29[ci] formatGravatar matthewp 1-1/+3 2022-09-29fix trailing slash mismatch in dev vs build in docs example (#4912)Gravatar Rishi Raj Jain 1-1/+1 2022-09-29[ci] formatGravatar bluwy 1-1/+1 2022-09-29Support Vue JSX (#4897)Gravatar Bjorn Lu 12-5/+329 2022-09-28[ci] formatGravatar matthewp 1-8/+4 2022-09-28Fix CSS ordering between imported and Astro styles (#4907)Gravatar Matthew Phillips 12-7/+218 2022-09-28[ci] formatGravatar matthewp 23-137/+127 2022-09-28Astro.cookies implementation (#4876)Gravatar Matthew Phillips 32-29/+943 2022-09-28Fix: let Squoosh default image quality internally (#4906)Gravatar Tony Sullivan 5-11/+20 2022-09-28Update README.md (#4898)Gravatar stijlmassi 1-2/+3 2022-09-28Fix test (#4904)Gravatar Bjorn Lu 2-1/+7 2022-09-28[ci] formatGravatar FredKSchott 2-4/+4 2022-09-28redesign basics template (#4879)Gravatar Fred K. Schott 3-88/+34 2022-09-28[ci] formatGravatar bluwy 1-2/+2 2022-09-28Remove shamefully-hoist (#4842)Gravatar Bjorn Lu 104-527/+768 2022-09-28[ci] formatGravatar matthewp 4-14/+16 2022-09-28Hoist hydration script out of slot templates (#4891)Gravatar Matthew Phillips 13-43/+165 2022-09-28Ensure head content rendered once with lazy layouts (#4892)Gravatar Matthew Phillips 9-3/+59 2022-09-27fixed typing (#4893)Gravatar tweenietomatoes 1-1/+1 2022-09-27[ci] release (#4846)create-astro@1.1.0astro@1.3.1@astrojs/webapi@1.1.0@astrojs/vercel@2.0.1@astrojs/mdx@0.11.2@astrojs/image@0.8.0Gravatar Fred K. Bot 60-185/+169 2022-09-27fix: post API routes in SSG should warn or error during dev mode (#4878)Gravatar Rishi Raj Jain 3-2/+17 2022-09-27docs: Fix links to Tailwind examples (#4883)Gravatar Deanmv 1-1/+1 2022-09-27Set SSR target webworker for Vercel edge (#4884)Gravatar Bjorn Lu 2-0/+6 2022-09-27[ci] update lockfile (#4885)Gravatar Fred K. Bot 1-86/+79 2022-09-26[ci] formatGravatar bholmesdev 3-23/+19 2022-09-26Fix: correctly transform `import.meta.env.*` in MDX (#4858)Gravatar Ben Holmes 12-233/+454 2022-09-26Change negative lookbehind to lookahead (#4866)Gravatar Rishi Raj Jain 1-1/+1 2022-09-26add double check on astro file return type to display more human readable err...Gravatar Steven Yung 6-2/+61 2022-09-26[ci] update lockfile (#4862)Gravatar Fred K. Bot 1-81/+81 2022-09-26fix: Script with innerHTML not working on Safari (#4861)Gravatar Rishi Raj Jain 3-3/+10 2022-09-26Prevent /undefined catch-all routes in dev (#4873)Gravatar Bjorn Lu 6-9/+66 2022-09-26fix: 🐛 BUG: class:list directive adding class attribute when undefined (#4...Gravatar Rishi Raj Jain 2-2/+9 2022-09-26docs: Standardize common integration READMEs (#4874)Gravatar Jake Strawn 7-6/+66 2022-09-26docs: Update references to support channel in Discord. (#4872)Gravatar Jake Strawn 12-12/+12 2022-09-26[ci] formatGravatar bluwy 1-1/+1 2022-09-26fix: "chunks" directory appears in build output, if custom modules are import...Gravatar Rishi Raj Jain 2-6/+34 2022-09-23[ci] formatGravatar matthewp 1-1/+1 2022-09-23Define toStringTag another way (#4855)Gravatar Matthew Phillips 2-4/+12 2022-09-23update SSR example to match recent change on Astro API Context (#4854)Gravatar Steven Yung 2-4/+6 2022-09-23[ci] update lockfile (#4852)Gravatar Fred K. Bot 1-373/+402