aboutsummaryrefslogtreecommitdiff
path: root/plugin/forward/connect.go
blob: 63a0bfe508e16af910360a63e189d21c6f5a2450 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Package forward implements a forwarding proxy. It caches an upstream net.Conn for some time, so if the same
// client returns, the upstream's Conn will be precached. Depending on how you benchmark, this looks to be
// 50% faster than just opening a new connection for every client. It works with UDP and TCP and uses
// in-band health checking.
package forward

import (
	"context"
	"io"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/coredns/coredns/request"

	"github.com/miekg/dns"
)

// limitTimeout turns a running average into the timeout to use next.
//
// currentAvg is read atomically and interpreted as a time.Duration. The result is:
//   - minValue when the average is below minValue,
//   - twice the average when the average is below half of maxValue,
//   - maxValue otherwise.
//
// The value behind currentAvg is never modified here.
func limitTimeout(currentAvg *int64, minValue time.Duration, maxValue time.Duration) time.Duration {
	avg := time.Duration(atomic.LoadInt64(currentAvg))
	switch {
	case avg < minValue:
		return minValue
	case avg < maxValue/2:
		// Doubling stays strictly below maxValue in this range.
		return avg * 2
	default:
		return maxValue
	}
}

// averageTimeout moves the running average behind currentAvg towards observedDuration.
// The average is shifted by (observedDuration - average) / weight, using integer division,
// so a larger weight makes the average react more slowly. The load and the add are each
// atomic, but the two steps together are not a single atomic update.
func averageTimeout(currentAvg *int64, observedDuration time.Duration, weight int64) {
	previous := atomic.LoadInt64(currentAvg)
	step := (int64(observedDuration) - previous) / weight
	atomic.AddInt64(currentAvg, step)
}

// dialTimeout returns the timeout to apply to the next dial. It is computed by limitTimeout
// from the running average in t.avgDialTime and bounded by minDialTimeout and maxDialTimeout.
func (t *Transport) dialTimeout() time.Duration {
	return limitTimeout(&t.avgDialTime, minDialTimeout, maxDialTimeout)
}

// updateDialTimeout folds newDialTime into the running average t.avgDialTime via
// averageTimeout, using cumulativeAvgWeight as the weight.
func (t *Transport) updateDialTimeout(newDialTime time.Duration) {
	averageTimeout(&t.avgDialTime, newDialTime, cumulativeAvgWeight)
}

// Dial returns a connection to the address configured in the transport, either a
// previously used one or a freshly dialed one.
//
// When a TLS configuration is present the protocol is forced to "tcp-tls". The protocol is
// sent on t.dial and the answer read from t.ret; a non-nil answer is returned with the bool
// set to true and counted as a cache hit. Otherwise a cache miss is counted and a new
// connection is dialed with the timeout from dialTimeout, and the time the dial took is fed
// back through updateDialTimeout whether or not it succeeded.
//
// On a dial error the returned *persistConn is still non-nil (wrapping whatever the dialer
// returned), so callers must check the error before using it.
func (t *Transport) Dial(proto string) (*persistConn, bool, error) {
	// A configured TLS setup overrides whatever protocol the caller asked for.
	if t.tlsConfig != nil {
		proto = "tcp-tls"
	}

	t.dial <- proto
	if cachedConn := <-t.ret; cachedConn != nil {
		ConnCacheHitsCount.WithLabelValues(t.addr, proto).Add(1)
		return cachedConn, true, nil
	}
	ConnCacheMissesCount.WithLabelValues(t.addr, proto).Add(1)

	var (
		conn *dns.Conn
		err  error
	)
	dialStart := time.Now()
	timeout := t.dialTimeout()
	if proto == "tcp-tls" {
		conn, err = dns.DialTimeoutWithTLS("tcp", t.addr, t.tlsConfig, timeout)
	} else {
		conn, err = dns.DialTimeout(proto, t.addr, timeout)
	}
	// Failed dials are measured too, so the adaptive timeout follows slow upstreams.
	t.updateDialTimeout(time.Since(dialStart))
	return &persistConn{c: conn}, false, err
}

