aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/grpclb.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/grpclb.go')
-rw-r--r--vendor/google.golang.org/grpc/grpclb.go151
1 files changed, 49 insertions, 102 deletions
diff --git a/vendor/google.golang.org/grpc/grpclb.go b/vendor/google.golang.org/grpc/grpclb.go
index 619985e60..db56ff362 100644
--- a/vendor/google.golang.org/grpc/grpclb.go
+++ b/vendor/google.golang.org/grpc/grpclb.go
@@ -28,7 +28,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
- lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
+ lbmpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/naming"
@@ -59,41 +59,21 @@ type balanceLoadClientStream struct {
ClientStream
}
-func (x *balanceLoadClientStream) Send(m *lbpb.LoadBalanceRequest) error {
+func (x *balanceLoadClientStream) Send(m *lbmpb.LoadBalanceRequest) error {
return x.ClientStream.SendMsg(m)
}
-func (x *balanceLoadClientStream) Recv() (*lbpb.LoadBalanceResponse, error) {
- m := new(lbpb.LoadBalanceResponse)
+func (x *balanceLoadClientStream) Recv() (*lbmpb.LoadBalanceResponse, error) {
+ m := new(lbmpb.LoadBalanceResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
-// AddressType indicates the address type returned by name resolution.
-type AddressType uint8
-
-const (
- // Backend indicates the server is a backend server.
- Backend AddressType = iota
- // GRPCLB indicates the server is a grpclb load balancer.
- GRPCLB
-)
-
-// AddrMetadataGRPCLB contains the information the name resolver for grpclb should provide. The
-// name resolver used by the grpclb balancer is required to provide this type of metadata in
-// its address updates.
-type AddrMetadataGRPCLB struct {
- // AddrType is the type of server (grpc load balancer or backend).
- AddrType AddressType
- // ServerName is the name of the grpc load balancer. Used for authentication.
- ServerName string
-}
-
// NewGRPCLBBalancer creates a grpclb load balancer.
func NewGRPCLBBalancer(r naming.Resolver) Balancer {
- return &balancer{
+ return &grpclbBalancer{
r: r,
}
}
@@ -116,25 +96,24 @@ type grpclbAddrInfo struct {
dropForLoadBalancing bool
}
-type balancer struct {
- r naming.Resolver
- target string
- mu sync.Mutex
- seq int // a sequence number to make sure addrCh does not get stale addresses.
- w naming.Watcher
- addrCh chan []Address
- rbs []remoteBalancerInfo
- addrs []*grpclbAddrInfo
- next int
- waitCh chan struct{}
- done bool
- expTimer *time.Timer
- rand *rand.Rand
-
- clientStats lbpb.ClientStats
+type grpclbBalancer struct {
+ r naming.Resolver
+ target string
+ mu sync.Mutex
+ seq int // a sequence number to make sure addrCh does not get stale addresses.
+ w naming.Watcher
+ addrCh chan []Address
+ rbs []remoteBalancerInfo
+ addrs []*grpclbAddrInfo
+ next int
+ waitCh chan struct{}
+ done bool
+ rand *rand.Rand
+
+ clientStats lbmpb.ClientStats
}
-func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
+func (b *grpclbBalancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerInfo) error {
updates, err := w.Next()
if err != nil {
grpclog.Warningf("grpclb: failed to get next addr update from watcher: %v", err)
@@ -159,18 +138,18 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
if exist {
continue
}
- md, ok := update.Metadata.(*AddrMetadataGRPCLB)
+ md, ok := update.Metadata.(*naming.AddrMetadataGRPCLB)
if !ok {
// TODO: Revisit the handling here and may introduce some fallback mechanism.
grpclog.Errorf("The name resolution contains unexpected metadata %v", update.Metadata)
continue
}
switch md.AddrType {
- case Backend:
+ case naming.Backend:
// TODO: Revisit the handling here and may introduce some fallback mechanism.
grpclog.Errorf("The name resolution does not give grpclb addresses")
continue
- case GRPCLB:
+ case naming.GRPCLB:
b.rbs = append(b.rbs, remoteBalancerInfo{
addr: update.Addr,
name: md.ServerName,
@@ -201,34 +180,18 @@ func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan []remoteBalancerIn
return nil
}
-func (b *balancer) serverListExpire(seq int) {
- b.mu.Lock()
- defer b.mu.Unlock()
- // TODO: gRPC interanls do not clear the connections when the server list is stale.
- // This means RPCs will keep using the existing server list until b receives new
- // server list even though the list is expired. Revisit this behavior later.
- if b.done || seq < b.seq {
- return
- }
- b.next = 0
- b.addrs = nil
- // Ask grpc internals to close all the corresponding connections.
- b.addrCh <- nil
-}
-
-func convertDuration(d *lbpb.Duration) time.Duration {
+func convertDuration(d *lbmpb.Duration) time.Duration {
if d == nil {
return 0
}
return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
}
-func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
+func (b *grpclbBalancer) processServerList(l *lbmpb.ServerList, seq int) {
if l == nil {
return
}
servers := l.GetServers()
- expiration := convertDuration(l.GetExpirationInterval())
var (
sl []*grpclbAddrInfo
addrs []Address
@@ -263,20 +226,11 @@ func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
b.next = 0
b.addrs = sl
b.addrCh <- addrs
- if b.expTimer != nil {
- b.expTimer.Stop()
- b.expTimer = nil
- }
- if expiration > 0 {
- b.expTimer = time.AfterFunc(expiration, func() {
- b.serverListExpire(seq)
- })
- }
}
return
}
-func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
+func (b *grpclbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.Duration, done <-chan struct{}) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
@@ -287,15 +241,15 @@ func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Dura
}
b.mu.Lock()
stats := b.clientStats
- b.clientStats = lbpb.ClientStats{} // Clear the stats.
+ b.clientStats = lbmpb.ClientStats{} // Clear the stats.
b.mu.Unlock()
t := time.Now()
- stats.Timestamp = &lbpb.Timestamp{
+ stats.Timestamp = &lbmpb.Timestamp{
Seconds: t.Unix(),
Nanos: int32(t.Nanosecond()),
}
- if err := s.Send(&lbpb.LoadBalanceRequest{
- LoadBalanceRequestType: &lbpb.LoadBalanceRequest_ClientStats{
+ if err := s.Send(&lbmpb.LoadBalanceRequest{
+ LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_ClientStats{
ClientStats: &stats,
},
}); err != nil {
@@ -305,7 +259,7 @@ func (b *balancer) sendLoadReport(s *balanceLoadClientStream, interval time.Dura
}
}
-func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
+func (b *grpclbBalancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry bool) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := lbc.BalanceLoad(ctx)
@@ -319,9 +273,9 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
return
}
b.mu.Unlock()
- initReq := &lbpb.LoadBalanceRequest{
- LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
- InitialRequest: &lbpb.InitialLoadBalanceRequest{
+ initReq := &lbmpb.LoadBalanceRequest{
+ LoadBalanceRequestType: &lbmpb.LoadBalanceRequest_InitialRequest{
+ InitialRequest: &lbmpb.InitialLoadBalanceRequest{
Name: b.target,
},
},
@@ -351,7 +305,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
streamDone := make(chan struct{})
defer close(streamDone)
b.mu.Lock()
- b.clientStats = lbpb.ClientStats{} // Clear client stats.
+ b.clientStats = lbmpb.ClientStats{} // Clear client stats.
b.mu.Unlock()
if d := convertDuration(initResp.ClientStatsReportInterval); d > 0 {
go b.sendLoadReport(stream, d, streamDone)
@@ -378,7 +332,7 @@ func (b *balancer) callRemoteBalancer(lbc *loadBalancerClient, seq int) (retry b
return true
}
-func (b *balancer) Start(target string, config BalancerConfig) error {
+func (b *grpclbBalancer) Start(target string, config BalancerConfig) error {
b.rand = rand.New(rand.NewSource(time.Now().Unix()))
// TODO: Fall back to the basic direct connection if there is no name resolver.
if b.r == nil {
@@ -507,8 +461,11 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
// WithDialer takes a different type of function, so we instead use a special DialOption here.
dopts = append(dopts, func(o *dialOptions) { o.copts.Dialer = dialer })
}
+ dopts = append(dopts, WithBlock())
ccError = make(chan struct{})
- cc, err = Dial(rb.addr, dopts...)
+ ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+ cc, err = DialContext(ctx, rb.addr, dopts...)
+ cancel()
if err != nil {
grpclog.Warningf("grpclb: failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
close(ccError)
@@ -534,7 +491,7 @@ func (b *balancer) Start(target string, config BalancerConfig) error {
return nil
}
-func (b *balancer) down(addr Address, err error) {
+func (b *grpclbBalancer) down(addr Address, err error) {
b.mu.Lock()
defer b.mu.Unlock()
for _, a := range b.addrs {
@@ -545,7 +502,7 @@ func (b *balancer) down(addr Address, err error) {
}
}
-func (b *balancer) Up(addr Address) func(error) {
+func (b *grpclbBalancer) Up(addr Address) func(error) {
b.mu.Lock()
defer b.mu.Unlock()
if b.done {
@@ -573,7 +530,7 @@ func (b *balancer) Up(addr Address) func(error) {
}
}
-func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
+func (b *grpclbBalancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Address, put func(), err error) {
var ch chan struct{}
b.mu.Lock()
if b.done {
@@ -643,17 +600,10 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
}
}
if !opts.BlockingWait {
- if len(b.addrs) == 0 {
- b.clientStats.NumCallsFinished++
- b.clientStats.NumCallsFinishedWithClientFailedToSend++
- b.mu.Unlock()
- err = Errorf(codes.Unavailable, "there is no address available")
- return
- }
- // Returns the next addr on b.addrs for a failfast RPC.
- addr = b.addrs[b.next].addr
- b.next++
+ b.clientStats.NumCallsFinished++
+ b.clientStats.NumCallsFinishedWithClientFailedToSend++
b.mu.Unlock()
+ err = Errorf(codes.Unavailable, "there is no address available")
return
}
// Wait on b.waitCh for non-failfast RPCs.
@@ -730,20 +680,17 @@ func (b *balancer) Get(ctx context.Context, opts BalancerGetOptions) (addr Addre
}
}
-func (b *balancer) Notify() <-chan []Address {
+func (b *grpclbBalancer) Notify() <-chan []Address {
return b.addrCh
}
-func (b *balancer) Close() error {
+func (b *grpclbBalancer) Close() error {
b.mu.Lock()
defer b.mu.Unlock()
if b.done {
return errBalancerClosed
}
b.done = true
- if b.expTimer != nil {
- b.expTimer.Stop()
- }
if b.waitCh != nil {
close(b.waitCh)
}