aboutsummaryrefslogtreecommitdiff
path: root/plugin/kubernetes/xfr.go
diff options
context:
space:
mode:
authorGravatar Miek Gieben <miek@miek.nl> 2020-07-07 21:38:07 +0200
committerGravatar GitHub <noreply@github.com> 2020-07-07 12:38:07 -0700
commit68f1dd5ddf0451cc3a1b24a72c2965b8d896ffba (patch)
treedfac4d4c60f1e3a088737e15119c7648f056e407 /plugin/kubernetes/xfr.go
parent435d27b58d813ccc01ecc6d8620b78929c516c25 (diff)
downloadcoredns-68f1dd5ddf0451cc3a1b24a72c2965b8d896ffba.tar.gz
coredns-68f1dd5ddf0451cc3a1b24a72c2965b8d896ffba.tar.zst
coredns-68f1dd5ddf0451cc3a1b24a72c2965b8d896ffba.zip
Implement notifies for transfer plugin (#3972)
* Fix notifies in transfer plugin Signed-off-by: Miek Gieben <miek@miek.nl> * Make it compile Signed-off-by: Miek Gieben <miek@miek.nl> * Port more plugins Signed-off-by: Miek Gieben <miek@miek.nl> * golint Signed-off-by: Miek Gieben <miek@miek.nl> * Fix tests Signed-off-by: Miek Gieben <miek@miek.nl> * Fix notifies in transfer plugin Signed-off-by: Miek Gieben <miek@miek.nl> * Make it compile Signed-off-by: Miek Gieben <miek@miek.nl> * Port more plugins Signed-off-by: Miek Gieben <miek@miek.nl> * golint Signed-off-by: Miek Gieben <miek@miek.nl> * Fix tests Signed-off-by: Miek Gieben <miek@miek.nl> * Fix tests Signed-off-by: Miek Gieben <miek@miek.nl> * really fix test Signed-off-by: Miek Gieben <miek@miek.nl> * Implement ixfr fallback and unify file and auto for transfering Signed-off-by: Miek Gieben <miek@miek.nl> * Add transfer tests copied and modified from #3452 Signed-off-by: Miek Gieben <miek@miek.nl> * Test correct selection of plugin Signed-off-by: Miek Gieben <miek@miek.nl> * add upstream back in Signed-off-by: Miek Gieben <miek@miek.nl> * Implement ixfr fallback and unify file and auto for transfering Signed-off-by: Miek Gieben <miek@miek.nl> * fix test Signed-off-by: Miek Gieben <miek@miek.nl> * properly merge Signed-off-by: Miek Gieben <miek@miek.nl>
Diffstat (limited to 'plugin/kubernetes/xfr.go')
-rw-r--r--plugin/kubernetes/xfr.go252
1 files changed, 98 insertions, 154 deletions
diff --git a/plugin/kubernetes/xfr.go b/plugin/kubernetes/xfr.go
index a3a0d4a4a..0392f0252 100644
--- a/plugin/kubernetes/xfr.go
+++ b/plugin/kubernetes/xfr.go
@@ -4,207 +4,151 @@ import (
"context"
"math"
"net"
+ "sort"
"strings"
"github.com/coredns/coredns/plugin"
"github.com/coredns/coredns/plugin/etcd/msg"
+ "github.com/coredns/coredns/plugin/transfer"
"github.com/coredns/coredns/request"
"github.com/miekg/dns"
api "k8s.io/api/core/v1"
)
-const transferLength = 2000
-
-// Serial implements the Transferer interface.
-func (k *Kubernetes) Serial(state request.Request) uint32 { return uint32(k.APIConn.Modified()) }
-
-// MinTTL implements the Transferer interface.
-func (k *Kubernetes) MinTTL(state request.Request) uint32 { return k.ttl }
-
-// Transfer implements the Transferer interface.
-func (k *Kubernetes) Transfer(ctx context.Context, state request.Request) (int, error) {
-
- if !k.transferAllowed(state) {
- return dns.RcodeRefused, nil
- }
-
- // Get all services.
- rrs := make(chan dns.RR)
- go k.transfer(rrs, state.Zone)
-
- records := []dns.RR{}
- for r := range rrs {
- records = append(records, r)
- }
-
- if len(records) == 0 {
- return dns.RcodeServerFailure, nil
- }
-
- ch := make(chan *dns.Envelope)
- tr := new(dns.Transfer)
-
- soa, err := plugin.SOA(ctx, k, state.Zone, state, plugin.Options{})
+// Transfer implements the transfer.Transfer interface.
+func (k *Kubernetes) Transfer(zone string, serial uint32) (<-chan []dns.RR, error) {
+ // state is not used here, hence the empty request.Request{]
+ soa, err := plugin.SOA(context.TODO(), k, zone, request.Request{}, plugin.Options{})
if err != nil {
- return dns.RcodeServerFailure, nil
+ return nil, transfer.ErrNotAuthoritative
}
- records = append(soa, records...)
- records = append(records, soa...)
- go func(ch chan *dns.Envelope) {
- j, l := 0, 0
- log.Infof("Outgoing transfer of %d records of zone %s to %s started", len(records), state.Zone, state.IP())
- for i, r := range records {
- l += dns.Len(r)
- if l > transferLength {
- ch <- &dns.Envelope{RR: records[j:i]}
- l = 0
- j = i
- }
- }
- if j < len(records) {
- ch <- &dns.Envelope{RR: records[j:]}
- }
- close(ch)
- }(ch)
+ ch := make(chan []dns.RR)
- tr.Out(state.W, state.Req, ch)
- // Defer closing to the client
- state.W.Hijack()
- return dns.RcodeSuccess, nil
-}
+ zonePath := msg.Path(zone, "coredns")
+ serviceList := k.APIConn.ServiceList()
-// transferAllowed checks if incoming request for transferring the zone is allowed according to the ACLs.
-// Note: This is copied from zone.transferAllowed, but should eventually be factored into a common transfer pkg.
-func (k *Kubernetes) transferAllowed(state request.Request) bool {
- for _, t := range k.TransferTo {
- if t == "*" {
- return true
- }
- // If remote IP matches we accept.
- remote := state.IP()
- to, _, err := net.SplitHostPort(t)
- if err != nil {
- continue
+ go func() {
+ // ixfr fallback
+ if serial != 0 && soa[0].(*dns.SOA).Serial == serial {
+ ch <- soa
+ close(ch)
+ return
}
- if to == remote {
- return true
- }
- }
- return false
-}
+ ch <- soa
-func (k *Kubernetes) transfer(c chan dns.RR, zone string) {
+ sort.Slice(serviceList, func(i, j int) bool {
+ return serviceList[i].Name < serviceList[j].Name
+ })
- defer close(c)
+ for _, svc := range serviceList {
+ if !k.namespaceExposed(svc.Namespace) {
+ continue
+ }
+ svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name}
+ switch svc.Type {
- zonePath := msg.Path(zone, "coredns")
- serviceList := k.APIConn.ServiceList()
- for _, svc := range serviceList {
- if !k.namespaceExposed(svc.Namespace) {
- continue
- }
- svcBase := []string{zonePath, Svc, svc.Namespace, svc.Name}
- switch svc.Type {
- case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer:
- clusterIP := net.ParseIP(svc.ClusterIP)
- if clusterIP != nil {
- s := msg.Service{Host: svc.ClusterIP, TTL: k.ttl}
- s.Key = strings.Join(svcBase, "/")
-
- // Change host from IP to Name for SRV records
- host := emitAddressRecord(c, s)
-
- for _, p := range svc.Ports {
- s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl}
+ case api.ServiceTypeClusterIP, api.ServiceTypeNodePort, api.ServiceTypeLoadBalancer:
+ clusterIP := net.ParseIP(svc.ClusterIP)
+ if clusterIP != nil {
+ s := msg.Service{Host: svc.ClusterIP, TTL: k.ttl}
s.Key = strings.Join(svcBase, "/")
- // Need to generate this to handle use cases for peer-finder
- // ref: https://github.com/coredns/coredns/pull/823
- c <- s.NewSRV(msg.Domain(s.Key), 100)
+ // Change host from IP to Name for SRV records
+ host := emitAddressRecord(ch, s)
- // As per spec unnamed ports do not have a srv record
- // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
- if p.Name == "" {
- continue
- }
+ for _, p := range svc.Ports {
+ s := msg.Service{Host: host, Port: int(p.Port), TTL: k.ttl}
+ s.Key = strings.Join(svcBase, "/")
- s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
+ // Need to generate this to handle use cases for peer-finder
+ // ref: https://github.com/coredns/coredns/pull/823
+ ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)}
- c <- s.NewSRV(msg.Domain(s.Key), 100)
- }
+ // As per spec unnamed ports do not have a srv record
+ // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
+ if p.Name == "" {
+ continue
+ }
- // Skip endpoint discovery if clusterIP is defined
- continue
- }
+ s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
- endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace)
+ ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), 100)}
+ }
- for _, ep := range endpointsList {
- if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
+ // Skip endpoint discovery if clusterIP is defined
continue
}
- for _, eps := range ep.Subsets {
- srvWeight := calcSRVWeight(len(eps.Addresses))
- for _, addr := range eps.Addresses {
- s := msg.Service{Host: addr.IP, TTL: k.ttl}
- s.Key = strings.Join(svcBase, "/")
- // We don't need to change the msg.Service host from IP to Name yet
- // so disregard the return value here
- emitAddressRecord(c, s)
-
- s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/")
- // Change host from IP to Name for SRV records
- host := emitAddressRecord(c, s)
- s.Host = host
-
- for _, p := range eps.Ports {
- // As per spec unnamed ports do not have a srv record
- // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
- if p.Name == "" {
- continue
- }
+ endpointsList := k.APIConn.EpIndex(svc.Name + "." + svc.Namespace)
- s.Port = int(p.Port)
+ for _, ep := range endpointsList {
+ if ep.Name != svc.Name || ep.Namespace != svc.Namespace {
+ continue
+ }
- s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
- c <- s.NewSRV(msg.Domain(s.Key), srvWeight)
+ for _, eps := range ep.Subsets {
+ srvWeight := calcSRVWeight(len(eps.Addresses))
+ for _, addr := range eps.Addresses {
+ s := msg.Service{Host: addr.IP, TTL: k.ttl}
+ s.Key = strings.Join(svcBase, "/")
+ // We don't need to change the msg.Service host from IP to Name yet
+ // so disregard the return value here
+ emitAddressRecord(ch, s)
+
+ s.Key = strings.Join(append(svcBase, endpointHostname(addr, k.endpointNameMode)), "/")
+ // Change host from IP to Name for SRV records
+ host := emitAddressRecord(ch, s)
+ s.Host = host
+
+ for _, p := range eps.Ports {
+ // As per spec unnamed ports do not have a srv record
+ // https://github.com/kubernetes/dns/blob/master/docs/specification.md#232---srv-records
+ if p.Name == "" {
+ continue
+ }
+
+ s.Port = int(p.Port)
+
+ s.Key = strings.Join(append(svcBase, strings.ToLower("_"+string(p.Protocol)), strings.ToLower("_"+string(p.Name))), "/")
+ ch <- []dns.RR{s.NewSRV(msg.Domain(s.Key), srvWeight)}
+ }
}
}
}
- }
- case api.ServiceTypeExternalName:
+ case api.ServiceTypeExternalName:
- s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl}
- if t, _ := s.HostType(); t == dns.TypeCNAME {
- c <- s.NewCNAME(msg.Domain(s.Key), s.Host)
+ s := msg.Service{Key: strings.Join(svcBase, "/"), Host: svc.ExternalName, TTL: k.ttl}
+ if t, _ := s.HostType(); t == dns.TypeCNAME {
+ ch <- []dns.RR{s.NewCNAME(msg.Domain(s.Key), s.Host)}
+ }
}
}
- }
+ ch <- soa
+ close(ch)
+ }()
+ return ch, nil
}
-// emitAddressRecord generates a new A or AAAA record based on the msg.Service and writes it to
-// a channel.
+// emitAddressRecord generates a new A or AAAA record based on the msg.Service and writes it to a channel.
// emitAddressRecord returns the host name from the generated record.
-func emitAddressRecord(c chan dns.RR, message msg.Service) string {
- ip := net.ParseIP(message.Host)
- var host string
- dnsType, _ := message.HostType()
+func emitAddressRecord(c chan<- []dns.RR, s msg.Service) string {
+ ip := net.ParseIP(s.Host)
+ dnsType, _ := s.HostType()
switch dnsType {
case dns.TypeA:
- arec := message.NewA(msg.Domain(message.Key), ip)
- host = arec.Hdr.Name
- c <- arec
+ r := s.NewA(msg.Domain(s.Key), ip)
+ c <- []dns.RR{r}
+ return r.Hdr.Name
case dns.TypeAAAA:
- arec := message.NewAAAA(msg.Domain(message.Key), ip)
- host = arec.Hdr.Name
- c <- arec
+ r := s.NewAAAA(msg.Domain(s.Key), ip)
+ c <- []dns.RR{r}
+ return r.Hdr.Name
}
- return host
+ return ""
}
// calcSrvWeight borrows the logic implemented in plugin.SRV for dynamically