// Connect sends the request held in state to the upstream behind p and returns its reply.
//
// The protocol is "tcp" when opts.forceTCP is set, otherwise "udp" when opts.preferUDP is
// set, otherwise the protocol reported by state. A connection is obtained from the
// transport's Dial, the request is written with a deadline of maxTimeout, and replies are
// read under a single deadline of readTimeout until one carries the request's Id.
//
// If the write or read fails, the connection is closed instead of being yielded back. An
// io.EOF on a connection that came from the cache is reported as ErrCachedClosed. Any other
// read error is returned together with whatever message ReadMsg produced. On success the
// connection is yielded to the transport and the request, rcode and duration metrics are
// updated. ctx is not consulted by this method.
func (p *Proxy) Connect(ctx context.Context, state request.Request, opts options) (*dns.Msg, error) {
	began := time.Now()

	var proto string
	switch {
	case opts.forceTCP: // forcing TCP wins over preferring UDP
		proto = "tcp"
	case opts.preferUDP:
		proto = "udp"
	default:
		proto = state.Proto()
	}

	pc, cached, err := p.transport.Dial(proto)
	if err != nil {
		return nil, err
	}

	// Use the client's buffer size for this exchange, but never less than 512.
	bufsize := uint16(state.Size())
	if bufsize < 512 {
		bufsize = 512
	}
	pc.c.UDPSize = bufsize

	pc.c.SetWriteDeadline(time.Now().Add(maxTimeout))
	if werr := pc.c.WriteMsg(state.Req); werr != nil {
		pc.c.Close() // broken connection: do not hand it back
		if cached && werr == io.EOF {
			return nil, ErrCachedClosed
		}
		return nil, werr
	}

	// The deadline is set once for the whole loop, so mismatched replies cannot extend the wait.
	pc.c.SetReadDeadline(time.Now().Add(readTimeout))
	var reply *dns.Msg
	for {
		reply, err = pc.c.ReadMsg()
		if err != nil {
			pc.c.Close() // broken connection: do not hand it back
			if cached && err == io.EOF {
				return nil, ErrCachedClosed
			}
			return reply, err
		}
		if reply.Id == state.Req.Id {
			break
		}
		// Id mismatch: drop this out-of-order reply and keep reading.
	}

	p.transport.Yield(pc)

	// Label by the textual rcode when one is known, otherwise by its number.
	rcode, known := dns.RcodeToString[reply.Rcode]
	if !known {
		rcode = strconv.Itoa(reply.Rcode)
	}

	RequestCount.WithLabelValues(p.addr).Add(1)
	RcodeCount.WithLabelValues(rcode, p.addr).Add(1)
	RequestDuration.WithLabelValues(p.addr, rcode).Observe(time.Since(began).Seconds())

	return reply, nil
}

// cumulativeAvgWeight is the weight passed to averageTimeout when the dial time average is
// updated: each new observation shifts the average by 1/cumulativeAvgWeight of its distance
// from the current value.
const cumulativeAvgWeight = 4
6744034dd5e1a3c0?s=13&d=retro' width='13' height='13' alt='Gravatar' /> Jarred Sumner 1-0/+1 2022-03-16[bun.js] Bun.unsafe test should check the gcGravatar Jarred Sumner 1-4/+14 2022-03-16Update work_pool.zigGravatar Jarred Sumner 1-21/+28 2022-03-16Add a way to run serial tasks on a different threadGravatar Jarred Sumner 1-3/+65 2022-03-16fix crash when SyntaxError is thrown and we did not receive an ErrorInstance?Gravatar Jarred Sumner 1-18/+25 2022-03-16[bun.js] Fix release-mode test failures in HeadersGravatar Jarred Sumner 1-47/+42 2022-03-16Update ref_count.zigGravatar Jarred Sumner 1-2/+0 2022-03-15file is too bigjarred/replGravatar Jarred Sumner 1-113827/+0 2022-03-15Update Dockerfile.baseGravatar Jarred Sumner 1-1/+1 2022-03-15Add rust and lolhtml to dockerfileGravatar Jarred Sumner 2-0/+20 2022-03-15bump webkitGravatar Jarred Sumner 1-1/+1 2022-03-15Update WebKitGravatar Jarred Sumner 1-0/+0 2022-03-15:camera:Gravatar Jarred Sumner 60-799/+859 2022-03-15Fix test failureGravatar Jarred Sumner 1-15/+17 2022-03-15[bun:error] handle errors without a name or messageGravatar Jarred Sumner 1-6/+11 2022-03-15Update pool.zigGravatar Jarred Sumner 1-0/+1 2022-03-15Load .env by defaultGravatar Jarred Sumner 2-0/+8 2022-03-15mimalloc interpose is buggyGravatar Jarred Sumner 1-2/+25 2022-03-15higher max http requests for bun.jsGravatar Jarred Sumner 1-0/+29 2022-03-15zero copyGravatar Jarred Sumner 1-21/+15 2022-03-15Update javascript.zigGravatar Jarred Sumner 1-2/+0 2022-03-15[bun.js] utf8 console.{time, count, timeEnd, profile, profileEnd, count, ↵Gravatar Jarred Sumner 1-16/+16 countReset}