diff options
Diffstat (limited to 'plugin/dnstap/out/tcp.go')
-rw-r--r-- | plugin/dnstap/out/tcp.go | 59 |
1 files changed, 59 insertions, 0 deletions
diff --git a/plugin/dnstap/out/tcp.go b/plugin/dnstap/out/tcp.go new file mode 100644 index 000000000..8d2c25270 --- /dev/null +++ b/plugin/dnstap/out/tcp.go @@ -0,0 +1,59 @@ +package out + +import ( + "net" + "time" + + fs "github.com/farsightsec/golang-framestream" +) + +// TCP is a Frame Streams encoder over TCP. +type TCP struct { + address string + frames [][]byte +} + +// NewTCP returns a TCP writer. +func NewTCP(address string) *TCP { + s := &TCP{address: address} + s.frames = make([][]byte, 0, 13) // 13 messages buffer + return s +} + +// Write a single Frame Streams frame. +func (s *TCP) Write(frame []byte) (n int, err error) { + s.frames = append(s.frames, frame) + if len(s.frames) == cap(s.frames) { + return len(frame), s.Flush() + } + return len(frame), nil +} + +// Flush the remaining frames. +func (s *TCP) Flush() error { + defer func() { + s.frames = s.frames[0:] + }() + c, err := net.DialTimeout("tcp", s.address, time.Second) + if err != nil { + return err + } + enc, err := fs.NewEncoder(c, &fs.EncoderOptions{ + ContentType: []byte("protobuf:dnstap.Dnstap"), + Bidirectional: true, + }) + if err != nil { + return err + } + for _, frame := range s.frames { + if _, err = enc.Write(frame); err != nil { + return err + } + } + return enc.Flush() +} + +// Close is an alias to Flush to satisfy io.WriteCloser similarly to type Socket. +func (s *TCP) Close() error { + return s.Flush() +} |