Diffstat (limited to 'plugin/proxy')
-rw-r--r--  plugin/proxy/README.md         175
-rw-r--r--  plugin/proxy/dns.go            106
-rw-r--r--  plugin/proxy/dnstap_test.go     57
-rw-r--r--  plugin/proxy/exchanger.go       22
-rw-r--r--  plugin/proxy/google.go         244
-rw-r--r--  plugin/proxy/google_rr.go       89
-rw-r--r--  plugin/proxy/google_test.go      5
-rw-r--r--  plugin/proxy/grpc.go            96
-rw-r--r--  plugin/proxy/grpc_test.go       71
-rw-r--r--  plugin/proxy/lookup.go         132
-rw-r--r--  plugin/proxy/metrics.go         30
-rw-r--r--  plugin/proxy/proxy.go          195
-rw-r--r--  plugin/proxy/proxy_test.go      87
-rw-r--r--  plugin/proxy/response.go        21
-rw-r--r--  plugin/proxy/setup.go           46
-rw-r--r--  plugin/proxy/upstream.go       234
-rw-r--r--  plugin/proxy/upstream_test.go  324
17 files changed, 1934 insertions, 0 deletions
diff --git a/plugin/proxy/README.md b/plugin/proxy/README.md
new file mode 100644
index 000000000..3cccf05ee
--- /dev/null
+++ b/plugin/proxy/README.md
@@ -0,0 +1,175 @@
+# proxy
+
+*proxy* facilitates both a basic reverse proxy and a robust load balancer.
+
+The proxy has support for multiple backends. The load balancing features include multiple policies,
+health checks, and failovers. If all hosts fail their health check, the proxy plugin will fall
+back to randomly selecting a target and sending packets to it.
+
+## Syntax
+
+In its most basic form, a simple reverse proxy uses this syntax:
+
+~~~
+proxy FROM TO
+~~~
+
+* **FROM** is the base domain to match for the request to be proxied.
+* **TO** is the destination endpoint to proxy to.
+
+However, advanced features including load balancing can be utilized with an expanded syntax:
+
+~~~
+proxy FROM TO... {
+ policy random|least_conn|round_robin
+ fail_timeout DURATION
+ max_fails INTEGER
+ health_check PATH:PORT [DURATION]
+ except IGNORED_NAMES...
+ spray
+ protocol [dns [force_tcp]|https_google [bootstrap ADDRESS...]|grpc [insecure|CACERT|CERT KEY|CERT KEY CACERT]]
+}
+~~~
+
+* **FROM** is the name to match for the request to be proxied.
+* **TO** is the destination endpoint to proxy to. At least one is required, but multiple may be
+ specified. **TO** may be an IP:Port pair, or may reference a file in resolv.conf format.
+* `policy` is the load balancing policy to use; applies only with multiple backends. May be one of
+ random, least_conn, or round_robin. Default is random.
+* `fail_timeout` specifies how long to consider a backend as down after it has failed. While it is
+ down, requests will not be routed to that backend. A backend is "down" if CoreDNS fails to
+ communicate with it. The default value is 10 seconds ("10s").
+* `max_fails` is the number of failures within fail_timeout that are needed before considering
+ a backend to be down. If 0, the backend will never be marked as down. Default is 1. (See the
+ Examples section below for a configuration using `max_fails` and `fail_timeout`.)
+* `health_check` will check path (on port) on each backend. If a backend returns a status code of
+ 200-399, then that backend is marked healthy for double the healthcheck duration. If it doesn't,
+ it is marked as unhealthy and no requests are routed to it. If this option is not provided then
+ health checks are disabled. The default duration is 30 seconds ("30s").
+* **IGNORED_NAMES** in `except` is a space-separated list of domains to exclude from proxying.
+ Requests for these names are not proxied; they are passed through to the next plugin.
+* `spray` when all backends are unhealthy, randomly pick one to send the traffic to. (This is
+ a failsafe.)
+* `protocol` specifies what protocol to use to speak to an upstream: `dns` (the default) is plain
+ old DNS, and `https_google` uses `https://dns.google.com` and speaks a JSON DNS dialect. Note that
+ when using `https_google`, **TO** will be ignored. The `grpc` option will talk to a server that
+ has implemented the [DnsService](https://github.com/coredns/coredns/pb/dns.proto); see the example
+ below this list. An out-of-tree plugin that implements the server side of this can be found
+ [here](https://github.com/infobloxopen/coredns-grpc).
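+
+For example, a sketch of a gRPC upstream with client authentication and a custom CA (the file
+paths and endpoint are illustrative):
+
+~~~
+proxy . 10.0.0.10:443 {
+ protocol grpc client.crt client.key ca.crt
+}
+~~~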
+
+## Policies
+
+There are three load-balancing policies available:
+* `random` (default) - Randomly select a backend
+* `least_conn` - Select the backend with the fewest active connections
+* `round_robin` - Select the backend in round-robin fashion
+
+All policies implement randomly spraying packets to backend hosts when *no healthy* hosts are
+available. This is to pre-empt the case where the health checking (as a mechanism) fails.
+
+## Upstream Protocols
+
+Currently `protocol` supports `dns` (i.e., standard DNS over UDP/TCP), `https_google` (JSON
+payload over HTTPS), and `grpc` (DNS messages over a gRPC transport). Note that with `https_google`
+the entire transport is encrypted. Only *you* and *Google* can see your DNS activity.
+
+* `dns`: uses the standard DNS exchange. You can pass `force_tcp` to make sure that the proxied connection is performed
+ over TCP, regardless of the inbound request's protocol.
+* `https_google`: bootstrap **ADDRESS...** is used to (re-)resolve `dns.google.com` to an address to
+ connect to. This happens every 300s. If not specified, the defaults 8.8.8.8:53 and 8.8.4.4:53 are
+ used. Note that **TO** is *ignored* when `https_google` is used, as its upstream is always
+ `dns.google.com` (see the example below this list).
+
+ Debug queries are enabled by default and currently there is no way to turn them off. When CoreDNS
+ receives a debug query (i.e. the name is prefixed with `o-o.debug.`), a TXT record with the Comment
+ from `dns.google.com`'s response is added. Note that this comment is not always set.
+* `grpc`: options are used to control how the TLS connection is made to the gRPC server.
+ * None - No client authentication is used, and the system CAs are used to verify the server certificate.
+ * `insecure` - TLS is not used; the connection is made in plaintext (not recommended in production).
+ * **CACERT** - No client authentication is used, and the file **CACERT** is used to verify the server certificate.
+ * **CERT** **KEY** - Client authentication is used with the specified cert/key pair. The server
+ certificate is verified with the system CAs.
+ * **CERT** **KEY** **CACERT** - Client authentication is used with the specified cert/key pair. The
+ server certificate is verified using the **CACERT** file.
+
+ An out-of-tree plugin that implements the server side of this can be found
+ [here](https://github.com/infobloxopen/coredns-grpc).
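+
+For example, a sketch of `https_google` with custom bootstrap resolvers (the bootstrap address is
+illustrative):
+
+~~~
+proxy . 1.2.3.4:53 {
+ protocol https_google bootstrap 9.9.9.9:53
+}
+~~~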
+
+## Metrics
+
+If monitoring is enabled (via the *prometheus* plugin) then the following metric is exported:
+
+* `coredns_proxy_request_duration_milliseconds{proto, proxy_proto, from}`
+
+Where `proto` is the protocol of the incoming query ("tcp" or "udp"), `proxy_proto` is the protocol
+used to reach the upstream (`dns`, `grpc`, or `https_google`), and `from` is the **FROM** specified
+in the config.
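+
+For example, a sketch of enabling these metrics alongside the proxy (the address shown is the
+*prometheus* plugin's default):
+
+~~~
+. {
+ prometheus localhost:9153
+ proxy . 8.8.8.8:53
+}
+~~~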
+
+## Examples
+
+Proxy all requests within example.org. to a backend system:
+
+~~~
+proxy example.org 127.0.0.1:9005
+~~~
+
+Load-balance all requests between three backends (using random policy):
+
+~~~
+proxy . 10.0.0.10:53 10.0.0.11:1053 10.0.0.12
+~~~
+
+Same as above, but round-robin style:
+
+~~~
+proxy . 10.0.0.10:53 10.0.0.11:1053 10.0.0.12 {
+ policy round_robin
+}
+~~~
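+
+Mark a backend as down only after two failures within a 5 second window (a sketch using
+`max_fails` and `fail_timeout`; the values are illustrative):
+
+~~~
+proxy . 10.0.0.10:53 10.0.0.11:53 {
+ max_fails 2
+ fail_timeout 5s
+}
+~~~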
+
+With health checks (checking `/health` on port 8080 of each backend):
+
+~~~
+proxy . 10.0.0.10:53 10.0.0.11:53 10.0.0.12:53 {
+ policy round_robin
+ health_check /health:8080
+}
+~~~
+
+Proxy everything except requests to miek.nl or example.org:
+
+~~~
+proxy . 10.0.0.10:1234 {
+ except miek.nl example.org
+}
+~~~
+
+Proxy everything except miek.nl and example.org, using the host's resolv.conf nameservers:
+
+~~~
+proxy . /etc/resolv.conf {
+ except miek.nl example.org
+}
+~~~
+
+Proxy all requests within example.org to Google's dns.google.com:
+
+~~~
+proxy example.org 1.2.3.4:53 {
+ protocol https_google
+}
+~~~
+
+Proxy everything with HTTPS to `dns.google.com`, except `example.org`. Then have another proxy in
+another stanza that uses plain DNS to resolve names under `example.org`.
+
+~~~
+. {
+ proxy . 1.2.3.4:53 {
+ except example.org
+ protocol https_google
+ }
+}
+
+example.org {
+ proxy . 8.8.8.8:53
+}
+~~~
diff --git a/plugin/proxy/dns.go b/plugin/proxy/dns.go
new file mode 100644
index 000000000..4d8038422
--- /dev/null
+++ b/plugin/proxy/dns.go
@@ -0,0 +1,106 @@
+package proxy
+
+import (
+ "context"
+ "net"
+ "time"
+
+ "github.com/coredns/coredns/request"
+
+ "github.com/miekg/dns"
+)
+
+type dnsEx struct {
+ Timeout time.Duration
+ Options
+}
+
+// Options define the options understood by dns.Exchange.
+type Options struct {
+ ForceTCP bool // If true use TCP for upstream no matter what
+}
+
+func newDNSEx() *dnsEx {
+ return newDNSExWithOption(Options{})
+}
+
+func newDNSExWithOption(opt Options) *dnsEx {
+ return &dnsEx{Timeout: defaultTimeout, Options: opt}
+}
+
+func (d *dnsEx) Transport() string {
+ if d.Options.ForceTCP {
+ return "tcp"
+ }
+
+ // The protocol will be determined by `state.Proto()` during Exchange.
+ return ""
+}
+func (d *dnsEx) Protocol() string { return "dns" }
+func (d *dnsEx) OnShutdown(p *Proxy) error { return nil }
+func (d *dnsEx) OnStartup(p *Proxy) error { return nil }
+
+// Exchange implements the Exchanger interface.
+func (d *dnsEx) Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error) {
+ proto := state.Proto()
+ if d.Options.ForceTCP {
+ proto = "tcp"
+ }
+ co, err := net.DialTimeout(proto, addr, d.Timeout)
+ if err != nil {
+ return nil, err
+ }
+
+ reply, _, err := d.ExchangeConn(state.Req, co)
+
+ co.Close()
+
+ if reply != nil && reply.Truncated {
+ // Suppress proxy error for truncated responses
+ err = nil
+ }
+
+ if err != nil {
+ return nil, err
+ }
+ // Make sure it fits in the DNS response.
+ reply, _ = state.Scrub(reply)
+ reply.Compress = true
+ reply.Id = state.Req.Id
+
+ return reply, nil
+}
+
+func (d *dnsEx) ExchangeConn(m *dns.Msg, co net.Conn) (*dns.Msg, time.Duration, error) {
+ start := time.Now()
+ r, err := exchange(m, co)
+ rtt := time.Since(start)
+
+ return r, rtt, err
+}
+
+func exchange(m *dns.Msg, co net.Conn) (*dns.Msg, error) {
+ opt := m.IsEdns0()
+
+ udpsize := uint16(dns.MinMsgSize)
+ // If EDNS0 is used use that for size.
+ if opt != nil && opt.UDPSize() >= dns.MinMsgSize {
+ udpsize = opt.UDPSize()
+ }
+
+ dnsco := &dns.Conn{Conn: co, UDPSize: udpsize}
+
+ writeDeadline := time.Now().Add(defaultTimeout)
+ dnsco.SetWriteDeadline(writeDeadline)
+ dnsco.WriteMsg(m)
+
+ readDeadline := time.Now().Add(defaultTimeout)
+ co.SetReadDeadline(readDeadline)
+ r, err := dnsco.ReadMsg()
+
+ dnsco.Close()
+ if r == nil {
+ return nil, err
+ }
+ return r, err
+}
diff --git a/plugin/proxy/dnstap_test.go b/plugin/proxy/dnstap_test.go
new file mode 100644
index 000000000..05169a1ca
--- /dev/null
+++ b/plugin/proxy/dnstap_test.go
@@ -0,0 +1,57 @@
+package proxy
+
+import (
+ "testing"
+
+ "github.com/coredns/coredns/plugin/dnstap/msg"
+ "github.com/coredns/coredns/plugin/dnstap/test"
+ mwtest "github.com/coredns/coredns/plugin/test"
+ "github.com/coredns/coredns/request"
+
+ tap "github.com/dnstap/golang-dnstap"
+ "github.com/miekg/dns"
+ "golang.org/x/net/context"
+)
+
+func testCase(t *testing.T, ex Exchanger, q, r *dns.Msg, datq, datr *msg.Data) {
+ tapq := datq.ToOutsideQuery(tap.Message_FORWARDER_QUERY)
+ tapr := datr.ToOutsideResponse(tap.Message_FORWARDER_RESPONSE)
+ ctx := test.Context{}
+ err := toDnstap(&ctx, "10.240.0.1:40212", ex,
+ request.Request{W: &mwtest.ResponseWriter{}, Req: q}, r, 0, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(ctx.Trap) != 2 {
+ t.Fatalf("messages: %d", len(ctx.Trap))
+ }
+ if !test.MsgEqual(ctx.Trap[0], tapq) {
+ t.Errorf("want: %v\nhave: %v", tapq, ctx.Trap[0])
+ }
+ if !test.MsgEqual(ctx.Trap[1], tapr) {
+ t.Errorf("want: %v\nhave: %v", tapr, ctx.Trap[1])
+ }
+}
+
+func TestDnstap(t *testing.T) {
+ q := mwtest.Case{Qname: "example.org", Qtype: dns.TypeA}.Msg()
+ r := mwtest.Case{
+ Qname: "example.org.", Qtype: dns.TypeA,
+ Answer: []dns.RR{
+ mwtest.A("example.org. 3600 IN A 10.0.0.1"),
+ },
+ }.Msg()
+ tapq, tapr := test.TestingData(), test.TestingData()
+ testCase(t, newDNSEx(), q, r, tapq, tapr)
+ tapq.SocketProto = tap.SocketProtocol_TCP
+ tapr.SocketProto = tap.SocketProtocol_TCP
+ testCase(t, newDNSExWithOption(Options{ForceTCP: true}), q, r, tapq, tapr)
+ testCase(t, newGoogle("", []string{"8.8.8.8:53", "8.8.4.4:53"}), q, r, tapq, tapr)
+}
+
+func TestNoDnstap(t *testing.T) {
+ err := toDnstap(context.TODO(), "", nil, request.Request{}, nil, 0, 0)
+ if err != nil {
+ t.Fatal(err)
+ }
+}
diff --git a/plugin/proxy/exchanger.go b/plugin/proxy/exchanger.go
new file mode 100644
index 000000000..b98a687e7
--- /dev/null
+++ b/plugin/proxy/exchanger.go
@@ -0,0 +1,22 @@
+package proxy
+
+import (
+ "context"
+
+ "github.com/coredns/coredns/request"
+ "github.com/miekg/dns"
+)
+
+// Exchanger is an interface that specifies a type implementing a DNS resolver that
+// can use whatever transport it likes.
+type Exchanger interface {
+ Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error)
+ Protocol() string
+
+ // Transport returns the only transport protocol used by this Exchanger or "".
+ // If the return value is "", Exchange must use `state.Proto()`.
+ Transport() string
+
+ OnStartup(*Proxy) error
+ OnShutdown(*Proxy) error
+}
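+
+// A minimal sketch of a custom Exchanger (hypothetical, for illustration
+// only; it is not part of this package). It always exchanges over UDP via
+// miekg/dns's client:
+//
+//	type simpleEx struct{ client *dns.Client } // client.Net left "" (UDP)
+//
+//	func (s simpleEx) Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error) {
+//		r, _, err := s.client.Exchange(state.Req, addr)
+//		return r, err
+//	}
+//	func (s simpleEx) Protocol() string        { return "dns" }
+//	func (s simpleEx) Transport() string       { return "udp" }
+//	func (s simpleEx) OnStartup(*Proxy) error  { return nil }
+//	func (s simpleEx) OnShutdown(*Proxy) error { return nil }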
diff --git a/plugin/proxy/google.go b/plugin/proxy/google.go
new file mode 100644
index 000000000..ecc5e6dfd
--- /dev/null
+++ b/plugin/proxy/google.go
@@ -0,0 +1,244 @@
+package proxy
+
+import (
+ "context"
+ "crypto/tls"
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+ "log"
+ "net"
+ "net/http"
+ "net/url"
+ "sync/atomic"
+ "time"
+
+ "github.com/coredns/coredns/plugin/pkg/healthcheck"
+ "github.com/coredns/coredns/request"
+
+ "github.com/miekg/dns"
+)
+
+type google struct {
+ client *http.Client
+
+ endpoint string // Name to resolve via 'bootstrapProxy'
+
+ bootstrapProxy Proxy
+ quit chan bool
+}
+
+func newGoogle(endpoint string, bootstrap []string) *google {
+ if endpoint == "" {
+ endpoint = ghost
+ }
+ tls := &tls.Config{ServerName: endpoint}
+ client := &http.Client{
+ Timeout: defaultTimeout,
+ Transport: &http.Transport{TLSClientConfig: tls},
+ }
+
+ boot := NewLookup(bootstrap)
+
+ return &google{client: client, endpoint: dns.Fqdn(endpoint), bootstrapProxy: boot, quit: make(chan bool)}
+}
+
+func (g *google) Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error) {
+ v := url.Values{}
+
+ v.Set("name", state.Name())
+ v.Set("type", fmt.Sprintf("%d", state.QType()))
+
+ buf, backendErr := g.exchangeJSON(addr, v.Encode())
+
+ if backendErr == nil {
+ gm := new(googleMsg)
+ if err := json.Unmarshal(buf, gm); err != nil {
+ return nil, err
+ }
+
+ m, err := toMsg(gm)
+ if err != nil {
+ return nil, err
+ }
+
+ m.Id = state.Req.Id
+ return m, nil
+ }
+
+ log.Printf("[WARNING] Failed to connect to HTTPS backend %q: %s", g.endpoint, backendErr)
+ return nil, backendErr
+}
+
+func (g *google) exchangeJSON(addr, json string) ([]byte, error) {
+ url := "https://" + addr + "/resolve?" + json
+ req, err := http.NewRequest("GET", url, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ req.Host = g.endpoint // TODO(miek): works with the extra dot at the end?
+
+ resp, err := g.client.Do(req)
+ if err != nil {
+ return nil, err
+ }
+
+ buf, err := ioutil.ReadAll(resp.Body)
+ resp.Body.Close()
+ if err != nil {
+ return nil, err
+ }
+
+ if resp.StatusCode != 200 {
+ return nil, fmt.Errorf("failed to get 200 status code, got %d", resp.StatusCode)
+ }
+
+ return buf, nil
+}
+
+func (g *google) Transport() string { return "tcp" }
+func (g *google) Protocol() string { return "https_google" }
+
+func (g *google) OnShutdown(p *Proxy) error {
+ g.quit <- true
+ return nil
+}
+
+func (g *google) OnStartup(p *Proxy) error {
+ // We fake a state because normally the proxy is called after we already got an incoming query.
+ // This is a non-edns0, udp request to g.endpoint.
+ req := new(dns.Msg)
+ req.SetQuestion(g.endpoint, dns.TypeA)
+ state := request.Request{W: new(fakeBootWriter), Req: req}
+
+ if len(*p.Upstreams) == 0 {
+ return fmt.Errorf("no upstreams defined")
+ }
+
+ oldUpstream := (*p.Upstreams)[0]
+
+ log.Printf("[INFO] Bootstrapping A records %q", g.endpoint)
+
+ new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA)
+ if err != nil {
+ log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err)
+ } else {
+ addrs, err1 := extractAnswer(new)
+ if err1 != nil {
+ log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err1)
+ } else {
+
+ up := newUpstream(addrs, oldUpstream.(*staticUpstream))
+ p.Upstreams = &[]Upstream{up}
+
+ log.Printf("[INFO] Bootstrapping A records %q found: %v", g.endpoint, addrs)
+ }
+ }
+
+ go func() {
+ tick := time.NewTicker(120 * time.Second)
+
+ for {
+ select {
+ case <-tick.C:
+
+ log.Printf("[INFO] Resolving A records %q", g.endpoint)
+
+ new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA)
+ if err != nil {
+ log.Printf("[WARNING] Failed to resolve A records %q: %s", g.endpoint, err)
+ continue
+ }
+
+ addrs, err1 := extractAnswer(new)
+ if err1 != nil {
+ log.Printf("[WARNING] Failed to resolve A records %q: %s", g.endpoint, err1)
+ continue
+ }
+
+ up := newUpstream(addrs, oldUpstream.(*staticUpstream))
+ p.Upstreams = &[]Upstream{up}
+
+ log.Printf("[INFO] Resolving A records %q found: %v", g.endpoint, addrs)
+
+ case <-g.quit:
+ return
+ }
+ }
+ }()
+
+ return nil
+}
+
+func extractAnswer(m *dns.Msg) ([]string, error) {
+ if len(m.Answer) == 0 {
+ return nil, fmt.Errorf("no answer section in response")
+ }
+ ret := []string{}
+ for _, an := range m.Answer {
+ if a, ok := an.(*dns.A); ok {
+ ret = append(ret, net.JoinHostPort(a.A.String(), "443"))
+ }
+ }
+ if len(ret) > 0 {
+ return ret, nil
+ }
+
+ return nil, fmt.Errorf("no address records in answer section")
+}
+
+// newUpstream returns an upstream initialized with hosts.
+func newUpstream(hosts []string, old *staticUpstream) Upstream {
+ upstream := &staticUpstream{
+ from: old.from,
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 10 * time.Second,
+ MaxFails: 3,
+ Future: 60 * time.Second,
+ },
+ ex: old.ex,
+ WithoutPathPrefix: old.WithoutPathPrefix,
+ IgnoredSubDomains: old.IgnoredSubDomains,
+ }
+
+ upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
+ for i, h := range hosts {
+ uh := &healthcheck.UpstreamHost{
+ Name: h,
+ Conns: 0,
+ Fails: 0,
+ FailTimeout: upstream.FailTimeout,
+
+ CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
+
+ down := false
+
+ uh.CheckMu.Lock()
+ until := uh.OkUntil
+ uh.CheckMu.Unlock()
+
+ if !until.IsZero() && time.Now().After(until) {
+ down = true
+ }
+
+ fails := atomic.LoadInt32(&uh.Fails)
+ if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
+ down = true
+ }
+ return down
+ }
+ }(upstream),
+ WithoutPathPrefix: upstream.WithoutPathPrefix,
+ }
+
+ upstream.Hosts[i] = uh
+ }
+ return upstream
+}
+
+const (
+ // Default endpoint for this service.
+ ghost = "dns.google.com."
+)
diff --git a/plugin/proxy/google_rr.go b/plugin/proxy/google_rr.go
new file mode 100644
index 000000000..3b9233b7b
--- /dev/null
+++ b/plugin/proxy/google_rr.go
@@ -0,0 +1,89 @@
+package proxy
+
+import (
+ "fmt"
+
+ "github.com/miekg/dns"
+)
+
+// toMsg converts a googleMsg into the dns message.
+func toMsg(g *googleMsg) (*dns.Msg, error) {
+ m := new(dns.Msg)
+ m.Response = true
+ m.Rcode = g.Status
+ m.Truncated = g.TC
+ m.RecursionDesired = g.RD
+ m.RecursionAvailable = g.RA
+ m.AuthenticatedData = g.AD
+ m.CheckingDisabled = g.CD
+
+ m.Question = make([]dns.Question, 1)
+ m.Answer = make([]dns.RR, len(g.Answer))
+ m.Ns = make([]dns.RR, len(g.Authority))
+ m.Extra = make([]dns.RR, len(g.Additional))
+
+ m.Question[0] = dns.Question{Name: g.Question[0].Name, Qtype: g.Question[0].Type, Qclass: dns.ClassINET}
+
+ var err error
+ for i := 0; i < len(m.Answer); i++ {
+ m.Answer[i], err = toRR(g.Answer[i])
+ if err != nil {
+ return nil, err
+ }
+ }
+ for i := 0; i < len(m.Ns); i++ {
+ m.Ns[i], err = toRR(g.Authority[i])
+ if err != nil {
+ return nil, err
+ }
+ }
+ for i := 0; i < len(m.Extra); i++ {
+ m.Extra[i], err = toRR(g.Additional[i])
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ return m, nil
+}
+
+// toRR transforms a "google" RR to a dns.RR.
+func toRR(g googleRR) (dns.RR, error) {
+ typ, ok := dns.TypeToString[g.Type]
+ if !ok {
+ return nil, fmt.Errorf("failed to convert type %q", g.Type)
+ }
+
+ str := fmt.Sprintf("%s %d %s %s", g.Name, g.TTL, typ, g.Data)
+ rr, err := dns.NewRR(str)
+ if err != nil {
+ return nil, fmt.Errorf("failed to parse %q: %s", str, err)
+ }
+ return rr, nil
+}
+
+// googleRR represents a dns.RR in another form.
+type googleRR struct {
+ Name string
+ Type uint16
+ TTL uint32
+ Data string
+}
+
+// googleMsg is a JSON representation of the dns.Msg.
+type googleMsg struct {
+ Status int
+ TC bool
+ RD bool
+ RA bool
+ AD bool
+ CD bool
+ Question []struct {
+ Name string
+ Type uint16
+ }
+ Answer []googleRR
+ Authority []googleRR
+ Additional []googleRR
+ Comment string
+}
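+
+// An illustrative JSON payload (hand-written here, not captured from the
+// live service) that unmarshals into googleMsg:
+//
+//	{
+//	  "Status": 0, "TC": false, "RD": true, "RA": true, "AD": false, "CD": false,
+//	  "Question": [ {"name": "example.org.", "type": 1} ],
+//	  "Answer": [ {"name": "example.org.", "type": 1, "TTL": 3600, "data": "93.184.216.34"} ],
+//	  "Comment": ""
+//	}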
diff --git a/plugin/proxy/google_test.go b/plugin/proxy/google_test.go
new file mode 100644
index 000000000..1ce591664
--- /dev/null
+++ b/plugin/proxy/google_test.go
@@ -0,0 +1,5 @@
+package proxy
+
+// TODO(miek):
+// Test cert failures - put those in SERVFAIL messages, but attach error code in TXT
+// Test connecting to a bad host.
diff --git a/plugin/proxy/grpc.go b/plugin/proxy/grpc.go
new file mode 100644
index 000000000..f98fd2e91
--- /dev/null
+++ b/plugin/proxy/grpc.go
@@ -0,0 +1,96 @@
+package proxy
+
+import (
+ "context"
+ "crypto/tls"
+ "log"
+
+ "github.com/coredns/coredns/pb"
+ "github.com/coredns/coredns/plugin/pkg/trace"
+ "github.com/coredns/coredns/request"
+
+ "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
+ "github.com/miekg/dns"
+ opentracing "github.com/opentracing/opentracing-go"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials"
+)
+
+type grpcClient struct {
+ dialOpts []grpc.DialOption
+ clients map[string]pb.DnsServiceClient
+ conns []*grpc.ClientConn
+ upstream *staticUpstream
+}
+
+func newGrpcClient(tls *tls.Config, u *staticUpstream) *grpcClient {
+ g := &grpcClient{upstream: u}
+
+ if tls == nil {
+ g.dialOpts = append(g.dialOpts, grpc.WithInsecure())
+ } else {
+ g.dialOpts = append(g.dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tls)))
+ }
+ g.clients = map[string]pb.DnsServiceClient{}
+
+ return g
+}
+
+func (g *grpcClient) Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error) {
+ msg, err := state.Req.Pack()
+ if err != nil {
+ return nil, err
+ }
+
+ reply, err := g.clients[addr].Query(ctx, &pb.DnsPacket{Msg: msg})
+ if err != nil {
+ return nil, err
+ }
+ d := new(dns.Msg)
+ err = d.Unpack(reply.Msg)
+ if err != nil {
+ return nil, err
+ }
+ return d, nil
+}
+
+func (g *grpcClient) Transport() string { return "tcp" }
+
+func (g *grpcClient) Protocol() string { return "grpc" }
+
+func (g *grpcClient) OnShutdown(p *Proxy) error {
+ g.clients = map[string]pb.DnsServiceClient{}
+ for i, conn := range g.conns {
+ err := conn.Close()
+ if err != nil {
+ log.Printf("[WARNING] Error closing connection %d: %s\n", i, err)
+ }
+ }
+ g.conns = []*grpc.ClientConn{}
+ return nil
+}
+
+func (g *grpcClient) OnStartup(p *Proxy) error {
+ dialOpts := g.dialOpts
+ if p.Trace != nil {
+ if t, ok := p.Trace.(trace.Trace); ok {
+ onlyIfParent := func(parentSpanCtx opentracing.SpanContext, method string, req, resp interface{}) bool {
+ return parentSpanCtx != nil
+ }
+ intercept := otgrpc.OpenTracingClientInterceptor(t.Tracer(), otgrpc.IncludingSpans(onlyIfParent))
+ dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(intercept))
+ } else {
+ log.Printf("[WARNING] Wrong type for trace plugin reference: %s", p.Trace)
+ }
+ }
+ for _, host := range g.upstream.Hosts {
+ conn, err := grpc.Dial(host.Name, dialOpts...)
+ if err != nil {
+ log.Printf("[WARNING] Skipping gRPC host '%s' due to Dial error: %s\n", host.Name, err)
+ } else {
+ g.clients[host.Name] = pb.NewDnsServiceClient(conn)
+ g.conns = append(g.conns, conn)
+ }
+ }
+ return nil
+}
diff --git a/plugin/proxy/grpc_test.go b/plugin/proxy/grpc_test.go
new file mode 100644
index 000000000..52c5737d6
--- /dev/null
+++ b/plugin/proxy/grpc_test.go
@@ -0,0 +1,71 @@
+package proxy
+
+import (
+ "testing"
+ "time"
+
+ "github.com/coredns/coredns/plugin/pkg/healthcheck"
+
+ "google.golang.org/grpc/grpclog"
+)
+
+func pool() []*healthcheck.UpstreamHost {
+ return []*healthcheck.UpstreamHost{
+ {
+ Name: "localhost:10053",
+ },
+ {
+ Name: "localhost:10054",
+ },
+ }
+}
+
+func TestStartupShutdown(t *testing.T) {
+ grpclog.SetLogger(discard{})
+
+ upstream := &staticUpstream{
+ from: ".",
+ HealthCheck: healthcheck.HealthCheck{
+ Hosts: pool(),
+ FailTimeout: 10 * time.Second,
+ Future: 60 * time.Second,
+ MaxFails: 1,
+ },
+ }
+ g := newGrpcClient(nil, upstream)
+ upstream.ex = g
+
+ p := &Proxy{}
+ p.Upstreams = &[]Upstream{upstream}
+
+ err := g.OnStartup(p)
+ if err != nil {
+ t.Errorf("Error starting grpc client exchanger: %s", err)
+ return
+ }
+ if len(g.clients) != len(pool()) {
+ t.Errorf("Expected %d grpc clients but found %d", len(pool()), len(g.clients))
+ }
+
+ err = g.OnShutdown(p)
+ if err != nil {
+ t.Errorf("Error stopping grpc client exchanger: %s", err)
+ return
+ }
+ if len(g.clients) != 0 {
+ t.Errorf("Shutdown didn't remove clients, found %d", len(g.clients))
+ }
+ if len(g.conns) != 0 {
+ t.Errorf("Shutdown didn't remove conns, found %d", len(g.conns))
+ }
+}
+
+// discard is a Logger that outputs nothing.
+type discard struct{}
+
+func (d discard) Fatal(args ...interface{}) {}
+func (d discard) Fatalf(format string, args ...interface{}) {}
+func (d discard) Fatalln(args ...interface{}) {}
+func (d discard) Print(args ...interface{}) {}
+func (d discard) Printf(format string, args ...interface{}) {}
+func (d discard) Println(args ...interface{}) {}
diff --git a/plugin/proxy/lookup.go b/plugin/proxy/lookup.go
new file mode 100644
index 000000000..9be62edd5
--- /dev/null
+++ b/plugin/proxy/lookup.go
@@ -0,0 +1,132 @@
+package proxy
+
+// Functions other plugins might want to use to do lookups in the same style as the proxy.
+
+import (
+ "context"
+ "fmt"
+ "sync/atomic"
+ "time"
+
+ "github.com/coredns/coredns/plugin/pkg/healthcheck"
+ "github.com/coredns/coredns/request"
+
+ "github.com/miekg/dns"
+)
+
+// NewLookup creates a new proxy with the hosts in hosts and a random policy.
+func NewLookup(hosts []string) Proxy { return NewLookupWithOption(hosts, Options{}) }
+
+// NewLookupWithOption creates a simple lookup proxy with a random policy; opts can force a protocol (e.g. TCP) for the upstream exchange.
+func NewLookupWithOption(hosts []string, opts Options) Proxy {
+ p := Proxy{Next: nil}
+
+ // TODO(miek): this needs to be unified with upstream.go's NewStaticUpstreams, caddy uses NewHost
+ // we should copy/make something similar.
+ upstream := &staticUpstream{
+ from: ".",
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 10 * time.Second,
+ MaxFails: 3, // TODO(miek): disable error checking for simple lookups?
+ Future: 60 * time.Second,
+ },
+ ex: newDNSExWithOption(opts),
+ }
+ upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts))
+
+ for i, host := range hosts {
+ uh := &healthcheck.UpstreamHost{
+ Name: host,
+ Conns: 0,
+ Fails: 0,
+ FailTimeout: upstream.FailTimeout,
+
+ CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
+
+ down := false
+
+ uh.CheckMu.Lock()
+ until := uh.OkUntil
+ uh.CheckMu.Unlock()
+
+ if !until.IsZero() && time.Now().After(until) {
+ down = true
+ }
+
+ fails := atomic.LoadInt32(&uh.Fails)
+ if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
+ down = true
+ }
+ return down
+ }
+ }(upstream),
+ WithoutPathPrefix: upstream.WithoutPathPrefix,
+ }
+
+ upstream.Hosts[i] = uh
+ }
+ p.Upstreams = &[]Upstream{upstream}
+ return p
+}
+
+// Lookup will use name and type to forge a new message and will send that upstream. It will
+// set any EDNS0 options correctly so that downstream will be able to process the reply.
+func (p Proxy) Lookup(state request.Request, name string, typ uint16) (*dns.Msg, error) {
+ req := new(dns.Msg)
+ req.SetQuestion(name, typ)
+ state.SizeAndDo(req)
+
+ state2 := request.Request{W: state.W, Req: req}
+
+ return p.lookup(state2)
+}
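+
+// An example (hypothetical) of using Lookup from another plugin, assuming a
+// request.Request named state is in scope:
+//
+//	p := proxy.NewLookup([]string{"8.8.8.8:53"})
+//	reply, err := p.Lookup(state, "example.org.", dns.TypeA)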
+
+// Forward forwards the request in state as-is. Unlike Lookup, it does not add any EDNS0 options to the message.
+func (p Proxy) Forward(state request.Request) (*dns.Msg, error) {
+ return p.lookup(state)
+}
+
+func (p Proxy) lookup(state request.Request) (*dns.Msg, error) {
+ upstream := p.match(state)
+ if upstream == nil {
+ return nil, errInvalidDomain
+ }
+ for {
+ start := time.Now()
+ reply := new(dns.Msg)
+ var backendErr error
+
+ // Since Select() should give us "up" hosts, keep retrying
+ // hosts until timeout (or until we get a nil host).
+ for time.Since(start) < tryDuration {
+ host := upstream.Select()
+ if host == nil {
+ return nil, fmt.Errorf("%s: %s", errUnreachable, "no upstream host")
+ }
+
+ // Duplicated from proxy.go, with a twist: we don't write the
+ // reply back to the client, we return it, and there is no monitoring.
+
+ atomic.AddInt64(&host.Conns, 1)
+
+ reply, backendErr = upstream.Exchanger().Exchange(context.TODO(), host.Name, state)
+
+ atomic.AddInt64(&host.Conns, -1)
+
+ if backendErr == nil {
+ return reply, nil
+ }
+ timeout := host.FailTimeout
+ if timeout == 0 {
+ timeout = 10 * time.Second
+ }
+ atomic.AddInt32(&host.Fails, 1)
+ go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
+ time.Sleep(timeout)
+ atomic.AddInt32(&host.Fails, -1)
+ }(host, timeout)
+ }
+ return nil, fmt.Errorf("%s: %s", errUnreachable, backendErr)
+ }
+}
diff --git a/plugin/proxy/metrics.go b/plugin/proxy/metrics.go
new file mode 100644
index 000000000..893c26d6b
--- /dev/null
+++ b/plugin/proxy/metrics.go
@@ -0,0 +1,30 @@
+package proxy
+
+import (
+ "sync"
+
+ "github.com/coredns/coredns/plugin"
+
+ "github.com/prometheus/client_golang/prometheus"
+)
+
+// Metrics the proxy plugin exports.
+var (
+ RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
+ Namespace: plugin.Namespace,
+ Subsystem: "proxy",
+ Name: "request_duration_milliseconds",
+ Buckets: append(prometheus.DefBuckets, []float64{50, 100, 200, 500, 1000, 2000, 3000, 4000, 5000, 10000}...),
+ Help: "Histogram of the time (in milliseconds) each request took.",
+ }, []string{"proto", "proxy_proto", "from"})
+)
+
+// OnStartupMetrics sets up the metrics on startup. This is done for all proxy protocols.
+func OnStartupMetrics() error {
+ metricsOnce.Do(func() {
+ prometheus.MustRegister(RequestDuration)
+ })
+ return nil
+}
+
+var metricsOnce sync.Once
diff --git a/plugin/proxy/proxy.go b/plugin/proxy/proxy.go
new file mode 100644
index 000000000..9d1e1906b
--- /dev/null
+++ b/plugin/proxy/proxy.go
@@ -0,0 +1,195 @@
+// Package proxy is a plugin that proxies requests.
+package proxy
+
+import (
+ "errors"
+ "fmt"
+ "sync/atomic"
+ "time"
+
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/dnstap"
+ "github.com/coredns/coredns/plugin/dnstap/msg"
+ "github.com/coredns/coredns/plugin/pkg/healthcheck"
+ "github.com/coredns/coredns/request"
+
+ tap "github.com/dnstap/golang-dnstap"
+ "github.com/miekg/dns"
+ ot "github.com/opentracing/opentracing-go"
+ "golang.org/x/net/context"
+)
+
+var (
+ errUnreachable = errors.New("unreachable backend")
+ errInvalidProtocol = errors.New("invalid protocol")
+ errInvalidDomain = errors.New("invalid path for proxy")
+)
+
+// Proxy represents a plugin instance that can proxy requests to another (DNS) server.
+type Proxy struct {
+ Next plugin.Handler
+
+ // Upstreams is a pointer to a slice, so we can update the upstream list at runtime
+ // (the https_google exchanger uses this to refresh its addresses).
+
+ Upstreams *[]Upstream
+
+ // Trace is the Trace plugin, if it is installed
+ // This is used by the grpc exchanger to trace through the grpc calls
+ Trace plugin.Handler
+}
+
+// Upstream manages a pool of proxy upstream hosts. Select should return a
+// suitable upstream host, or nil if no such hosts are available.
+type Upstream interface {
+ // The domain name this upstream host should be routed on.
+ From() string
+ // Selects an upstream host to be routed to.
+ Select() *healthcheck.UpstreamHost
+ // Checks that the given (sub)domain is not in the list of ignored domains.
+ IsAllowedDomain(string) bool
+ // Exchanger returns the exchanger to be used for this upstream.
+ Exchanger() Exchanger
+ // Stops the upstream from proxying requests, shutting down its goroutines cleanly.
+ Stop() error
+}
+
+// tryDuration is how long to try upstream hosts; failures result in
+// immediate retries until this duration ends or we get a nil host.
+var tryDuration = 60 * time.Second
+
+// ServeDNS satisfies the plugin.Handler interface.
+func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) {
+ var span, child ot.Span
+ span = ot.SpanFromContext(ctx)
+ state := request.Request{W: w, Req: r}
+
+ upstream := p.match(state)
+ if upstream == nil {
+ return plugin.NextOrFailure(p.Name(), p.Next, ctx, w, r)
+ }
+
+ for {
+ start := time.Now()
+ reply := new(dns.Msg)
+ var backendErr error
+
+ // Since Select() should give us "up" hosts, keep retrying
+ // hosts until timeout (or until we get a nil host).
+ for time.Since(start) < tryDuration {
+ host := upstream.Select()
+ if host == nil {
+
+ RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
+
+ return dns.RcodeServerFailure, fmt.Errorf("%s: %s", errUnreachable, "no upstream host")
+ }
+
+ if span != nil {
+ child = span.Tracer().StartSpan("exchange", ot.ChildOf(span.Context()))
+ ctx = ot.ContextWithSpan(ctx, child)
+ }
+
+ atomic.AddInt64(&host.Conns, 1)
+ queryEpoch := msg.Epoch()
+
+ reply, backendErr = upstream.Exchanger().Exchange(ctx, host.Name, state)
+
+ respEpoch := msg.Epoch()
+ atomic.AddInt64(&host.Conns, -1)
+
+ if child != nil {
+ child.Finish()
+ }
+
+ taperr := toDnstap(ctx, host.Name, upstream.Exchanger(), state, reply, queryEpoch, respEpoch)
+
+ if backendErr == nil {
+ w.WriteMsg(reply)
+
+ RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
+
+ return 0, taperr
+ }
+
+ timeout := host.FailTimeout
+ if timeout == 0 {
+ timeout = 10 * time.Second
+ }
+ atomic.AddInt32(&host.Fails, 1)
+ go func(host *healthcheck.UpstreamHost, timeout time.Duration) {
+ time.Sleep(timeout)
+ atomic.AddInt32(&host.Fails, -1)
+ }(host, timeout)
+ }
+
+ RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond))
+
+ return dns.RcodeServerFailure, fmt.Errorf("%s: %s", errUnreachable, backendErr)
+ }
+}
+
+func (p Proxy) match(state request.Request) (u Upstream) {
+ if p.Upstreams == nil {
+ return nil
+ }
+
+ longestMatch := 0
+ for _, upstream := range *p.Upstreams {
+ from := upstream.From()
+
+ if !plugin.Name(from).Matches(state.Name()) || !upstream.IsAllowedDomain(state.Name()) {
+ continue
+ }
+
+ if lf := len(from); lf > longestMatch {
+ longestMatch = lf
+ u = upstream
+ }
+ }
+ return u
+
+}
+
+// Name implements the Handler interface.
+func (p Proxy) Name() string { return "proxy" }
+
+// defaultTimeout is the default networking timeout for DNS requests.
+const defaultTimeout = 5 * time.Second
+
+func toDnstap(ctx context.Context, host string, ex Exchanger, state request.Request, reply *dns.Msg, queryEpoch, respEpoch uint64) (err error) {
+ if tapper := dnstap.TapperFromContext(ctx); tapper != nil {
+ // Query
+ b := tapper.TapBuilder()
+ b.TimeSec = queryEpoch
+ if err = b.HostPort(host); err != nil {
+ return
+ }
+ t := ex.Transport()
+ if t == "" {
+ t = state.Proto()
+ }
+ if t == "tcp" {
+ b.SocketProto = tap.SocketProtocol_TCP
+ } else {
+ b.SocketProto = tap.SocketProtocol_UDP
+ }
+ if err = b.Msg(state.Req); err != nil {
+ return
+ }
+ err = tapper.TapMessage(b.ToOutsideQuery(tap.Message_FORWARDER_QUERY))
+ if err != nil {
+ return
+ }
+
+ // Response
+ if reply != nil {
+ b.TimeSec = respEpoch
+ if err = b.Msg(reply); err != nil {
+ return
+ }
+ err = tapper.TapMessage(b.ToOutsideResponse(tap.Message_FORWARDER_RESPONSE))
+ }
+ }
+ return
+}
diff --git a/plugin/proxy/proxy_test.go b/plugin/proxy/proxy_test.go
new file mode 100644
index 000000000..b0cb9c3cb
--- /dev/null
+++ b/plugin/proxy/proxy_test.go
@@ -0,0 +1,87 @@
+package proxy
+
+import (
+ "fmt"
+ "net/http"
+ "net/http/httptest"
+ "strings"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/mholt/caddy/caddyfile"
+)
+
+func TestStop(t *testing.T) {
+ config := "proxy . %s {\n health_check /healthcheck:%s %dms \n}"
+ tests := []struct {
+ name string
+ intervalInMilliseconds int
+ numHealthcheckIntervals int
+ }{
+ {
+ "No Healthchecks After Stop - 5ms, 1 intervals",
+ 5,
+ 1,
+ },
+ {
+ "No Healthchecks After Stop - 5ms, 2 intervals",
+ 5,
+ 2,
+ },
+ {
+ "No Healthchecks After Stop - 5ms, 3 intervals",
+ 5,
+ 3,
+ },
+ }
+
+ for _, test := range tests {
+ t.Run(test.name, func(t *testing.T) {
+
+ // Set up proxy.
+ var counter int64
+ backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ r.Body.Close()
+ atomic.AddInt64(&counter, 1)
+ }))
+
+ defer backend.Close()
+
+ port := backend.URL[17:] // Strip the leading "http://127.0.0.1:" to leave just the port
+ back := backend.URL[7:] // Remove http://
+ c := caddyfile.NewDispenser("Testfile", strings.NewReader(fmt.Sprintf(config, back, port, test.intervalInMilliseconds)))
+ upstreams, err := NewStaticUpstreams(&c)
+ if err != nil {
+ t.Error("Expected no error. Got:", err.Error())
+ }
+
+ // Give some time for healthchecks to hit the server.
+ time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond)
+
+ for _, upstream := range upstreams {
+ if err := upstream.Stop(); err != nil {
+ t.Error("Expected no error stopping upstream. Got: ", err.Error())
+ }
+ }
+
+ counterValueAfterShutdown := atomic.LoadInt64(&counter)
+
+ // Give some time to see if healthchecks are still hitting the server.
+ time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond)
+
+ if counterValueAfterShutdown == 0 {
+ t.Error("Expected healthchecks to hit test server. Got no healthchecks.")
+ }
+
+ // health checks are in a go routine now, so one may well occur after we shutdown,
+ // but we only ever expect one more
+ counterValueAfterWaiting := atomic.LoadInt64(&counter)
+ if counterValueAfterWaiting > (counterValueAfterShutdown + 1) {
+ t.Errorf("Expected no more healthchecks after shutdown. Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown)
+ }
+
+ })
+
+ }
+}
diff --git a/plugin/proxy/response.go b/plugin/proxy/response.go
new file mode 100644
index 000000000..2ad553c41
--- /dev/null
+++ b/plugin/proxy/response.go
@@ -0,0 +1,21 @@
+package proxy
+
+import (
+ "net"
+
+ "github.com/miekg/dns"
+)
+
+type fakeBootWriter struct {
+ dns.ResponseWriter
+}
+
+func (w *fakeBootWriter) LocalAddr() net.Addr {
+ local := net.ParseIP("127.0.0.1")
+ return &net.UDPAddr{IP: local, Port: 53} // Port is not used here
+}
+
+func (w *fakeBootWriter) RemoteAddr() net.Addr {
+ remote := net.ParseIP("8.8.8.8")
+ return &net.UDPAddr{IP: remote, Port: 53} // Port is not used here
+}
diff --git a/plugin/proxy/setup.go b/plugin/proxy/setup.go
new file mode 100644
index 000000000..bbe65c35d
--- /dev/null
+++ b/plugin/proxy/setup.go
@@ -0,0 +1,46 @@
+package proxy
+
+import (
+ "github.com/coredns/coredns/core/dnsserver"
+ "github.com/coredns/coredns/plugin"
+
+ "github.com/mholt/caddy"
+)
+
+func init() {
+ caddy.RegisterPlugin("proxy", caddy.Plugin{
+ ServerType: "dns",
+ Action: setup,
+ })
+}
+
+func setup(c *caddy.Controller) error {
+ upstreams, err := NewStaticUpstreams(&c.Dispenser)
+ if err != nil {
+ return plugin.Error("proxy", err)
+ }
+
+ t := dnsserver.GetConfig(c).Handler("trace")
+ P := &Proxy{Trace: t}
+ dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler {
+ P.Next = next
+ P.Upstreams = &upstreams
+ return P
+ })
+
+ c.OnStartup(OnStartupMetrics)
+
+ for i := range upstreams {
+ u := upstreams[i]
+ c.OnStartup(func() error {
+ return u.Exchanger().OnStartup(P)
+ })
+ c.OnShutdown(func() error {
+ return u.Exchanger().OnShutdown(P)
+ })
+ // Register shutdown handlers.
+ c.OnShutdown(u.Stop)
+ }
+
+ return nil
+}
diff --git a/plugin/proxy/upstream.go b/plugin/proxy/upstream.go
new file mode 100644
index 000000000..b60b6ff58
--- /dev/null
+++ b/plugin/proxy/upstream.go
@@ -0,0 +1,234 @@
+package proxy
+
+import (
+ "fmt"
+ "net"
+ "strconv"
+ "sync/atomic"
+ "time"
+
+ "github.com/coredns/coredns/plugin"
+ "github.com/coredns/coredns/plugin/pkg/dnsutil"
+ "github.com/coredns/coredns/plugin/pkg/healthcheck"
+ "github.com/coredns/coredns/plugin/pkg/tls"
+ "github.com/mholt/caddy/caddyfile"
+ "github.com/miekg/dns"
+)
+
+type staticUpstream struct {
+ from string
+
+ healthcheck.HealthCheck
+
+ WithoutPathPrefix string
+ IgnoredSubDomains []string
+ ex Exchanger
+}
+
+// NewStaticUpstreams parses the configuration input and sets up
+// static upstreams for the proxy plugin.
+func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) {
+ var upstreams []Upstream
+ for c.Next() {
+ upstream := &staticUpstream{
+ from: ".",
+ HealthCheck: healthcheck.HealthCheck{
+ FailTimeout: 10 * time.Second,
+ MaxFails: 1,
+ Future: 60 * time.Second,
+ },
+ ex: newDNSEx(),
+ }
+
+ if !c.Args(&upstream.from) {
+ return upstreams, c.ArgErr()
+ }
+ upstream.from = plugin.Host(upstream.from).Normalize()
+
+ to := c.RemainingArgs()
+ if len(to) == 0 {
+ return upstreams, c.ArgErr()
+ }
+
+ // process the host list, substituting in any nameservers in files
+ toHosts, err := dnsutil.ParseHostPortOrFile(to...)
+ if err != nil {
+ return upstreams, err
+ }
+
+ for c.NextBlock() {
+ if err := parseBlock(c, upstream); err != nil {
+ return upstreams, err
+ }
+ }
+
+ upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts))
+ for i, host := range toHosts {
+ uh := &healthcheck.UpstreamHost{
+ Name: host,
+ Conns: 0,
+ Fails: 0,
+ FailTimeout: upstream.FailTimeout,
+
+ CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc {
+ return func(uh *healthcheck.UpstreamHost) bool {
+
+ down := false
+
+ uh.CheckMu.Lock()
+ until := uh.OkUntil
+ uh.CheckMu.Unlock()
+
+ if !until.IsZero() && time.Now().After(until) {
+ down = true
+ }
+
+ fails := atomic.LoadInt32(&uh.Fails)
+ if fails >= upstream.MaxFails && upstream.MaxFails != 0 {
+ down = true
+ }
+ return down
+ }
+ }(upstream),
+ WithoutPathPrefix: upstream.WithoutPathPrefix,
+ }
+
+ upstream.Hosts[i] = uh
+ }
+ upstream.Start()
+
+ upstreams = append(upstreams, upstream)
+ }
+ return upstreams, nil
+}
+
+func (u *staticUpstream) From() string {
+ return u.from
+}
+
+func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error {
+ switch c.Val() {
+ case "policy":
+ if !c.NextArg() {
+ return c.ArgErr()
+ }
+ policyCreateFunc, ok := healthcheck.SupportedPolicies[c.Val()]
+ if !ok {
+ return c.ArgErr()
+ }
+ u.Policy = policyCreateFunc()
+ case "fail_timeout":
+ if !c.NextArg() {
+ return c.ArgErr()
+ }
+ dur, err := time.ParseDuration(c.Val())
+ if err != nil {
+ return err
+ }
+ u.FailTimeout = dur
+ case "max_fails":
+ if !c.NextArg() {
+ return c.ArgErr()
+ }
+ n, err := strconv.Atoi(c.Val())
+ if err != nil {
+ return err
+ }
+ u.MaxFails = int32(n)
+ case "health_check":
+ if !c.NextArg() {
+ return c.ArgErr()
+ }
+ var err error
+ u.HealthCheck.Path, u.HealthCheck.Port, err = net.SplitHostPort(c.Val())
+ if err != nil {
+ return err
+ }
+ u.HealthCheck.Interval = 30 * time.Second
+ if c.NextArg() {
+ dur, err := time.ParseDuration(c.Val())
+ if err != nil {
+ return err
+ }
+ u.HealthCheck.Interval = dur
+ u.Future = 2 * dur
+
+ // set a minimum of 3 seconds
+ if u.Future < (3 * time.Second) {
+ u.Future = 3 * time.Second
+ }
+ }
+ case "without":
+ if !c.NextArg() {
+ return c.ArgErr()
+ }
+ u.WithoutPathPrefix = c.Val()
+ case "except":
+ ignoredDomains := c.RemainingArgs()
+ if len(ignoredDomains) == 0 {
+ return c.ArgErr()
+ }
+ for i := 0; i < len(ignoredDomains); i++ {
+ ignoredDomains[i] = plugin.Host(ignoredDomains[i]).Normalize()
+ }
+ u.IgnoredSubDomains = ignoredDomains
+ case "spray":
+ u.Spray = &healthcheck.Spray{}
+ case "protocol":
+ encArgs := c.RemainingArgs()
+ if len(encArgs) == 0 {
+ return c.ArgErr()
+ }
+ switch encArgs[0] {
+ case "dns":
+ if len(encArgs) > 1 {
+ if encArgs[1] == "force_tcp" {
+ opts := Options{ForceTCP: true}
+ u.ex = newDNSExWithOption(opts)
+ } else {
+ return fmt.Errorf("only force_tcp allowed as parameter to dns")
+ }
+ } else {
+ u.ex = newDNSEx()
+ }
+ case "https_google":
+ boot := []string{"8.8.8.8:53", "8.8.4.4:53"}
+ if len(encArgs) > 2 && encArgs[1] == "bootstrap" {
+ boot = encArgs[2:]
+ }
+
+ u.ex = newGoogle("", boot) // "" for default in google.go
+ case "grpc":
+ if len(encArgs) == 2 && encArgs[1] == "insecure" {
+ u.ex = newGrpcClient(nil, u)
+ return nil
+ }
+ tls, err := tls.NewTLSConfigFromArgs(encArgs[1:]...)
+ if err != nil {
+ return err
+ }
+ u.ex = newGrpcClient(tls, u)
+ default:
+ return fmt.Errorf("%s: %s", errInvalidProtocol, encArgs[0])
+ }
+
+ default:
+ return c.Errf("unknown property '%s'", c.Val())
+ }
+ return nil
+}
+
+func (u *staticUpstream) IsAllowedDomain(name string) bool {
+ if dns.Name(name) == dns.Name(u.From()) {
+ return true
+ }
+
+ for _, ignoredSubDomain := range u.IgnoredSubDomains {
+ if plugin.Name(ignoredSubDomain).Matches(name) {
+ return false
+ }
+ }
+ return true
+}
+
+func (u *staticUpstream) Exchanger() Exchanger { return u.ex }
diff --git a/plugin/proxy/upstream_test.go b/plugin/proxy/upstream_test.go
new file mode 100644
index 000000000..42d50cac3
--- /dev/null
+++ b/plugin/proxy/upstream_test.go
@@ -0,0 +1,324 @@
+package proxy
+
+import (
+ "path/filepath"
+ "strings"
+ "testing"
+
+ "github.com/coredns/coredns/plugin/test"
+
+ "github.com/mholt/caddy"
+)
+
+func TestAllowedDomain(t *testing.T) {
+ upstream := &staticUpstream{
+ from: "miek.nl.",
+ IgnoredSubDomains: []string{"download.miek.nl.", "static.miek.nl."}, // closing dot mandatory
+ }
+ tests := []struct {
+ name string
+ expected bool
+ }{
+ {"miek.nl.", true},
+ {"download.miek.nl.", false},
+ {"static.miek.nl.", false},
+ {"blaat.miek.nl.", true},
+ }
+
+ for i, test := range tests {
+ isAllowed := upstream.IsAllowedDomain(test.name)
+ if test.expected != isAllowed {
+ t.Errorf("Test %d: expected %v found %v for %s", i+1, test.expected, isAllowed, test.name)
+ }
+ }
+}
+
+func TestProxyParse(t *testing.T) {
+ rmFunc, cert, key, ca := getPEMFiles(t)
+ defer rmFunc()
+
+ grpc1 := "proxy . 8.8.8.8:53 {\n protocol grpc " + ca + "\n}"
+ grpc2 := "proxy . 8.8.8.8:53 {\n protocol grpc " + cert + " " + key + "\n}"
+ grpc3 := "proxy . 8.8.8.8:53 {\n protocol grpc " + cert + " " + key + " " + ca + "\n}"
+ grpc4 := "proxy . 8.8.8.8:53 {\n protocol grpc " + key + "\n}"
+
+ tests := []struct {
+ inputUpstreams string
+ shouldErr bool
+ }{
+ {
+ `proxy . 8.8.8.8:53`,
+ false,
+ },
+ {
+ `proxy 10.0.0.0/24 8.8.8.8:53`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ policy round_robin
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ fail_timeout 5s
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ max_fails 10
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ health_check /health:8080
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ without without
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ except miek.nl example.org 10.0.0.0/24
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ spray
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ error_option
+}`,
+ true,
+ },
+ {
+ `
+proxy . some_bogus_filename`,
+ true,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ protocol dns
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ protocol grpc
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ protocol grpc insecure
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ protocol dns force_tcp
+}`,
+ false,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ protocol grpc a b c d
+}`,
+ true,
+ },
+ {
+ grpc1,
+ false,
+ },
+ {
+ grpc2,
+ false,
+ },
+ {
+ grpc3,
+ false,
+ },
+ {
+ grpc4,
+ true,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ protocol foobar
+}`,
+ true,
+ },
+ {
+ `proxy`,
+ true,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ protocol foobar
+}`,
+ true,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ policy
+}`,
+ true,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ fail_timeout
+}`,
+ true,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ fail_timeout junky
+}`,
+ true,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ health_check
+}`,
+ true,
+ },
+ {
+ `
+proxy . 8.8.8.8:53 {
+ protocol dns force
+}`,
+ true,
+ },
+ }
+ for i, test := range tests {
+ c := caddy.NewTestController("dns", test.inputUpstreams)
+ _, err := NewStaticUpstreams(&c.Dispenser)
+ if (err != nil) != test.shouldErr {
+ t.Errorf("Test %d expected no error, got %v for %s", i+1, err, test.inputUpstreams)
+ }
+ }
+}
+
+func TestResolvParse(t *testing.T) {
+ tests := []struct {
+ inputUpstreams string
+ filedata string
+ shouldErr bool
+ expected []string
+ }{
+ {
+ `
+proxy . FILE
+`,
+ `
+nameserver 1.2.3.4
+nameserver 4.3.2.1
+`,
+ false,
+ []string{"1.2.3.4:53", "4.3.2.1:53"},
+ },
+ {
+ `
+proxy example.com 1.1.1.1:5000
+proxy . FILE
+proxy example.org 2.2.2.2:1234
+`,
+ `
+nameserver 1.2.3.4
+`,
+ false,
+ []string{"1.1.1.1:5000", "1.2.3.4:53", "2.2.2.2:1234"},
+ },
+ {
+ `
+proxy example.com 1.1.1.1:5000
+proxy . FILE
+proxy example.org 2.2.2.2:1234
+`,
+ `
+junky resolve.conf
+`,
+ false,
+ []string{"1.1.1.1:5000", "2.2.2.2:1234"},
+ },
+ }
+ for i, tc := range tests {
+
+ path, rm, err := test.TempFile(".", tc.filedata)
+ if err != nil {
+ t.Fatalf("Test %d could not creat temp file %v", i, err)
+ }
+ defer rm()
+
+ config := strings.Replace(tc.inputUpstreams, "FILE", path, -1)
+ c := caddy.NewTestController("dns", config)
+ upstreams, err := NewStaticUpstreams(&c.Dispenser)
+ if (err != nil) != tc.shouldErr {
+ t.Errorf("Test %d expected no error, got %v", i+1, err)
+ }
+ var hosts []string
+ for _, u := range upstreams {
+ for _, h := range u.(*staticUpstream).Hosts {
+ hosts = append(hosts, h.Name)
+ }
+ }
+ if !tc.shouldErr {
+ if len(hosts) != len(tc.expected) {
+ t.Errorf("Test %d expected %d hosts got %d", i+1, len(tc.expected), len(upstreams))
+ } else {
+ ok := true
+ for i, v := range tc.expected {
+ if v != hosts[i] {
+ ok = false
+ }
+ }
+ if !ok {
+ t.Errorf("Test %d expected %v got %v", i+1, tc.expected, upstreams)
+ }
+ }
+ }
+ }
+}
+
+func getPEMFiles(t *testing.T) (rmFunc func(), cert, key, ca string) {
+ tempDir, rmFunc, err := test.WritePEMFiles("")
+ if err != nil {
+ t.Fatalf("Could not write PEM files: %s", err)
+ }
+
+ cert = filepath.Join(tempDir, "cert.pem")
+ key = filepath.Join(tempDir, "key.pem")
+ ca = filepath.Join(tempDir, "ca.pem")
+
+ return
+}