diff options
author | 2020-07-07 21:38:07 +0200 | |
---|---|---|
committer | 2020-07-07 12:38:07 -0700 | |
commit | 68f1dd5ddf0451cc3a1b24a72c2965b8d896ffba (patch) | |
tree | dfac4d4c60f1e3a088737e15119c7648f056e407 /plugin/kubernetes/xfr.go | |
parent | 435d27b58d813ccc01ecc6d8620b78929c516c25 (diff) | |
download | coredns-68f1dd5ddf0451cc3a1b24a72c2965b8d896ffba.tar.gz coredns-68f1dd5ddf0451cc3a1b24a72c2965b8d896ffba.tar.zst coredns-68f1dd5ddf0451cc3a1b24a72c2965b8d896ffba.zip |
Implement notifies for transfer plugin (#3972)
* Fix notifies in transfer plugin
Signed-off-by: Miek Gieben <miek@miek.nl>
* Make it compile
Signed-off-by: Miek Gieben <miek@miek.nl>
* Port more plugins
Signed-off-by: Miek Gieben <miek@miek.nl>
* golint
Signed-off-by: Miek Gieben <miek@miek.nl>
* Fix tests
Signed-off-by: Miek Gieben <miek@miek.nl>
* Fix notifies in transfer plugin
Signed-off-by: Miek Gieben <miek@miek.nl>
* Make it compile
Signed-off-by: Miek Gieben <miek@miek.nl>
* Port more plugins
Signed-off-by: Miek Gieben <miek@miek.nl>
* golint
Signed-off-by: Miek Gieben <miek@miek.nl>
* Fix tests
Signed-off-by: Miek Gieben <miek@miek.nl>
* Fix tests
Signed-off-by: Miek Gieben <miek@miek.nl>
* really fix test
Signed-off-by: Miek Gieben <miek@miek.nl>
* Implement ixfr fallback and unify file and auto for transfering
Signed-off-by: Miek Gieben <miek@miek.nl>
* Add transfer tests
copied and modified from #3452
Signed-off-by: Miek Gieben <miek@miek.nl>
* Test correct selection of plugin
Signed-off-by: Miek Gieben <miek@miek.nl>
* add upstream back in
Signed-off-by: Miek Gieben <miek@miek.nl>
* Implement ixfr fallback and unify file and auto for transfering
Signed-off-by: Miek Gieben <miek@miek.nl>
* fix test
Signed-off-by: Miek Gieben <miek@miek.nl>
* properly merge
Signed-off-by: Miek Gieben <miek@miek.nl>
Diffstat (limited to 'plugin/kubernetes/xfr.go')
-rw-r--r-- | plugin/kubernetes/xfr.go | 252 |
1 files changed, 98 insertions, 154 deletions
diff --git a/plugin/kubernetes/xfr.go b/plugin/kubernetes/xfr.go index a3a0d4a4a..0392f0252 100644 --- a/plugin/kubernetes/xfr.go +++ b/plugin/kubernetes/xfr.go @@ -4,207 +4,151 @@ import ( "context" "math" "net" + "sort" "strings" "github.com/coredns/coredns/plugin" "github.com/coredns/coredns/plugin/etcd/msg" + "github.com/coredns/coredns/plugin/transfer" "github.com/coredns/coredns/request" "github.com/miekg/dns" api "k8s.io/api/core/v1" ) -const transferLength = 2000 - -// Serial implements the Transferer interface. -func (k *Kubernetes) Serial(state request.Request) uint32 { return uint32(k.APIConn.Modified()) } - -// MinTTL implements the Transferer interface. -func (k *Kubernetes) MinTTL(state request.Request) uint32 { return k.ttl } - -// Transfer implements the Transferer interface. -func (k *Kubernetes) Transfer(ctx context.Context, state request.Request) (int, error) { - - if !k.transferAllowed(state) { - return dns.RcodeRefused, nil - } - - // Get all services. - rrs := make(chan dns.RR) - go k.transfer(rrs, state.Zone) - - records := []dns.RR{} - for r := range rrs { - records = append(records, r) - } - - if len(records) == 0 { - return dns.RcodeServerFailure, nil - } - - ch := make(chan *dns.Envelope) - tr := new(dns.Transfer) - - soa, err := plugin.SOA(ctx, k, state.Zone, state, plugin.Options{}) +// Transfer implements the transfer.Transfer interface. +func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, error) { + // state is not used here, hence the empty request.Request{] + soa, err := plugin.SOA(context.TODO(), k, zone, request.Request{}, plugin.Options{}) if err != nil { - return dns.RcodeServerFailure, nil + return nil, transfer.ErrNotAuthoritative } - records = append(soa, records...) - records = append(records, soa...) - go func(ch chan *dns.Envelope) { - j, l := 0, 0 - log.Infof("Outgoing transfer of %d records of zone %s to %s started", len(records), state.Zone, state.IP()) - for i, r := range records { - l += dns.Len(r) - if l > transferLength { - ch <- &dns.Envelope{RR: records[j:i]} - l = 0 - j = i - } - } - if j < len(records) { - ch <- &dns.Envelope{RR: records[j:]} - } - close(ch) - }(ch) + ch := make(chan []dns.RR) - tr.Out(state.W, state.Req, ch) - // Defer closing to the client - state.W.Hijack() - return dns.RcodeSuccess, nil -} + zonePath := msg.Path(zone, "coredns") + serviceList := k.APIConn.ServiceList() -// transferAllowed checks if incoming request for transferring the zone is allowed according to the ACLs. -// Note: This is copied from zone.transferAllowed, but should eventually be factored into a common transfer pkg. -func (k *Kubernetes) transferAllowed(state request.Request) bool { - for _, t := range k.TransferTo { - if t == "*" { - return true - } - // If remote IP matches we accept. - remote := state.IP() - to, _, err := net.SplitHostPort(t) - if err != nil { - continue + go func() { + // ixfr fallback + if serial != 0 && soa[0].(*dns.SOA).Serial == serial { + ch <- soa + close(ch) + return } - if to == remote { - return true - } - } - return false -} + ch <- soa -func (k *Kubernetes) transfer(c chan dns.RR, zone string) { + sort.Slice(serviceList, func(i, j int) bool { + return serviceList[i].Name < serviceList[j].Name + }) - defer close(c) + for _, svc := range serviceList { + if !k.namespaceExposed(svc.Namespace) { + continue + } + svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name} + switch svc.Type { - zonePath := msg.Path(zone, "coredns") - serviceList := k.APIConn.ServiceList() - for _, svc := range serviceList { - if !k.namespaceExposed(svc.Namespace) { - continue - } - svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name} - switch svc.Type { - case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer: - clusterIP := net.ParseIP(svc.ClusterIP) - if clusterIP != nil { - s := msg.Service{Host: svc.ClusterIP, TTL: k.ttl} - s.Key = strings.Join(svcBase, "/") - - // Change host from IP to Name for SRV records - host := emitAddressRecord(c, s) - - for _, p := range svc.Ports { - s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl} + case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer: + clusterIP := net.ParseIP(svc.ClusterIP) + if clusterIP != nil { + s := msg.Service{Host: svc.ClusterIP, TTL: k.ttl} s.Key = strings.Join(svcBase, "/") - // Need to generate this to handle use cases for peer-finder - // ref: https://github.com/coredns/coredns/pull/823 - c <- s.NewSRV(msg.Domain(s.Key), 100) + // Change host from IP to Name for SRV records + host := emitAddressRecord(ch, s) - // As per spec unnamed ports do not have a srv record - // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records - if p.Name == "" { - continue - } + for _, p := range svc.Ports { + s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl} + s.Key = strings.Join(svcBase, "/") - s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/") + // Need to generate this to handle use cases for peer-finder + // ref: https://github.com/coredns/coredns/pull/823 + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)} - c <- s.NewSRV(msg.Domain(s.Key), 100) - } + // As per spec unnamed ports do not have a srv record + // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records + if p.Name == "" { + continue + } - // Skip endpoint discovery if clusterIP is defined - continue - } + s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/") - endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace) + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)} + } - for _, ep := range endpointsList { - if ep.Name != svc.Name || ep.Namespace != svc.Namespace { + // Skip endpoint discovery if clusterIP is defined continue } - for _, eps := range ep.Subsets { - srvWeight := calcSRVWeight(len(eps.Addresses)) - for _, addr := range eps.Addresses { - s := msg.Service{Host: addr.IP, TTL: k.ttl} - s.Key = strings.Join(svcBase, "/") - // We don't need to change the msg.Service host from IP to Name yet - // so disregard the return value here - emitAddressRecord(c, s) - - s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/") - // Change host from IP to Name for SRV records - host := emitAddressRecord(c, s) - s.Host = host - - for _, p := range eps.Ports { - // As per spec unnamed ports do not have a srv record - // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records - if p.Name == "" { - continue - } + endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace) - s.Port = int(p.Port) + for _, ep := range endpointsList { + if ep.Name != svc.Name || ep.Namespace != svc.Namespace { + continue + } - s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/") - c <- s.NewSRV(msg.Domain(s.Key), srvWeight) + for _, eps := range ep.Subsets { + srvWeight := calcSRVWeight(len(eps.Addresses)) + for _, addr := range eps.Addresses { + s := msg.Service{Host: addr.IP, TTL: k.ttl} + s.Key = strings.Join(svcBase, "/") + // We don't need to change the msg.Service host from IP to Name yet + // so disregard the return value here + emitAddressRecord(ch, s) + + s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/") + // Change host from IP to Name for SRV records + host := emitAddressRecord(ch, s) + s.Host = host + + for _, p := range eps.Ports { + // As per spec unnamed ports do not have a srv record + // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records + if p.Name == "" { + continue + } + + s.Port = int(p.Port) + + s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/") + ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), srvWeight)} + } } } } - } - case api.ServiceTypeExternalName: + case api.ServiceTypeExternalName: - s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl} - if t, _ := s.HostType(); t == dns.TypeCNAME { - c <- s.NewCNAME(msg.Domain(s.Key), s.Host) + s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl} + if t, _ := s.HostType(); t == dns.TypeCNAME { + ch <- []dns.RR{s.NewCNAME(msg.Domain(s.Key), s.Host)} + } } } - } + ch <- soa + close(ch) + }() + return ch, nil } -// emitAddressRecord generates a new A or AAAA record based on the msg.Service and writes it to -// a channel. +// emitAddressRecord generates a new A or AAAA record based on the msg.Service and writes it to a channel. // emitAddressRecord returns the host name from the generated record. -func emitAddressRecord(c chan dns.RR, message msg.Service) string { - ip := net.ParseIP(message.Host) - var host string - dnsType, _ := message.HostType() +func emitAddressRecord(c chan<- []dns.RR, s msg.Service) string { + ip := net.ParseIP(s.Host) + dnsType, _ := s.HostType() switch dnsType { case dns.TypeA: - arec := message.NewA(msg.Domain(message.Key), ip) - host = arec.Hdr.Name - c <- arec + r := s.NewA(msg.Domain(s.Key), ip) + c <- []dns.RR{r} + return r.Hdr.Name case dns.TypeAAAA: - arec := message.NewAAAA(msg.Domain(message.Key), ip) - host = arec.Hdr.Name - c <- arec + r := s.NewAAAA(msg.Domain(s.Key), ip) + c <- []dns.RR{r} + return r.Hdr.Name } - return host + return "" } // calcSrvWeight borrows the logic implemented in plugin.SRV for dynamically |