diff options
Diffstat (limited to 'plugin/proxy')
-rw-r--r-- | plugin/proxy/README.md | 175 | ||||
-rw-r--r-- | plugin/proxy/dns.go | 106 | ||||
-rw-r--r-- | plugin/proxy/dnstap_test.go | 57 | ||||
-rw-r--r-- | plugin/proxy/exchanger.go | 22 | ||||
-rw-r--r-- | plugin/proxy/google.go | 244 | ||||
-rw-r--r-- | plugin/proxy/google_rr.go | 89 | ||||
-rw-r--r-- | plugin/proxy/google_test.go | 5 | ||||
-rw-r--r-- | plugin/proxy/grpc.go | 96 | ||||
-rw-r--r-- | plugin/proxy/grpc_test.go | 71 | ||||
-rw-r--r-- | plugin/proxy/lookup.go | 132 | ||||
-rw-r--r-- | plugin/proxy/metrics.go | 30 | ||||
-rw-r--r-- | plugin/proxy/proxy.go | 195 | ||||
-rw-r--r-- | plugin/proxy/proxy_test.go | 87 | ||||
-rw-r--r-- | plugin/proxy/response.go | 21 | ||||
-rw-r--r-- | plugin/proxy/setup.go | 46 | ||||
-rw-r--r-- | plugin/proxy/upstream.go | 234 | ||||
-rw-r--r-- | plugin/proxy/upstream_test.go | 324 |
17 files changed, 1934 insertions, 0 deletions
diff --git a/plugin/proxy/README.md b/plugin/proxy/README.md new file mode 100644 index 000000000..3cccf05ee --- /dev/null +++ b/plugin/proxy/README.md @@ -0,0 +1,175 @@ +# proxy + +*proxy* facilitates both a basic reverse proxy and a robust load balancer. + +The proxy has support for multiple backends. The load balancing features include multiple policies, +health checks, and failovers. If all hosts fail their health check the proxy plugin will fail +back to randomly selecting a target and sending packets to it. + +## Syntax + +In its most basic form, a simple reverse proxy uses this syntax: + +~~~ +proxy FROM TO +~~~ + +* **FROM** is the base domain to match for the request to be proxied. +* **TO** is the destination endpoint to proxy to. + +However, advanced features including load balancing can be utilized with an expanded syntax: + +~~~ +proxy FROM TO... { + policy random|least_conn|round_robin + fail_timeout DURATION + max_fails INTEGER + health_check PATH:PORT [DURATION] + except IGNORED_NAMES... + spray + protocol [dns [force_tcp]|https_google [bootstrap ADDRESS...]|grpc [insecure|CACERT|KEY CERT|KEY CERT CACERT]] +} +~~~ + +* **FROM** is the name to match for the request to be proxied. +* **TO** is the destination endpoint to proxy to. At least one is required, but multiple may be + specified. **TO** may be an IP:Port pair, or may reference a file in resolv.conf format +* `policy` is the load balancing policy to use; applies only with multiple backends. May be one of + random, least_conn, or round_robin. Default is random. +* `fail_timeout` specifies how long to consider a backend as down after it has failed. While it is + down, requests will not be routed to that backend. A backend is "down" if CoreDNS fails to + communicate with it. The default value is 10 seconds ("10s"). +* `max_fails` is the number of failures within fail_timeout that are needed before considering + a backend to be down. If 0, the backend will never be marked as down. Default is 1. +* `health_check` will check path (on port) on each backend. If a backend returns a status code of + 200-399, then that backend is marked healthy for double the healthcheck duration. If it doesn't, + it is marked as unhealthy and no requests are routed to it. If this option is not provided then + health checks are disabled. The default duration is 30 seconds ("30s"). +* **IGNORED_NAMES** in `except` is a space-separated list of domains to exclude from proxying. + Requests that match none of these names will be passed through. +* `spray` when all backends are unhealthy, randomly pick one to send the traffic to. (This is + a failsafe.) +* `protocol` specifies what protocol to use to speak to an upstream, `dns` (the default) is plain + old DNS, and `https_google` uses `https://dns.google.com` and speaks a JSON DNS dialect. Note when + using this **TO** will be ignored. The `grpc` option will talk to a server that has implemented + the [DnsService](https://github.com/coredns/coredns/pb/dns.proto). + An out-of-tree plugin that implements the server side of this can be found at + [here](https://github.com/infobloxopen/coredns-grpc). + +## Policies + +There are three load-balancing policies available: +* `random` (default) - Randomly select a backend +* `least_conn` - Select the backend with the fewest active connections +* `round_robin` - Select the backend in round-robin fashion + +All polices implement randomly spraying packets to backend hosts when *no healthy* hosts are +available. This is to preeempt the case where the healthchecking (as a mechanism) fails. + +## Upstream Protocols + +Currently `protocol` supports `dns` (i.e., standard DNS over UDP/TCP) and `https_google` (JSON +payload over HTTPS). Note that with `https_google` the entire transport is encrypted. Only *you* and +*Google* can see your DNS activity. + +* `dns`: uses the standard DNS exchange. You can pass `force_tcp` to make sure that the proxied connection is performed + over TCP, regardless of the inbound request's protocol. +* `https_google`: bootstrap **ADDRESS...** is used to (re-)resolve `dns.google.com` to an address to + connect to. This happens every 300s. If not specified the default is used: 8.8.8.8:53/8.8.4.4:53. + Note that **TO** is *ignored* when `https_google` is used, as its upstream is defined as + `dns.google.com`. + + Debug queries are enabled by default and currently there is no way to turn them off. When CoreDNS + receives a debug query (i.e. the name is prefixed with `o-o.debug.`) a TXT record with Comment + from `dns.google.com` is added. Note this is not always set. +* `grpc`: options are used to control how the TLS connection is made to the gRPC server. + * None - No client authentication is used, and the system CAs are used to verify the server certificate. + * `insecure` - TLS is not used, the connection is made in plaintext (not good in production). + * **CACERT** - No client authentication is used, and the file **CACERT** is used to verify the server certificate. + * **KEY** **CERT** - Client authentication is used with the specified key/cert pair. The server + certificate is verified with the system CAs. + * **KEY** **CERT** **CACERT** - Client authentication is used with the specified key/cert pair. The + server certificate is verified using the **CACERT** file. + + An out-of-tree plugin that implements the server side of this can be found at + [here](https://github.com/infobloxopen/coredns-grpc). + +## Metrics + +If monitoring is enabled (via the *prometheus* directive) then the following metric is exported: + +* coredns_proxy_request_count_total{proto, proxy_proto, from} + +Where `proxy_proto` is the protocol used (`dns`, `grpc`, or `https_google`) and `from` is **FROM** +specified in the config, `proto` is the protocol used by the incoming query ("tcp" or "udp"). + +## Examples + +Proxy all requests within example.org. to a backend system: + +~~~ +proxy example.org 127.0.0.1:9005 +~~~ + +Load-balance all requests between three backends (using random policy): + +~~~ +proxy . 10.0.0.10:53 10.0.0.11:1053 10.0.0.12 +~~~ + +Same as above, but round-robin style: + +~~~ +proxy . 10.0.0.10:53 10.0.0.11:1053 10.0.0.12 { + policy round_robin +} +~~~ + +With health checks and proxy headers to pass hostname, IP, and scheme upstream: + +~~~ +proxy . 10.0.0.11:53 10.0.0.11:53 10.0.0.12:53 { + policy round_robin + health_check /health:8080 +} +~~~ + +Proxy everything except requests to miek.nl or example.org + +~~~ +proxy . 10.0.0.10:1234 { + except miek.nl example.org +} +~~~ + +Proxy everything except example.org using the host resolv.conf nameservers: + +~~~ +proxy . /etc/resolv.conf { + except miek.nl example.org +} +~~~ + +Proxy all requests within example.org to Google's dns.google.com. + +~~~ +proxy example.org 1.2.3.4:53 { + protocol https_google +} +~~~ + +Proxy everything with HTTPS to `dns.google.com`, except `example.org`. Then have another proxy in +another stanza that uses plain DNS to resolve names under `example.org`. + +~~~ +. { + proxy . 1.2.3.4:53 { + except example.org + protocol https_google + } +} + +example.org { + proxy . 8.8.8.8:53 +} +~~~ diff --git a/plugin/proxy/dns.go b/plugin/proxy/dns.go new file mode 100644 index 000000000..4d8038422 --- /dev/null +++ b/plugin/proxy/dns.go @@ -0,0 +1,106 @@ +package proxy + +import ( + "context" + "net" + "time" + + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" +) + +type dnsEx struct { + Timeout time.Duration + Options +} + +// Options define the options understood by dns.Exchange. +type Options struct { + ForceTCP bool // If true use TCP for upstream no matter what +} + +func newDNSEx() *dnsEx { + return newDNSExWithOption(Options{}) +} + +func newDNSExWithOption(opt Options) *dnsEx { + return &dnsEx{Timeout: defaultTimeout * time.Second, Options: opt} +} + +func (d *dnsEx) Transport() string { + if d.Options.ForceTCP { + return "tcp" + } + + // The protocol will be determined by `state.Proto()` during Exchange. + return "" +} +func (d *dnsEx) Protocol() string { return "dns" } +func (d *dnsEx) OnShutdown(p *Proxy) error { return nil } +func (d *dnsEx) OnStartup(p *Proxy) error { return nil } + +// Exchange implements the Exchanger interface. +func (d *dnsEx) Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error) { + proto := state.Proto() + if d.Options.ForceTCP { + proto = "tcp" + } + co, err := net.DialTimeout(proto, addr, d.Timeout) + if err != nil { + return nil, err + } + + reply, _, err := d.ExchangeConn(state.Req, co) + + co.Close() + + if reply != nil && reply.Truncated { + // Suppress proxy error for truncated responses + err = nil + } + + if err != nil { + return nil, err + } + // Make sure it fits in the DNS response. + reply, _ = state.Scrub(reply) + reply.Compress = true + reply.Id = state.Req.Id + + return reply, nil +} + +func (d *dnsEx) ExchangeConn(m *dns.Msg, co net.Conn) (*dns.Msg, time.Duration, error) { + start := time.Now() + r, err := exchange(m, co) + rtt := time.Since(start) + + return r, rtt, err +} + +func exchange(m *dns.Msg, co net.Conn) (*dns.Msg, error) { + opt := m.IsEdns0() + + udpsize := uint16(dns.MinMsgSize) + // If EDNS0 is used use that for size. + if opt != nil && opt.UDPSize() >= dns.MinMsgSize { + udpsize = opt.UDPSize() + } + + dnsco := &dns.Conn{Conn: co, UDPSize: udpsize} + + writeDeadline := time.Now().Add(defaultTimeout) + dnsco.SetWriteDeadline(writeDeadline) + dnsco.WriteMsg(m) + + readDeadline := time.Now().Add(defaultTimeout) + co.SetReadDeadline(readDeadline) + r, err := dnsco.ReadMsg() + + dnsco.Close() + if r == nil { + return nil, err + } + return r, err +} diff --git a/plugin/proxy/dnstap_test.go b/plugin/proxy/dnstap_test.go new file mode 100644 index 000000000..05169a1ca --- /dev/null +++ b/plugin/proxy/dnstap_test.go @@ -0,0 +1,57 @@ +package proxy + +import ( + "testing" + + "github.com/coredns/coredns/plugin/dnstap/msg" + "github.com/coredns/coredns/plugin/dnstap/test" + mwtest "github.com/coredns/coredns/plugin/test" + "github.com/coredns/coredns/request" + + tap "github.com/dnstap/golang-dnstap" + "github.com/miekg/dns" + "golang.org/x/net/context" +) + +func testCase(t *testing.T, ex Exchanger, q, r *dns.Msg, datq, datr *msg.Data) { + tapq := datq.ToOutsideQuery(tap.Message_FORWARDER_QUERY) + tapr := datr.ToOutsideResponse(tap.Message_FORWARDER_RESPONSE) + ctx := test.Context{} + err := toDnstap(&ctx, "10.240.0.1:40212", ex, + request.Request{W: &mwtest.ResponseWriter{}, Req: q}, r, 0, 0) + if err != nil { + t.Fatal(err) + } + if len(ctx.Trap) != 2 { + t.Fatalf("messages: %d", len(ctx.Trap)) + } + if !test.MsgEqual(ctx.Trap[0], tapq) { + t.Errorf("want: %v\nhave: %v", tapq, ctx.Trap[0]) + } + if !test.MsgEqual(ctx.Trap[1], tapr) { + t.Errorf("want: %v\nhave: %v", tapr, ctx.Trap[1]) + } +} + +func TestDnstap(t *testing.T) { + q := mwtest.Case{Qname: "example.org", Qtype: dns.TypeA}.Msg() + r := mwtest.Case{ + Qname: "example.org.", Qtype: dns.TypeA, + Answer: []dns.RR{ + mwtest.A("example.org. 3600 IN A 10.0.0.1"), + }, + }.Msg() + tapq, tapr := test.TestingData(), test.TestingData() + testCase(t, newDNSEx(), q, r, tapq, tapr) + tapq.SocketProto = tap.SocketProtocol_TCP + tapr.SocketProto = tap.SocketProtocol_TCP + testCase(t, newDNSExWithOption(Options{ForceTCP: true}), q, r, tapq, tapr) + testCase(t, newGoogle("", []string{"8.8.8.8:53", "8.8.4.4:53"}), q, r, tapq, tapr) +} + +func TestNoDnstap(t *testing.T) { + err := toDnstap(context.TODO(), "", nil, request.Request{}, nil, 0, 0) + if err != nil { + t.Fatal(err) + } +} diff --git a/plugin/proxy/exchanger.go b/plugin/proxy/exchanger.go new file mode 100644 index 000000000..b98a687e7 --- /dev/null +++ b/plugin/proxy/exchanger.go @@ -0,0 +1,22 @@ +package proxy + +import ( + "context" + + "github.com/coredns/coredns/request" + "github.com/miekg/dns" +) + +// Exchanger is an interface that specifies a type implementing a DNS resolver that +// can use whatever transport it likes. +type Exchanger interface { + Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error) + Protocol() string + + // Transport returns the only transport protocol used by this Exchanger or "". + // If the return value is "", Exchange must use `state.Proto()`. + Transport() string + + OnStartup(*Proxy) error + OnShutdown(*Proxy) error +} diff --git a/plugin/proxy/google.go b/plugin/proxy/google.go new file mode 100644 index 000000000..ecc5e6dfd --- /dev/null +++ b/plugin/proxy/google.go @@ -0,0 +1,244 @@ +package proxy + +import ( + "context" + "crypto/tls" + "encoding/json" + "fmt" + "io/ioutil" + "log" + "net" + "net/http" + "net/url" + "sync/atomic" + "time" + + "github.com/coredns/coredns/plugin/pkg/healthcheck" + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" +) + +type google struct { + client *http.Client + + endpoint string // Name to resolve via 'bootstrapProxy' + + bootstrapProxy Proxy + quit chan bool +} + +func newGoogle(endpoint string, bootstrap []string) *google { + if endpoint == "" { + endpoint = ghost + } + tls := &tls.Config{ServerName: endpoint} + client := &http.Client{ + Timeout: time.Second * defaultTimeout, + Transport: &http.Transport{TLSClientConfig: tls}, + } + + boot := NewLookup(bootstrap) + + return &google{client: client, endpoint: dns.Fqdn(endpoint), bootstrapProxy: boot, quit: make(chan bool)} +} + +func (g *google) Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error) { + v := url.Values{} + + v.Set("name", state.Name()) + v.Set("type", fmt.Sprintf("%d", state.QType())) + + buf, backendErr := g.exchangeJSON(addr, v.Encode()) + + if backendErr == nil { + gm := new(googleMsg) + if err := json.Unmarshal(buf, gm); err != nil { + return nil, err + } + + m, err := toMsg(gm) + if err != nil { + return nil, err + } + + m.Id = state.Req.Id + return m, nil + } + + log.Printf("[WARNING] Failed to connect to HTTPS backend %q: %s", g.endpoint, backendErr) + return nil, backendErr +} + +func (g *google) exchangeJSON(addr, json string) ([]byte, error) { + url := "https://" + addr + "/resolve?" + json + req, err := http.NewRequest("GET", url, nil) + if err != nil { + return nil, err + } + + req.Host = g.endpoint // TODO(miek): works with the extra dot at the end? + + resp, err := g.client.Do(req) + if err != nil { + return nil, err + } + + buf, err := ioutil.ReadAll(resp.Body) + resp.Body.Close() + if err != nil { + return nil, err + } + + if resp.StatusCode != 200 { + return nil, fmt.Errorf("failed to get 200 status code, got %d", resp.StatusCode) + } + + return buf, nil +} + +func (g *google) Transport() string { return "tcp" } +func (g *google) Protocol() string { return "https_google" } + +func (g *google) OnShutdown(p *Proxy) error { + g.quit <- true + return nil +} + +func (g *google) OnStartup(p *Proxy) error { + // We fake a state because normally the proxy is called after we already got a incoming query. + // This is a non-edns0, udp request to g.endpoint. + req := new(dns.Msg) + req.SetQuestion(g.endpoint, dns.TypeA) + state := request.Request{W: new(fakeBootWriter), Req: req} + + if len(*p.Upstreams) == 0 { + return fmt.Errorf("no upstreams defined") + } + + oldUpstream := (*p.Upstreams)[0] + + log.Printf("[INFO] Bootstrapping A records %q", g.endpoint) + + new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA) + if err != nil { + log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err) + } else { + addrs, err1 := extractAnswer(new) + if err1 != nil { + log.Printf("[WARNING] Failed to bootstrap A records %q: %s", g.endpoint, err1) + } else { + + up := newUpstream(addrs, oldUpstream.(*staticUpstream)) + p.Upstreams = &[]Upstream{up} + + log.Printf("[INFO] Bootstrapping A records %q found: %v", g.endpoint, addrs) + } + } + + go func() { + tick := time.NewTicker(120 * time.Second) + + for { + select { + case <-tick.C: + + log.Printf("[INFO] Resolving A records %q", g.endpoint) + + new, err := g.bootstrapProxy.Lookup(state, g.endpoint, dns.TypeA) + if err != nil { + log.Printf("[WARNING] Failed to resolve A records %q: %s", g.endpoint, err) + continue + } + + addrs, err1 := extractAnswer(new) + if err1 != nil { + log.Printf("[WARNING] Failed to resolve A records %q: %s", g.endpoint, err1) + continue + } + + up := newUpstream(addrs, oldUpstream.(*staticUpstream)) + p.Upstreams = &[]Upstream{up} + + log.Printf("[INFO] Resolving A records %q found: %v", g.endpoint, addrs) + + case <-g.quit: + return + } + } + }() + + return nil +} + +func extractAnswer(m *dns.Msg) ([]string, error) { + if len(m.Answer) == 0 { + return nil, fmt.Errorf("no answer section in response") + } + ret := []string{} + for _, an := range m.Answer { + if a, ok := an.(*dns.A); ok { + ret = append(ret, net.JoinHostPort(a.A.String(), "443")) + } + } + if len(ret) > 0 { + return ret, nil + } + + return nil, fmt.Errorf("no address records in answer section") +} + +// newUpstream returns an upstream initialized with hosts. +func newUpstream(hosts []string, old *staticUpstream) Upstream { + upstream := &staticUpstream{ + from: old.from, + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 3, + Future: 60 * time.Second, + }, + ex: old.ex, + WithoutPathPrefix: old.WithoutPathPrefix, + IgnoredSubDomains: old.IgnoredSubDomains, + } + + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts)) + for i, h := range hosts { + uh := &healthcheck.UpstreamHost{ + Name: h, + Conns: 0, + Fails: 0, + FailTimeout: upstream.FailTimeout, + + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { + + down := false + + uh.CheckMu.Lock() + until := uh.OkUntil + uh.CheckMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true + } + + fails := atomic.LoadInt32(&uh.Fails) + if fails >= upstream.MaxFails && upstream.MaxFails != 0 { + down = true + } + return down + } + }(upstream), + WithoutPathPrefix: upstream.WithoutPathPrefix, + } + + upstream.Hosts[i] = uh + } + return upstream +} + +const ( + // Default endpoint for this service. + ghost = "dns.google.com." +) diff --git a/plugin/proxy/google_rr.go b/plugin/proxy/google_rr.go new file mode 100644 index 000000000..3b9233b7b --- /dev/null +++ b/plugin/proxy/google_rr.go @@ -0,0 +1,89 @@ +package proxy + +import ( + "fmt" + + "github.com/miekg/dns" +) + +// toMsg converts a googleMsg into the dns message. +func toMsg(g *googleMsg) (*dns.Msg, error) { + m := new(dns.Msg) + m.Response = true + m.Rcode = g.Status + m.Truncated = g.TC + m.RecursionDesired = g.RD + m.RecursionAvailable = g.RA + m.AuthenticatedData = g.AD + m.CheckingDisabled = g.CD + + m.Question = make([]dns.Question, 1) + m.Answer = make([]dns.RR, len(g.Answer)) + m.Ns = make([]dns.RR, len(g.Authority)) + m.Extra = make([]dns.RR, len(g.Additional)) + + m.Question[0] = dns.Question{Name: g.Question[0].Name, Qtype: g.Question[0].Type, Qclass: dns.ClassINET} + + var err error + for i := 0; i < len(m.Answer); i++ { + m.Answer[i], err = toRR(g.Answer[i]) + if err != nil { + return nil, err + } + } + for i := 0; i < len(m.Ns); i++ { + m.Ns[i], err = toRR(g.Authority[i]) + if err != nil { + return nil, err + } + } + for i := 0; i < len(m.Extra); i++ { + m.Extra[i], err = toRR(g.Additional[i]) + if err != nil { + return nil, err + } + } + + return m, nil +} + +// toRR transforms a "google" RR to a dns.RR. +func toRR(g googleRR) (dns.RR, error) { + typ, ok := dns.TypeToString[g.Type] + if !ok { + return nil, fmt.Errorf("failed to convert type %q", g.Type) + } + + str := fmt.Sprintf("%s %d %s %s", g.Name, g.TTL, typ, g.Data) + rr, err := dns.NewRR(str) + if err != nil { + return nil, fmt.Errorf("failed to parse %q: %s", str, err) + } + return rr, nil +} + +// googleRR represents a dns.RR in another form. +type googleRR struct { + Name string + Type uint16 + TTL uint32 + Data string +} + +// googleMsg is a JSON representation of the dns.Msg. +type googleMsg struct { + Status int + TC bool + RD bool + RA bool + AD bool + CD bool + Question []struct { + Name string + Type uint16 + } + Answer []googleRR + Authority []googleRR + Additional []googleRR + Comment string +} diff --git a/plugin/proxy/google_test.go b/plugin/proxy/google_test.go new file mode 100644 index 000000000..1ce591664 --- /dev/null +++ b/plugin/proxy/google_test.go @@ -0,0 +1,5 @@ +package proxy + +// TODO(miek): +// Test cert failures - put those in SERVFAIL messages, but attach error code in TXT +// Test connecting to a a bad host. diff --git a/plugin/proxy/grpc.go b/plugin/proxy/grpc.go new file mode 100644 index 000000000..f98fd2e91 --- /dev/null +++ b/plugin/proxy/grpc.go @@ -0,0 +1,96 @@ +package proxy + +import ( + "context" + "crypto/tls" + "log" + + "github.com/coredns/coredns/pb" + "github.com/coredns/coredns/plugin/pkg/trace" + "github.com/coredns/coredns/request" + + "github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc" + "github.com/miekg/dns" + opentracing "github.com/opentracing/opentracing-go" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +type grpcClient struct { + dialOpts []grpc.DialOption + clients map[string]pb.DnsServiceClient + conns []*grpc.ClientConn + upstream *staticUpstream +} + +func newGrpcClient(tls *tls.Config, u *staticUpstream) *grpcClient { + g := &grpcClient{upstream: u} + + if tls == nil { + g.dialOpts = append(g.dialOpts, grpc.WithInsecure()) + } else { + g.dialOpts = append(g.dialOpts, grpc.WithTransportCredentials(credentials.NewTLS(tls))) + } + g.clients = map[string]pb.DnsServiceClient{} + + return g +} + +func (g *grpcClient) Exchange(ctx context.Context, addr string, state request.Request) (*dns.Msg, error) { + msg, err := state.Req.Pack() + if err != nil { + return nil, err + } + + reply, err := g.clients[addr].Query(ctx, &pb.DnsPacket{Msg: msg}) + if err != nil { + return nil, err + } + d := new(dns.Msg) + err = d.Unpack(reply.Msg) + if err != nil { + return nil, err + } + return d, nil +} + +func (g *grpcClient) Transport() string { return "tcp" } + +func (g *grpcClient) Protocol() string { return "grpc" } + +func (g *grpcClient) OnShutdown(p *Proxy) error { + g.clients = map[string]pb.DnsServiceClient{} + for i, conn := range g.conns { + err := conn.Close() + if err != nil { + log.Printf("[WARNING] Error closing connection %d: %s\n", i, err) + } + } + g.conns = []*grpc.ClientConn{} + return nil +} + +func (g *grpcClient) OnStartup(p *Proxy) error { + dialOpts := g.dialOpts + if p.Trace != nil { + if t, ok := p.Trace.(trace.Trace); ok { + onlyIfParent := func(parentSpanCtx opentracing.SpanContext, method string, req, resp interface{}) bool { + return parentSpanCtx != nil + } + intercept := otgrpc.OpenTracingClientInterceptor(t.Tracer(), otgrpc.IncludingSpans(onlyIfParent)) + dialOpts = append(dialOpts, grpc.WithUnaryInterceptor(intercept)) + } else { + log.Printf("[WARNING] Wrong type for trace plugin reference: %s", p.Trace) + } + } + for _, host := range g.upstream.Hosts { + conn, err := grpc.Dial(host.Name, dialOpts...) + if err != nil { + log.Printf("[WARNING] Skipping gRPC host '%s' due to Dial error: %s\n", host.Name, err) + } else { + g.clients[host.Name] = pb.NewDnsServiceClient(conn) + g.conns = append(g.conns, conn) + } + } + return nil +} diff --git a/plugin/proxy/grpc_test.go b/plugin/proxy/grpc_test.go new file mode 100644 index 000000000..52c5737d6 --- /dev/null +++ b/plugin/proxy/grpc_test.go @@ -0,0 +1,71 @@ +package proxy + +import ( + "testing" + "time" + + "github.com/coredns/coredns/plugin/pkg/healthcheck" + + "google.golang.org/grpc/grpclog" +) + +func pool() []*healthcheck.UpstreamHost { + return []*healthcheck.UpstreamHost{ + { + Name: "localhost:10053", + }, + { + Name: "localhost:10054", + }, + } +} + +func TestStartupShutdown(t *testing.T) { + grpclog.SetLogger(discard{}) + + upstream := &staticUpstream{ + from: ".", + HealthCheck: healthcheck.HealthCheck{ + Hosts: pool(), + FailTimeout: 10 * time.Second, + Future: 60 * time.Second, + MaxFails: 1, + }, + } + g := newGrpcClient(nil, upstream) + upstream.ex = g + + p := &Proxy{} + p.Upstreams = &[]Upstream{upstream} + + err := g.OnStartup(p) + if err != nil { + t.Errorf("Error starting grpc client exchanger: %s", err) + return + } + if len(g.clients) != len(pool()) { + t.Errorf("Expected %d grpc clients but found %d", len(pool()), len(g.clients)) + } + + err = g.OnShutdown(p) + if err != nil { + t.Errorf("Error stopping grpc client exchanger: %s", err) + return + } + if len(g.clients) != 0 { + t.Errorf("Shutdown didn't remove clients, found %d", len(g.clients)) + } + if len(g.conns) != 0 { + t.Errorf("Shutdown didn't remove conns, found %d", len(g.conns)) + } +} + +// discard is a Logger that outputs nothing. +type discard struct{} + +func (d discard) Fatal(args ...interface{}) {} +func (d discard) Fatalf(format string, args ...interface{}) {} +func (d discard) Fatalln(args ...interface{}) {} +func (d discard) Print(args ...interface{}) {} +func (d discard) Printf(format string, args ...interface{}) {} +func (d discard) Println(args ...interface{}) {} diff --git a/plugin/proxy/lookup.go b/plugin/proxy/lookup.go new file mode 100644 index 000000000..9be62edd5 --- /dev/null +++ b/plugin/proxy/lookup.go @@ -0,0 +1,132 @@ +package proxy + +// functions other plugin might want to use to do lookup in the same style as the proxy. + +import ( + "context" + "fmt" + "sync/atomic" + "time" + + "github.com/coredns/coredns/plugin/pkg/healthcheck" + "github.com/coredns/coredns/request" + + "github.com/miekg/dns" +) + +// NewLookup create a new proxy with the hosts in host and a Random policy. +func NewLookup(hosts []string) Proxy { return NewLookupWithOption(hosts, Options{}) } + +// NewLookupWithOption process creates a simple round robin forward with potentially forced proto for upstream. +func NewLookupWithOption(hosts []string, opts Options) Proxy { + p := Proxy{Next: nil} + + // TODO(miek): this needs to be unified with upstream.go's NewStaticUpstreams, caddy uses NewHost + // we should copy/make something similar. + upstream := &staticUpstream{ + from: ".", + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 3, // TODO(miek): disable error checking for simple lookups? + Future: 60 * time.Second, + }, + ex: newDNSExWithOption(opts), + } + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(hosts)) + + for i, host := range hosts { + uh := &healthcheck.UpstreamHost{ + Name: host, + Conns: 0, + Fails: 0, + FailTimeout: upstream.FailTimeout, + + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { + + down := false + + uh.CheckMu.Lock() + until := uh.OkUntil + uh.CheckMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true + } + + fails := atomic.LoadInt32(&uh.Fails) + if fails >= upstream.MaxFails && upstream.MaxFails != 0 { + down = true + } + return down + } + }(upstream), + WithoutPathPrefix: upstream.WithoutPathPrefix, + } + + upstream.Hosts[i] = uh + } + p.Upstreams = &[]Upstream{upstream} + return p +} + +// Lookup will use name and type to forge a new message and will send that upstream. It will +// set any EDNS0 options correctly so that downstream will be able to process the reply. +func (p Proxy) Lookup(state request.Request, name string, typ uint16) (*dns.Msg, error) { + req := new(dns.Msg) + req.SetQuestion(name, typ) + state.SizeAndDo(req) + + state2 := request.Request{W: state.W, Req: req} + + return p.lookup(state2) +} + +// Forward forward the request in state as-is. Unlike Lookup that adds EDNS0 suffix to the message. +func (p Proxy) Forward(state request.Request) (*dns.Msg, error) { + return p.lookup(state) +} + +func (p Proxy) lookup(state request.Request) (*dns.Msg, error) { + upstream := p.match(state) + if upstream == nil { + return nil, errInvalidDomain + } + for { + start := time.Now() + reply := new(dns.Msg) + var backendErr error + + // Since Select() should give us "up" hosts, keep retrying + // hosts until timeout (or until we get a nil host). + for time.Since(start) < tryDuration { + host := upstream.Select() + if host == nil { + return nil, fmt.Errorf("%s: %s", errUnreachable, "no upstream host") + } + + // duplicated from proxy.go, but with a twist, we don't write the + // reply back to the client, we return it and there is no monitoring. + + atomic.AddInt64(&host.Conns, 1) + + reply, backendErr = upstream.Exchanger().Exchange(context.TODO(), host.Name, state) + + atomic.AddInt64(&host.Conns, -1) + + if backendErr == nil { + return reply, nil + } + timeout := host.FailTimeout + if timeout == 0 { + timeout = 10 * time.Second + } + atomic.AddInt32(&host.Fails, 1) + go func(host *healthcheck.UpstreamHost, timeout time.Duration) { + time.Sleep(timeout) + atomic.AddInt32(&host.Fails, -1) + }(host, timeout) + } + return nil, fmt.Errorf("%s: %s", errUnreachable, backendErr) + } +} diff --git a/plugin/proxy/metrics.go b/plugin/proxy/metrics.go new file mode 100644 index 000000000..893c26d6b --- /dev/null +++ b/plugin/proxy/metrics.go @@ -0,0 +1,30 @@ +package proxy + +import ( + "sync" + + "github.com/coredns/coredns/plugin" + + "github.com/prometheus/client_golang/prometheus" +) + +// Metrics the proxy plugin exports. +var ( + RequestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ + Namespace: plugin.Namespace, + Subsystem: "proxy", + Name: "request_duration_milliseconds", + Buckets: append(prometheus.DefBuckets, []float64{50, 100, 200, 500, 1000, 2000, 3000, 4000, 5000, 10000}...), + Help: "Histogram of the time (in milliseconds) each request took.", + }, []string{"proto", "proxy_proto", "from"}) +) + +// OnStartupMetrics sets up the metrics on startup. This is done for all proxy protocols. +func OnStartupMetrics() error { + metricsOnce.Do(func() { + prometheus.MustRegister(RequestDuration) + }) + return nil +} + +var metricsOnce sync.Once diff --git a/plugin/proxy/proxy.go b/plugin/proxy/proxy.go new file mode 100644 index 000000000..9d1e1906b --- /dev/null +++ b/plugin/proxy/proxy.go @@ -0,0 +1,195 @@ +// Package proxy is plugin that proxies requests. +package proxy + +import ( + "errors" + "fmt" + "sync/atomic" + "time" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/dnstap" + "github.com/coredns/coredns/plugin/dnstap/msg" + "github.com/coredns/coredns/plugin/pkg/healthcheck" + "github.com/coredns/coredns/request" + + tap "github.com/dnstap/golang-dnstap" + "github.com/miekg/dns" + ot "github.com/opentracing/opentracing-go" + "golang.org/x/net/context" +) + +var ( + errUnreachable = errors.New("unreachable backend") + errInvalidProtocol = errors.New("invalid protocol") + errInvalidDomain = errors.New("invalid path for proxy") +) + +// Proxy represents a plugin instance that can proxy requests to another (DNS) server. +type Proxy struct { + Next plugin.Handler + + // Upstreams is a pointer to a slice, so we can update the upstream (used for Google) + // midway. + + Upstreams *[]Upstream + + // Trace is the Trace plugin, if it is installed + // This is used by the grpc exchanger to trace through the grpc calls + Trace plugin.Handler +} + +// Upstream manages a pool of proxy upstream hosts. Select should return a +// suitable upstream host, or nil if no such hosts are available. +type Upstream interface { + // The domain name this upstream host should be routed on. + From() string + // Selects an upstream host to be routed to. + Select() *healthcheck.UpstreamHost + // Checks if subpdomain is not an ignored. + IsAllowedDomain(string) bool + // Exchanger returns the exchanger to be used for this upstream. + Exchanger() Exchanger + // Stops the upstream from proxying requests to shutdown goroutines cleanly. + Stop() error +} + +// tryDuration is how long to try upstream hosts; failures result in +// immediate retries until this duration ends or we get a nil host. +var tryDuration = 60 * time.Second + +// ServeDNS satisfies the plugin.Handler interface. +func (p Proxy) ServeDNS(ctx context.Context, w dns.ResponseWriter, r *dns.Msg) (int, error) { + var span, child ot.Span + span = ot.SpanFromContext(ctx) + state := request.Request{W: w, Req: r} + + upstream := p.match(state) + if upstream == nil { + return plugin.NextOrFailure(p.Name(), p.Next, ctx, w, r) + } + + for { + start := time.Now() + reply := new(dns.Msg) + var backendErr error + + // Since Select() should give us "up" hosts, keep retrying + // hosts until timeout (or until we get a nil host). + for time.Since(start) < tryDuration { + host := upstream.Select() + if host == nil { + + RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond)) + + return dns.RcodeServerFailure, fmt.Errorf("%s: %s", errUnreachable, "no upstream host") + } + + if span != nil { + child = span.Tracer().StartSpan("exchange", ot.ChildOf(span.Context())) + ctx = ot.ContextWithSpan(ctx, child) + } + + atomic.AddInt64(&host.Conns, 1) + queryEpoch := msg.Epoch() + + reply, backendErr = upstream.Exchanger().Exchange(ctx, host.Name, state) + + respEpoch := msg.Epoch() + atomic.AddInt64(&host.Conns, -1) + + if child != nil { + child.Finish() + } + + taperr := toDnstap(ctx, host.Name, upstream.Exchanger(), state, reply, queryEpoch, respEpoch) + + if backendErr == nil { + w.WriteMsg(reply) + + RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond)) + + return 0, taperr + } + + timeout := host.FailTimeout + if timeout == 0 { + timeout = 10 * time.Second + } + atomic.AddInt32(&host.Fails, 1) + go func(host *healthcheck.UpstreamHost, timeout time.Duration) { + time.Sleep(timeout) + atomic.AddInt32(&host.Fails, -1) + }(host, timeout) + } + + RequestDuration.WithLabelValues(state.Proto(), upstream.Exchanger().Protocol(), upstream.From()).Observe(float64(time.Since(start) / time.Millisecond)) + + return dns.RcodeServerFailure, fmt.Errorf("%s: %s", errUnreachable, backendErr) + } +} + +func (p Proxy) match(state request.Request) (u Upstream) { + if p.Upstreams == nil { + return nil + } + + longestMatch := 0 + for _, upstream := range *p.Upstreams { + from := upstream.From() + + if !plugin.Name(from).Matches(state.Name()) || !upstream.IsAllowedDomain(state.Name()) { + continue + } + + if lf := len(from); lf > longestMatch { + longestMatch = lf + u = upstream + } + } + return u + +} + +// Name implements the Handler interface. +func (p Proxy) Name() string { return "proxy" } + +// defaultTimeout is the default networking timeout for DNS requests. +const defaultTimeout = 5 * time.Second + +func toDnstap(ctx context.Context, host string, ex Exchanger, state request.Request, reply *dns.Msg, queryEpoch, respEpoch uint64) (err error) { + if tapper := dnstap.TapperFromContext(ctx); tapper != nil { + // Query + b := tapper.TapBuilder() + b.TimeSec = queryEpoch + if err = b.HostPort(host); err != nil { + return + } + t := ex.Transport() + if t == "" { + t = state.Proto() + } + if t == "tcp" { + b.SocketProto = tap.SocketProtocol_TCP + } else { + b.SocketProto = tap.SocketProtocol_UDP + } + if err = b.Msg(state.Req); err != nil { + return + } + err = tapper.TapMessage(b.ToOutsideQuery(tap.Message_FORWARDER_QUERY)) + if err != nil { + return + } + + // Response + if reply != nil { + b.TimeSec = respEpoch + if err = b.Msg(reply); err != nil { + return + } + err = tapper.TapMessage(b.ToOutsideResponse(tap.Message_FORWARDER_RESPONSE)) + } + } + return +} diff --git a/plugin/proxy/proxy_test.go b/plugin/proxy/proxy_test.go new file mode 100644 index 000000000..b0cb9c3cb --- /dev/null +++ b/plugin/proxy/proxy_test.go @@ -0,0 +1,87 @@ +package proxy + +import ( + "fmt" + "net/http" + "net/http/httptest" + "strings" + "sync/atomic" + "testing" + "time" + + "github.com/mholt/caddy/caddyfile" +) + +func TestStop(t *testing.T) { + config := "proxy . %s {\n health_check /healthcheck:%s %dms \n}" + tests := []struct { + name string + intervalInMilliseconds int + numHealthcheckIntervals int + }{ + { + "No Healthchecks After Stop - 5ms, 1 intervals", + 5, + 1, + }, + { + "No Healthchecks After Stop - 5ms, 2 intervals", + 5, + 2, + }, + { + "No Healthchecks After Stop - 5ms, 3 intervals", + 5, + 3, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + + // Set up proxy. + var counter int64 + backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + r.Body.Close() + atomic.AddInt64(&counter, 1) + })) + + defer backend.Close() + + port := backend.URL[17:] // Remove all crap up to the port + back := backend.URL[7:] // Remove http:// + c := caddyfile.NewDispenser("Testfile", strings.NewReader(fmt.Sprintf(config, back, port, test.intervalInMilliseconds))) + upstreams, err := NewStaticUpstreams(&c) + if err != nil { + t.Error("Expected no error. Got:", err.Error()) + } + + // Give some time for healthchecks to hit the server. + time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond) + + for _, upstream := range upstreams { + if err := upstream.Stop(); err != nil { + t.Error("Expected no error stopping upstream. Got: ", err.Error()) + } + } + + counterValueAfterShutdown := atomic.LoadInt64(&counter) + + // Give some time to see if healthchecks are still hitting the server. + time.Sleep(time.Duration(test.intervalInMilliseconds*test.numHealthcheckIntervals) * time.Millisecond) + + if counterValueAfterShutdown == 0 { + t.Error("Expected healthchecks to hit test server. Got no healthchecks.") + } + + // health checks are in a go routine now, so one may well occur after we shutdown, + // but we only ever expect one more + counterValueAfterWaiting := atomic.LoadInt64(&counter) + if counterValueAfterWaiting > (counterValueAfterShutdown + 1) { + t.Errorf("Expected no more healthchecks after shutdown. Got: %d healthchecks after shutdown", counterValueAfterWaiting-counterValueAfterShutdown) + } + + }) + + } +} diff --git a/plugin/proxy/response.go b/plugin/proxy/response.go new file mode 100644 index 000000000..2ad553c41 --- /dev/null +++ b/plugin/proxy/response.go @@ -0,0 +1,21 @@ +package proxy + +import ( + "net" + + "github.com/miekg/dns" +) + +type fakeBootWriter struct { + dns.ResponseWriter +} + +func (w *fakeBootWriter) LocalAddr() net.Addr { + local := net.ParseIP("127.0.0.1") + return &net.UDPAddr{IP: local, Port: 53} // Port is not used here +} + +func (w *fakeBootWriter) RemoteAddr() net.Addr { + remote := net.ParseIP("8.8.8.8") + return &net.UDPAddr{IP: remote, Port: 53} // Port is not used here +} diff --git a/plugin/proxy/setup.go b/plugin/proxy/setup.go new file mode 100644 index 000000000..bbe65c35d --- /dev/null +++ b/plugin/proxy/setup.go @@ -0,0 +1,46 @@ +package proxy + +import ( + "github.com/coredns/coredns/core/dnsserver" + "github.com/coredns/coredns/plugin" + + "github.com/mholt/caddy" +) + +func init() { + caddy.RegisterPlugin("proxy", caddy.Plugin{ + ServerType: "dns", + Action: setup, + }) +} + +func setup(c *caddy.Controller) error { + upstreams, err := NewStaticUpstreams(&c.Dispenser) + if err != nil { + return plugin.Error("proxy", err) + } + + t := dnsserver.GetConfig(c).Handler("trace") + P := &Proxy{Trace: t} + dnsserver.GetConfig(c).AddPlugin(func(next plugin.Handler) plugin.Handler { + P.Next = next + P.Upstreams = &upstreams + return P + }) + + c.OnStartup(OnStartupMetrics) + + for i := range upstreams { + u := upstreams[i] + c.OnStartup(func() error { + return u.Exchanger().OnStartup(P) + }) + c.OnShutdown(func() error { + return u.Exchanger().OnShutdown(P) + }) + // Register shutdown handlers. + c.OnShutdown(u.Stop) + } + + return nil +} diff --git a/plugin/proxy/upstream.go b/plugin/proxy/upstream.go new file mode 100644 index 000000000..b60b6ff58 --- /dev/null +++ b/plugin/proxy/upstream.go @@ -0,0 +1,234 @@ +package proxy + +import ( + "fmt" + "net" + "strconv" + "sync/atomic" + "time" + + "github.com/coredns/coredns/plugin" + "github.com/coredns/coredns/plugin/pkg/dnsutil" + "github.com/coredns/coredns/plugin/pkg/healthcheck" + "github.com/coredns/coredns/plugin/pkg/tls" + "github.com/mholt/caddy/caddyfile" + "github.com/miekg/dns" +) + +type staticUpstream struct { + from string + + healthcheck.HealthCheck + + WithoutPathPrefix string + IgnoredSubDomains []string + ex Exchanger +} + +// NewStaticUpstreams parses the configuration input and sets up +// static upstreams for the proxy plugin. +func NewStaticUpstreams(c *caddyfile.Dispenser) ([]Upstream, error) { + var upstreams []Upstream + for c.Next() { + upstream := &staticUpstream{ + from: ".", + HealthCheck: healthcheck.HealthCheck{ + FailTimeout: 10 * time.Second, + MaxFails: 1, + Future: 60 * time.Second, + }, + ex: newDNSEx(), + } + + if !c.Args(&upstream.from) { + return upstreams, c.ArgErr() + } + upstream.from = plugin.Host(upstream.from).Normalize() + + to := c.RemainingArgs() + if len(to) == 0 { + return upstreams, c.ArgErr() + } + + // process the host list, substituting in any nameservers in files + toHosts, err := dnsutil.ParseHostPortOrFile(to...) + if err != nil { + return upstreams, err + } + + for c.NextBlock() { + if err := parseBlock(c, upstream); err != nil { + return upstreams, err + } + } + + upstream.Hosts = make([]*healthcheck.UpstreamHost, len(toHosts)) + for i, host := range toHosts { + uh := &healthcheck.UpstreamHost{ + Name: host, + Conns: 0, + Fails: 0, + FailTimeout: upstream.FailTimeout, + + CheckDown: func(upstream *staticUpstream) healthcheck.UpstreamHostDownFunc { + return func(uh *healthcheck.UpstreamHost) bool { + + down := false + + uh.CheckMu.Lock() + until := uh.OkUntil + uh.CheckMu.Unlock() + + if !until.IsZero() && time.Now().After(until) { + down = true + } + + fails := atomic.LoadInt32(&uh.Fails) + if fails >= upstream.MaxFails && upstream.MaxFails != 0 { + down = true + } + return down + } + }(upstream), + WithoutPathPrefix: upstream.WithoutPathPrefix, + } + + upstream.Hosts[i] = uh + } + upstream.Start() + + upstreams = append(upstreams, upstream) + } + return upstreams, nil +} + +func (u *staticUpstream) From() string { + return u.from +} + +func parseBlock(c *caddyfile.Dispenser, u *staticUpstream) error { + switch c.Val() { + case "policy": + if !c.NextArg() { + return c.ArgErr() + } + policyCreateFunc, ok := healthcheck.SupportedPolicies[c.Val()] + if !ok { + return c.ArgErr() + } + u.Policy = policyCreateFunc() + case "fail_timeout": + if !c.NextArg() { + return c.ArgErr() + } + dur, err := time.ParseDuration(c.Val()) + if err != nil { + return err + } + u.FailTimeout = dur + case "max_fails": + if !c.NextArg() { + return c.ArgErr() + } + n, err := strconv.Atoi(c.Val()) + if err != nil { + return err + } + u.MaxFails = int32(n) + case "health_check": + if !c.NextArg() { + return c.ArgErr() + } + var err error + u.HealthCheck.Path, u.HealthCheck.Port, err = net.SplitHostPort(c.Val()) + if err != nil { + return err + } + u.HealthCheck.Interval = 30 * time.Second + if c.NextArg() { + dur, err := time.ParseDuration(c.Val()) + if err != nil { + return err + } + u.HealthCheck.Interval = dur + u.Future = 2 * dur + + // set a minimum of 3 seconds + if u.Future < (3 * time.Second) { + u.Future = 3 * time.Second + } + } + case "without": + if !c.NextArg() { + return c.ArgErr() + } + u.WithoutPathPrefix = c.Val() + case "except": + ignoredDomains := c.RemainingArgs() + if len(ignoredDomains) == 0 { + return c.ArgErr() + } + for i := 0; i < len(ignoredDomains); i++ { + ignoredDomains[i] = plugin.Host(ignoredDomains[i]).Normalize() + } + u.IgnoredSubDomains = ignoredDomains + case "spray": + u.Spray = &healthcheck.Spray{} + case "protocol": + encArgs := c.RemainingArgs() + if len(encArgs) == 0 { + return c.ArgErr() + } + switch encArgs[0] { + case "dns": + if len(encArgs) > 1 { + if encArgs[1] == "force_tcp" { + opts := Options{ForceTCP: true} + u.ex = newDNSExWithOption(opts) + } else { + return fmt.Errorf("only force_tcp allowed as parameter to dns") + } + } else { + u.ex = newDNSEx() + } + case "https_google": + boot := []string{"8.8.8.8:53", "8.8.4.4:53"} + if len(encArgs) > 2 && encArgs[1] == "bootstrap" { + boot = encArgs[2:] + } + + u.ex = newGoogle("", boot) // "" for default in google.go + case "grpc": + if len(encArgs) == 2 && encArgs[1] == "insecure" { + u.ex = newGrpcClient(nil, u) + return nil + } + tls, err := tls.NewTLSConfigFromArgs(encArgs[1:]...) + if err != nil { + return err + } + u.ex = newGrpcClient(tls, u) + default: + return fmt.Errorf("%s: %s", errInvalidProtocol, encArgs[0]) + } + + default: + return c.Errf("unknown property '%s'", c.Val()) + } + return nil +} + +func (u *staticUpstream) IsAllowedDomain(name string) bool { + if dns.Name(name) == dns.Name(u.From()) { + return true + } + + for _, ignoredSubDomain := range u.IgnoredSubDomains { + if plugin.Name(ignoredSubDomain).Matches(name) { + return false + } + } + return true +} + +func (u *staticUpstream) Exchanger() Exchanger { return u.ex } diff --git a/plugin/proxy/upstream_test.go b/plugin/proxy/upstream_test.go new file mode 100644 index 000000000..42d50cac3 --- /dev/null +++ b/plugin/proxy/upstream_test.go @@ -0,0 +1,324 @@ +package proxy + +import ( + "path/filepath" + "strings" + "testing" + + "github.com/coredns/coredns/plugin/test" + + "github.com/mholt/caddy" +) + +func TestAllowedDomain(t *testing.T) { + upstream := &staticUpstream{ + from: "miek.nl.", + IgnoredSubDomains: []string{"download.miek.nl.", "static.miek.nl."}, // closing dot mandatory + } + tests := []struct { + name string + expected bool + }{ + {"miek.nl.", true}, + {"download.miek.nl.", false}, + {"static.miek.nl.", false}, + {"blaat.miek.nl.", true}, + } + + for i, test := range tests { + isAllowed := upstream.IsAllowedDomain(test.name) + if test.expected != isAllowed { + t.Errorf("Test %d: expected %v found %v for %s", i+1, test.expected, isAllowed, test.name) + } + } +} + +func TestProxyParse(t *testing.T) { + rmFunc, cert, key, ca := getPEMFiles(t) + defer rmFunc() + + grpc1 := "proxy . 8.8.8.8:53 {\n protocol grpc " + ca + "\n}" + grpc2 := "proxy . 8.8.8.8:53 {\n protocol grpc " + cert + " " + key + "\n}" + grpc3 := "proxy . 8.8.8.8:53 {\n protocol grpc " + cert + " " + key + " " + ca + "\n}" + grpc4 := "proxy . 8.8.8.8:53 {\n protocol grpc " + key + "\n}" + + tests := []struct { + inputUpstreams string + shouldErr bool + }{ + { + `proxy . 8.8.8.8:53`, + false, + }, + { + `proxy 10.0.0.0/24 8.8.8.8:53`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + policy round_robin +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + fail_timeout 5s +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + max_fails 10 +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + health_check /health:8080 +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + without without +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + except miek.nl example.org 10.0.0.0/24 +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + spray +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + error_option +}`, + true, + }, + { + ` +proxy . some_bogus_filename`, + true, + }, + { + ` +proxy . 8.8.8.8:53 { + protocol dns +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + protocol grpc +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + protocol grpc insecure +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + protocol dns force_tcp +}`, + false, + }, + { + ` +proxy . 8.8.8.8:53 { + protocol grpc a b c d +}`, + true, + }, + { + grpc1, + false, + }, + { + grpc2, + false, + }, + { + grpc3, + false, + }, + { + grpc4, + true, + }, + { + ` +proxy . 8.8.8.8:53 { + protocol foobar +}`, + true, + }, + { + `proxy`, + true, + }, + { + ` +proxy . 8.8.8.8:53 { + protocol foobar +}`, + true, + }, + { + ` +proxy . 8.8.8.8:53 { + policy +}`, + true, + }, + { + ` +proxy . 8.8.8.8:53 { + fail_timeout +}`, + true, + }, + { + ` +proxy . 8.8.8.8:53 { + fail_timeout junky +}`, + true, + }, + { + ` +proxy . 8.8.8.8:53 { + health_check +}`, + true, + }, + { + ` +proxy . 8.8.8.8:53 { + protocol dns force +}`, + true, + }, + } + for i, test := range tests { + c := caddy.NewTestController("dns", test.inputUpstreams) + _, err := NewStaticUpstreams(&c.Dispenser) + if (err != nil) != test.shouldErr { + t.Errorf("Test %d expected no error, got %v for %s", i+1, err, test.inputUpstreams) + } + } +} + +func TestResolvParse(t *testing.T) { + tests := []struct { + inputUpstreams string + filedata string + shouldErr bool + expected []string + }{ + { + ` +proxy . FILE +`, + ` +nameserver 1.2.3.4 +nameserver 4.3.2.1 +`, + false, + []string{"1.2.3.4:53", "4.3.2.1:53"}, + }, + { + ` +proxy example.com 1.1.1.1:5000 +proxy . FILE +proxy example.org 2.2.2.2:1234 +`, + ` +nameserver 1.2.3.4 +`, + false, + []string{"1.1.1.1:5000", "1.2.3.4:53", "2.2.2.2:1234"}, + }, + { + ` +proxy example.com 1.1.1.1:5000 +proxy . FILE +proxy example.org 2.2.2.2:1234 +`, + ` +junky resolve.conf +`, + false, + []string{"1.1.1.1:5000", "2.2.2.2:1234"}, + }, + } + for i, tc := range tests { + + path, rm, err := test.TempFile(".", tc.filedata) + if err != nil { + t.Fatalf("Test %d could not creat temp file %v", i, err) + } + defer rm() + + config := strings.Replace(tc.inputUpstreams, "FILE", path, -1) + c := caddy.NewTestController("dns", config) + upstreams, err := NewStaticUpstreams(&c.Dispenser) + if (err != nil) != tc.shouldErr { + t.Errorf("Test %d expected no error, got %v", i+1, err) + } + var hosts []string + for _, u := range upstreams { + for _, h := range u.(*staticUpstream).Hosts { + hosts = append(hosts, h.Name) + } + } + if !tc.shouldErr { + if len(hosts) != len(tc.expected) { + t.Errorf("Test %d expected %d hosts got %d", i+1, len(tc.expected), len(upstreams)) + } else { + ok := true + for i, v := range tc.expected { + if v != hosts[i] { + ok = false + } + } + if !ok { + t.Errorf("Test %d expected %v got %v", i+1, tc.expected, upstreams) + } + } + } + } +} + +func getPEMFiles(t *testing.T) (rmFunc func(), cert, key, ca string) { + tempDir, rmFunc, err := test.WritePEMFiles("") + if err != nil { + t.Fatalf("Could not write PEM files: %s", err) + } + + cert = filepath.Join(tempDir, "cert.pem") + key = filepath.Join(tempDir, "key.pem") + ca = filepath.Join(tempDir, "ca.pem") + + return +} |