aboutsummaryrefslogtreecommitdiff
path: root/plugin/pkg/proxy/persistent.go
diff options
context:
space:
mode:
Diffstat (limited to 'plugin/pkg/proxy/persistent.go')
-rw-r--r--plugin/pkg/proxy/persistent.go156
1 files changed, 156 insertions, 0 deletions
diff --git a/plugin/pkg/proxy/persistent.go b/plugin/pkg/proxy/persistent.go
new file mode 100644
index 000000000..0908ce96c
--- /dev/null
+++ b/plugin/pkg/proxy/persistent.go
@@ -0,0 +1,156 @@
+package proxy
+
+import (
+ "crypto/tls"
+ "sort"
+ "time"
+
+ "github.com/miekg/dns"
+)
+
+// a persistConn hold the dns.Conn and the last used time.
+type persistConn struct {
+ c *dns.Conn
+ used time.Time
+}
+
+// Transport hold the persistent cache.
+type Transport struct {
+ avgDialTime int64 // kind of average time of dial time
+ conns [typeTotalCount][]*persistConn // Buckets for udp, tcp and tcp-tls.
+ expire time.Duration // After this duration a connection is expired.
+ addr string
+ tlsConfig *tls.Config
+
+ dial chan string
+ yield chan *persistConn
+ ret chan *persistConn
+ stop chan bool
+}
+
+func newTransport(addr string) *Transport {
+ t := &Transport{
+ avgDialTime: int64(maxDialTimeout / 2),
+ conns: [typeTotalCount][]*persistConn{},
+ expire: defaultExpire,
+ addr: addr,
+ dial: make(chan string),
+ yield: make(chan *persistConn),
+ ret: make(chan *persistConn),
+ stop: make(chan bool),
+ }
+ return t
+}
+
+// connManagers manages the persistent connection cache for UDP and TCP.
+func (t *Transport) connManager() {
+ ticker := time.NewTicker(defaultExpire)
+ defer ticker.Stop()
+Wait:
+ for {
+ select {
+ case proto := <-t.dial:
+ transtype := stringToTransportType(proto)
+ // take the last used conn - complexity O(1)
+ if stack := t.conns[transtype]; len(stack) > 0 {
+ pc := stack[len(stack)-1]
+ if time.Since(pc.used) < t.expire {
+ // Found one, remove from pool and return this conn.
+ t.conns[transtype] = stack[:len(stack)-1]
+ t.ret <- pc
+ continue Wait
+ }
+ // clear entire cache if the last conn is expired
+ t.conns[transtype] = nil
+ // now, the connections being passed to closeConns() are not reachable from
+ // transport methods anymore. So, it's safe to close them in a separate goroutine
+ go closeConns(stack)
+ }
+ t.ret <- nil
+
+ case pc := <-t.yield:
+ transtype := t.transportTypeFromConn(pc)
+ t.conns[transtype] = append(t.conns[transtype], pc)
+
+ case <-ticker.C:
+ t.cleanup(false)
+
+ case <-t.stop:
+ t.cleanup(true)
+ close(t.ret)
+ return
+ }
+ }
+}
+
+// closeConns closes connections.
+func closeConns(conns []*persistConn) {
+ for _, pc := range conns {
+ pc.c.Close()
+ }
+}
+
+// cleanup removes connections from cache.
+func (t *Transport) cleanup(all bool) {
+ staleTime := time.Now().Add(-t.expire)
+ for transtype, stack := range t.conns {
+ if len(stack) == 0 {
+ continue
+ }
+ if all {
+ t.conns[transtype] = nil
+ // now, the connections being passed to closeConns() are not reachable from
+ // transport methods anymore. So, it's safe to close them in a separate goroutine
+ go closeConns(stack)
+ continue
+ }
+ if stack[0].used.After(staleTime) {
+ continue
+ }
+
+ // connections in stack are sorted by "used"
+ good := sort.Search(len(stack), func(i int) bool {
+ return stack[i].used.After(staleTime)
+ })
+ t.conns[transtype] = stack[good:]
+ // now, the connections being passed to closeConns() are not reachable from
+ // transport methods anymore. So, it's safe to close them in a separate goroutine
+ go closeConns(stack[:good])
+ }
+}
+
+// It is hard to pin a value to this, the import thing is to no block forever, losing at cached connection is not terrible.
+const yieldTimeout = 25 * time.Millisecond
+
+// Yield returns the connection to transport for reuse.
+func (t *Transport) Yield(pc *persistConn) {
+ pc.used = time.Now() // update used time
+
+ // Make this non-blocking, because in the case of a very busy forwarder we will *block* on this yield. This
+ // blocks the outer go-routine and stuff will just pile up. We timeout when the send fails to as returning
+ // these connection is an optimization anyway.
+ select {
+ case t.yield <- pc:
+ return
+ case <-time.After(yieldTimeout):
+ return
+ }
+}
+
+// Start starts the transport's connection manager.
+func (t *Transport) Start() { go t.connManager() }
+
+// Stop stops the transport's connection manager.
+func (t *Transport) Stop() { close(t.stop) }
+
+// SetExpire sets the connection expire time in transport.
+func (t *Transport) SetExpire(expire time.Duration) { t.expire = expire }
+
+// SetTLSConfig sets the TLS config in transport.
+func (t *Transport) SetTLSConfig(cfg *tls.Config) { t.tlsConfig = cfg }
+
+const (
+ defaultExpire = 10 * time.Second
+ minDialTimeout = 1 * time.Second
+ maxDialTimeout = 30 * time.Second
+)