aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go479
1 files changed, 299 insertions, 180 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index f542d8bd0..e3e3140f1 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -1,33 +1,18 @@
/*
*
- * Copyright 2014, Google Inc.
- * All rights reserved.
+ * Copyright 2014 gRPC authors.
*
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
*
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*
*/
@@ -35,14 +20,14 @@ package grpc
import (
"errors"
- "fmt"
- "math"
"net"
+ "strings"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
+ "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
@@ -56,8 +41,7 @@ var (
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the ClientConn cannot establish the
// underlying connections within the specified timeout.
- // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be
- // removed in Q1 2017.
+ // DEPRECATED: Please use context.DeadlineExceeded instead.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
// errNoTransportSecurity indicates that there is no transport security
@@ -79,6 +63,8 @@ var (
errConnClosing = errors.New("grpc: the connection is closing")
// errConnUnavailable indicates that the connection is unavailable.
errConnUnavailable = errors.New("grpc: the connection is unavailable")
+ // errBalancerClosed indicates that the balancer is closed.
+ errBalancerClosed = errors.New("grpc: balancer is closed")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
)
@@ -86,30 +72,54 @@ var (
// dialOptions configure a Dial call. dialOptions are set by the DialOption
// values passed to Dial.
type dialOptions struct {
- unaryInt UnaryClientInterceptor
- streamInt StreamClientInterceptor
- codec Codec
- cp Compressor
- dc Decompressor
- bs backoffStrategy
- balancer Balancer
- block bool
- insecure bool
- timeout time.Duration
- scChan <-chan ServiceConfig
- copts transport.ConnectOptions
- maxMsgSize int
+ unaryInt UnaryClientInterceptor
+ streamInt StreamClientInterceptor
+ codec Codec
+ cp Compressor
+ dc Decompressor
+ bs backoffStrategy
+ balancer Balancer
+ block bool
+ insecure bool
+ timeout time.Duration
+ scChan <-chan ServiceConfig
+ copts transport.ConnectOptions
+ callOptions []CallOption
}
-const defaultClientMaxMsgSize = math.MaxInt32
+const (
+ defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
+ defaultClientMaxSendMessageSize = 1024 * 1024 * 4
+)
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
-// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive.
+// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialWindowSize(s int32) DialOption {
+ return func(o *dialOptions) {
+ o.copts.InitialWindowSize = s
+ }
+}
+
+// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection.
+// The lower bound for window size is 64K and any value smaller than that will be ignored.
+func WithInitialConnWindowSize(s int32) DialOption {
+ return func(o *dialOptions) {
+ o.copts.InitialConnWindowSize = s
+ }
+}
+
+// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead.
func WithMaxMsgSize(s int) DialOption {
+ return WithDefaultCallOptions(MaxCallRecvMsgSize(s))
+}
+
+// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection.
+func WithDefaultCallOptions(cos ...CallOption) DialOption {
return func(o *dialOptions) {
- o.maxMsgSize = s
+ o.callOptions = append(o.callOptions, cos...)
}
}
@@ -204,7 +214,7 @@ func WithTransportCredentials(creds credentials.TransportCredentials) DialOption
}
// WithPerRPCCredentials returns a DialOption which sets
-// credentials which will place auth state on each outbound RPC.
+// credentials and places auth state on each outbound RPC.
func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
return func(o *dialOptions) {
o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds)
@@ -213,6 +223,7 @@ func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption {
// WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn
// initially. This is valid if and only if WithBlock() is present.
+// Deprecated: use DialContext and context.WithTimeout instead.
func WithTimeout(d time.Duration) DialOption {
return func(o *dialOptions) {
o.timeout = d
@@ -241,7 +252,7 @@ func WithStatsHandler(h stats.Handler) DialOption {
}
}
-// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors.
+// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors.
// If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network
// address and won't try to reconnect.
// The default value of FailOnNonTempDialError is false.
@@ -295,17 +306,18 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) {
}
// DialContext creates a client connection to the given target. ctx can be used to
-// cancel or expire the pending connecting. Once this function returns, the
+// cancel or expire the pending connection. Once this function returns, the
// cancellation and expiration of ctx will be noop. Users should call ClientConn.Close
// to terminate all the pending operations after this function returns.
-// This is the EXPERIMENTAL API.
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
cc := &ClientConn{
target: target,
+ csMgr: &connectivityStateManager{},
conns: make(map[Address]*addrConn),
}
+ cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
- cc.dopts.maxMsgSize = defaultClientMaxMsgSize
+
for _, opt := range opts {
opt(&cc.dopts)
}
@@ -343,15 +355,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
}()
+ scSet := false
if cc.dopts.scChan != nil {
- // Wait for the initial service config.
+ // Try to get an initial service config.
select {
case sc, ok := <-cc.dopts.scChan:
if ok {
cc.sc = sc
+ scSet = true
}
- case <-ctx.Done():
- return nil, ctx.Err()
+ default:
}
}
// Set defaults.
@@ -382,6 +395,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
}
config := BalancerConfig{
DialCreds: credsClone,
+ Dialer: cc.dopts.copts.Dialer,
}
if err := cc.dopts.balancer.Start(target, config); err != nil {
waitC <- err
@@ -413,7 +427,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return nil, err
}
}
-
+ if cc.dopts.scChan != nil && !scSet {
+ // Blocking wait for the initial service config.
+ select {
+ case sc, ok := <-cc.dopts.scChan:
+ if ok {
+ cc.sc = sc
+ }
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ }
if cc.dopts.scChan != nil {
go cc.scWatcher()
}
@@ -421,39 +445,97 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
return cc, nil
}
-// ConnectivityState indicates the state of a client connection.
-type ConnectivityState int
+// connectivityStateEvaluator gets updated by addrConns when their
+// states transition, based on which it evaluates the state of
+// ClientConn.
+// Note: This code will eventually sit in the balancer in the new design.
+type connectivityStateEvaluator struct {
+ csMgr *connectivityStateManager
+ mu sync.Mutex
+ numReady uint64 // Number of addrConns in ready state.
+ numConnecting uint64 // Number of addrConns in connecting state.
+ numTransientFailure uint64 // Number of addrConns in transientFailure.
+}
-const (
- // Idle indicates the ClientConn is idle.
- Idle ConnectivityState = iota
- // Connecting indicates the ClienConn is connecting.
- Connecting
- // Ready indicates the ClientConn is ready for work.
- Ready
- // TransientFailure indicates the ClientConn has seen a failure but expects to recover.
- TransientFailure
- // Shutdown indicates the ClientConn has started shutting down.
- Shutdown
-)
+// recordTransition records state change happening in every addrConn and based on
+// that it evaluates what state the ClientConn is in.
+// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
+// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
+// before any addrConn is created ClientConn is in idle state. In the end when ClientConn
+// closes it is in connectivity.Shutdown state.
+// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
+func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
+ cse.mu.Lock()
+ defer cse.mu.Unlock()
+
+ // Update counters.
+ for idx, state := range []connectivity.State{oldState, newState} {
+ updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
+ switch state {
+ case connectivity.Ready:
+ cse.numReady += updateVal
+ case connectivity.Connecting:
+ cse.numConnecting += updateVal
+ case connectivity.TransientFailure:
+ cse.numTransientFailure += updateVal
+ }
+ }
+
+ // Evaluate.
+ if cse.numReady > 0 {
+ cse.csMgr.updateState(connectivity.Ready)
+ return
+ }
+ if cse.numConnecting > 0 {
+ cse.csMgr.updateState(connectivity.Connecting)
+ return
+ }
+ cse.csMgr.updateState(connectivity.TransientFailure)
+}
-func (s ConnectivityState) String() string {
- switch s {
- case Idle:
- return "IDLE"
- case Connecting:
- return "CONNECTING"
- case Ready:
- return "READY"
- case TransientFailure:
- return "TRANSIENT_FAILURE"
- case Shutdown:
- return "SHUTDOWN"
- default:
- panic(fmt.Sprintf("unknown connectivity state: %d", s))
+// connectivityStateManager keeps the connectivity.State of ClientConn.
+// This struct will eventually be exported so the balancers can access it.
+type connectivityStateManager struct {
+ mu sync.Mutex
+ state connectivity.State
+ notifyChan chan struct{}
+}
+
+// updateState updates the connectivity.State of ClientConn.
+// If there's a change it notifies goroutines waiting on state change to
+// happen.
+func (csm *connectivityStateManager) updateState(state connectivity.State) {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ if csm.state == connectivity.Shutdown {
+ return
+ }
+ if csm.state == state {
+ return
+ }
+ csm.state = state
+ if csm.notifyChan != nil {
+ // There are other goroutines waiting on this channel.
+ close(csm.notifyChan)
+ csm.notifyChan = nil
}
}
+func (csm *connectivityStateManager) getState() connectivity.State {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ return csm.state
+}
+
+func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} {
+ csm.mu.Lock()
+ defer csm.mu.Unlock()
+ if csm.notifyChan == nil {
+ csm.notifyChan = make(chan struct{})
+ }
+ return csm.notifyChan
+}
+
// ClientConn represents a client connection to an RPC server.
type ClientConn struct {
ctx context.Context
@@ -462,14 +544,38 @@ type ClientConn struct {
target string
authority string
dopts dialOptions
+ csMgr *connectivityStateManager
+ csEvltr *connectivityStateEvaluator // This will eventually be part of balancer.
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn
- // Keepalive parameter can be udated if a GoAway is received.
+ // Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
}
+// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
+// ctx expires. A true value is returned in former case and false in latter.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool {
+ ch := cc.csMgr.getNotifyChan()
+ if cc.csMgr.getState() != sourceState {
+ return true
+ }
+ select {
+ case <-ctx.Done():
+ return false
+ case <-ch:
+ return true
+ }
+}
+
+// GetState returns the connectivity.State of ClientConn.
+// This is an EXPERIMENTAL API.
+func (cc *ClientConn) GetState() connectivity.State {
+ return cc.csMgr.getState()
+}
+
// lbWatcher watches the Notify channel of the balancer in cc and manages
// connections accordingly. If doneChan is not nil, it is closed after the
// first successfull connection is made.
@@ -500,14 +606,18 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
}
cc.mu.Unlock()
for _, a := range add {
+ var err error
if doneChan != nil {
- err := cc.resetAddrConn(a, true, nil)
+ err = cc.resetAddrConn(a, true, nil)
if err == nil {
close(doneChan)
doneChan = nil
}
} else {
- cc.resetAddrConn(a, false, nil)
+ err = cc.resetAddrConn(a, false, nil)
+ }
+ if err != nil {
+ grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
}
}
for _, c := range del {
@@ -537,17 +647,18 @@ func (cc *ClientConn) scWatcher() {
// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
// If tearDownErr is nil, errConnDrain will be used instead.
+//
+// We should never need to replace an addrConn with a new one. This function is only used
+// as newAddrConn to create new addrConn.
+// TODO rename this function and clean up the code.
func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
ac := &addrConn{
cc: cc,
addr: addr,
dopts: cc.dopts,
}
- cc.mu.RLock()
- ac.dopts.copts.KeepaliveParams = cc.mkp
- cc.mu.RUnlock()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
- ac.stateCV = sync.NewCond(&ac.mu)
+ ac.csEvltr = cc.csEvltr
if EnableTracing {
ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
}
@@ -576,10 +687,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
cc.mu.Unlock()
if stale != nil {
// There is an addrConn alive on ac.addr already. This could be due to
- // 1) a buggy Balancer notifies duplicated Addresses;
- // 2) goaway was received, a new ac will replace the old ac.
- // The old ac should be deleted from cc.conns, but the
- // underlying transport should drain rather than close.
+ // a buggy Balancer that reports duplicated Addresses.
if tearDownErr == nil {
// tearDownErr is nil if resetAddrConn is called by
// 1) Dial
@@ -610,7 +718,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
// Start a goroutine connecting to the server asynchronously.
go func() {
if err := ac.resetTransport(false); err != nil {
- grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
+ grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
@@ -623,12 +731,23 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
return nil
}
-// TODO: Avoid the locking here.
-func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) {
+// GetMethodConfig gets the method config of the input method.
+// If there's an exact match for input method (i.e. /service/method), we return
+// the corresponding MethodConfig.
+// If there isn't an exact match for the input method, we look for the default config
+// under the service (i.e /service/). If there is a default MethodConfig for
+// the serivce, we return it.
+// Otherwise, we return an empty MethodConfig.
+func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
+ // TODO: Avoid the locking here.
cc.mu.RLock()
defer cc.mu.RUnlock()
- m, ok = cc.sc.Methods[method]
- return
+ m, ok := cc.sc.Methods[method]
+ if !ok {
+ i := strings.LastIndex(method, "/")
+ m, _ = cc.sc.Methods[method[:i+1]]
+ }
+ return m
}
func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
@@ -696,6 +815,7 @@ func (cc *ClientConn) Close() error {
}
conns := cc.conns
cc.conns = nil
+ cc.csMgr.updateState(connectivity.Shutdown)
cc.mu.Unlock()
if cc.dopts.balancer != nil {
cc.dopts.balancer.Close()
@@ -716,10 +836,11 @@ type addrConn struct {
dopts dialOptions
events trace.EventLog
- mu sync.Mutex
- state ConnectivityState
- stateCV *sync.Cond
- down func(error) // the handler called when a connection is down.
+ csEvltr *connectivityStateEvaluator
+
+ mu sync.Mutex
+ state connectivity.State
+ down func(error) // the handler called when a connection is down.
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
@@ -759,62 +880,41 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
}
}
-// getState returns the connectivity state of the Conn
-func (ac *addrConn) getState() ConnectivityState {
- ac.mu.Lock()
- defer ac.mu.Unlock()
- return ac.state
-}
-
-// waitForStateChange blocks until the state changes to something other than the sourceState.
-func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
+// resetTransport recreates a transport to the address for ac.
+// For the old transport:
+// - if drain is true, it will be gracefully closed.
+// - otherwise, it will be closed.
+func (ac *addrConn) resetTransport(drain bool) error {
ac.mu.Lock()
- defer ac.mu.Unlock()
- if sourceState != ac.state {
- return ac.state, nil
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return errConnClosing
}
- done := make(chan struct{})
- var err error
- go func() {
- select {
- case <-ctx.Done():
- ac.mu.Lock()
- err = ctx.Err()
- ac.stateCV.Broadcast()
- ac.mu.Unlock()
- case <-done:
- }
- }()
- defer close(done)
- for sourceState == ac.state {
- ac.stateCV.Wait()
- if err != nil {
- return ac.state, err
- }
+ ac.printf("connecting")
+ if ac.down != nil {
+ ac.down(downErrorf(false, true, "%v", errNetworkIO))
+ ac.down = nil
}
- return ac.state, nil
-}
-
-func (ac *addrConn) resetTransport(closeTransport bool) error {
+ oldState := ac.state
+ ac.state = connectivity.Connecting
+ ac.csEvltr.recordTransition(oldState, ac.state)
+ t := ac.transport
+ ac.transport = nil
+ ac.mu.Unlock()
+ if t != nil && !drain {
+ t.Close()
+ }
+ ac.cc.mu.RLock()
+ ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+ ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
ac.mu.Lock()
- ac.printf("connecting")
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
- if ac.down != nil {
- ac.down(downErrorf(false, true, "%v", errNetworkIO))
- ac.down = nil
- }
- ac.state = Connecting
- ac.stateCV.Broadcast()
- t := ac.transport
ac.mu.Unlock()
- if closeTransport && t != nil {
- t.Close()
- }
sleepTime := ac.dopts.bs.backoff(retries)
timeout := minConnectTimeout
if timeout < sleepTime {
@@ -835,39 +935,43 @@ func (ac *addrConn) resetTransport(closeTransport bool) error {
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
return err
}
- grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
+ grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
ac.errorf("transient failure: %v", err)
- ac.state = TransientFailure
- ac.stateCV.Broadcast()
+ oldState = ac.state
+ ac.state = connectivity.TransientFailure
+ ac.csEvltr.recordTransition(oldState, ac.state)
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.mu.Unlock()
- closeTransport = false
+ timer := time.NewTimer(sleepTime - time.Since(connectTime))
select {
- case <-time.After(sleepTime - time.Since(connectTime)):
+ case <-timer.C:
case <-ac.ctx.Done():
+ timer.Stop()
return ac.ctx.Err()
}
+ timer.Stop()
continue
}
ac.mu.Lock()
ac.printf("ready")
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
newTransport.Close()
return errConnClosing
}
- ac.state = Ready
- ac.stateCV.Broadcast()
+ oldState = ac.state
+ ac.state = connectivity.Ready
+ ac.csEvltr.recordTransition(oldState, ac.state)
ac.transport = newTransport
if ac.ready != nil {
close(ac.ready)
@@ -900,19 +1004,25 @@ func (ac *addrConn) transportMonitor() {
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
- // If GoAway happens without any network I/O error, ac is closed without shutting down the
- // underlying transport (the transport will be closed when all the pending RPCs finished or
- // failed.).
- // If GoAway and some network I/O error happen concurrently, ac and its underlying transport
- // are closed.
- // In both cases, a new ac is created.
+ // If GoAway happens without any network I/O error, the underlying transport
+ // will be gracefully closed, and a new transport will be created.
+ // (The transport will be closed when all the pending RPCs finished or failed.)
+ // If GoAway and some network I/O error happen concurrently, the underlying transport
+ // will be closed, and a new transport will be created.
+ var drain bool
select {
case <-t.Error():
- ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
default:
- ac.cc.resetAddrConn(ac.addr, false, errConnDrain)
+ drain = true
+ }
+ if err := ac.resetTransport(drain); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
+ return
}
- return
case <-t.Error():
select {
case <-ac.ctx.Done():
@@ -920,24 +1030,32 @@ func (ac *addrConn) transportMonitor() {
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
- ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
- return
+ if err := ac.resetTransport(false); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
+ }
+ return
+ }
default:
}
ac.mu.Lock()
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
// ac has been shutdown.
ac.mu.Unlock()
return
}
- ac.state = TransientFailure
- ac.stateCV.Broadcast()
+ oldState := ac.state
+ ac.state = connectivity.TransientFailure
+ ac.csEvltr.recordTransition(oldState, ac.state)
ac.mu.Unlock()
- if err := ac.resetTransport(true); err != nil {
+ if err := ac.resetTransport(false); err != nil {
+ grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
ac.mu.Lock()
ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
- grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err)
+ grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
@@ -949,12 +1067,12 @@ func (ac *addrConn) transportMonitor() {
}
// wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or
-// iv) transport is in TransientFailure and there is a balancer/failfast is true.
+// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true.
func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) {
for {
ac.mu.Lock()
switch {
- case ac.state == Shutdown:
+ case ac.state == connectivity.Shutdown:
if failfast || !hasBalancer {
// RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr.
err := ac.tearDownErr
@@ -963,11 +1081,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
}
ac.mu.Unlock()
return nil, errConnClosing
- case ac.state == Ready:
+ case ac.state == connectivity.Ready:
ct := ac.transport
ac.mu.Unlock()
return ct, nil
- case ac.state == TransientFailure:
+ case ac.state == connectivity.TransientFailure:
if failfast || hasBalancer {
ac.mu.Unlock()
return nil, errConnUnavailable
@@ -1009,12 +1127,13 @@ func (ac *addrConn) tearDown(err error) {
// address removal and GoAway.
ac.transport.GracefulClose()
}
- if ac.state == Shutdown {
+ if ac.state == connectivity.Shutdown {
return
}
- ac.state = Shutdown
+ oldState := ac.state
+ ac.state = connectivity.Shutdown
ac.tearDownErr = err
- ac.stateCV.Broadcast()
+ ac.csEvltr.recordTransition(oldState, ac.state)
if ac.events != nil {
ac.events.Finish()
ac.events = nil