diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r-- | vendor/google.golang.org/grpc/clientconn.go | 479 |
1 files changed, 299 insertions, 180 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go index f542d8bd0..e3e3140f1 100644 --- a/vendor/google.golang.org/grpc/clientconn.go +++ b/vendor/google.golang.org/grpc/clientconn.go @@ -1,33 +1,18 @@ /* * - * Copyright 2014, Google Inc. - * All rights reserved. + * Copyright 2014 gRPC authors. * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. + * http://www.apache.org/licenses/LICENSE-2.0 * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. * */ @@ -35,14 +20,14 @@ package grpc import ( "errors" - "fmt" - "math" "net" + "strings" "sync" "time" "golang.org/x/net/context" "golang.org/x/net/trace" + "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/keepalive" @@ -56,8 +41,7 @@ var ( ErrClientConnClosing = errors.New("grpc: the client connection is closing") // ErrClientConnTimeout indicates that the ClientConn cannot establish the // underlying connections within the specified timeout. - // DEPRECATED: Please use context.DeadlineExceeded instead. This error will be - // removed in Q1 2017. + // DEPRECATED: Please use context.DeadlineExceeded instead. ErrClientConnTimeout = errors.New("grpc: timed out when dialing") // errNoTransportSecurity indicates that there is no transport security @@ -79,6 +63,8 @@ var ( errConnClosing = errors.New("grpc: the connection is closing") // errConnUnavailable indicates that the connection is unavailable. errConnUnavailable = errors.New("grpc: the connection is unavailable") + // errBalancerClosed indicates that the balancer is closed. + errBalancerClosed = errors.New("grpc: balancer is closed") // minimum time to give a connection to complete minConnectTimeout = 20 * time.Second ) @@ -86,30 +72,54 @@ var ( // dialOptions configure a Dial call. dialOptions are set by the DialOption // values passed to Dial. type dialOptions struct { - unaryInt UnaryClientInterceptor - streamInt StreamClientInterceptor - codec Codec - cp Compressor - dc Decompressor - bs backoffStrategy - balancer Balancer - block bool - insecure bool - timeout time.Duration - scChan <-chan ServiceConfig - copts transport.ConnectOptions - maxMsgSize int + unaryInt UnaryClientInterceptor + streamInt StreamClientInterceptor + codec Codec + cp Compressor + dc Decompressor + bs backoffStrategy + balancer Balancer + block bool + insecure bool + timeout time.Duration + scChan <-chan ServiceConfig + copts transport.ConnectOptions + callOptions []CallOption } -const defaultClientMaxMsgSize = math.MaxInt32 +const ( + defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4 + defaultClientMaxSendMessageSize = 1024 * 1024 * 4 +) // DialOption configures how we set up the connection. type DialOption func(*dialOptions) -// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. +// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. +// The lower bound for window size is 64K and any value smaller than that will be ignored. +func WithInitialWindowSize(s int32) DialOption { + return func(o *dialOptions) { + o.copts.InitialWindowSize = s + } +} + +// WithInitialConnWindowSize returns a DialOption which sets the value for initial window size on a connection. +// The lower bound for window size is 64K and any value smaller than that will be ignored. +func WithInitialConnWindowSize(s int32) DialOption { + return func(o *dialOptions) { + o.copts.InitialConnWindowSize = s + } +} + +// WithMaxMsgSize returns a DialOption which sets the maximum message size the client can receive. Deprecated: use WithDefaultCallOptions(MaxCallRecvMsgSize(s)) instead. func WithMaxMsgSize(s int) DialOption { + return WithDefaultCallOptions(MaxCallRecvMsgSize(s)) +} + +// WithDefaultCallOptions returns a DialOption which sets the default CallOptions for calls over the connection. +func WithDefaultCallOptions(cos ...CallOption) DialOption { return func(o *dialOptions) { - o.maxMsgSize = s + o.callOptions = append(o.callOptions, cos...) } } @@ -204,7 +214,7 @@ func WithTransportCredentials(creds credentials.TransportCredentials) DialOption } // WithPerRPCCredentials returns a DialOption which sets -// credentials which will place auth state on each outbound RPC. +// credentials and places auth state on each outbound RPC. func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { return func(o *dialOptions) { o.copts.PerRPCCredentials = append(o.copts.PerRPCCredentials, creds) @@ -213,6 +223,7 @@ func WithPerRPCCredentials(creds credentials.PerRPCCredentials) DialOption { // WithTimeout returns a DialOption that configures a timeout for dialing a ClientConn // initially. This is valid if and only if WithBlock() is present. +// Deprecated: use DialContext and context.WithTimeout instead. func WithTimeout(d time.Duration) DialOption { return func(o *dialOptions) { o.timeout = d @@ -241,7 +252,7 @@ func WithStatsHandler(h stats.Handler) DialOption { } } -// FailOnNonTempDialError returns a DialOption that specified if gRPC fails on non-temporary dial errors. +// FailOnNonTempDialError returns a DialOption that specifies if gRPC fails on non-temporary dial errors. // If f is true, and dialer returns a non-temporary error, gRPC will fail the connection to the network // address and won't try to reconnect. // The default value of FailOnNonTempDialError is false. @@ -295,17 +306,18 @@ func Dial(target string, opts ...DialOption) (*ClientConn, error) { } // DialContext creates a client connection to the given target. ctx can be used to -// cancel or expire the pending connecting. Once this function returns, the +// cancel or expire the pending connection. Once this function returns, the // cancellation and expiration of ctx will be noop. Users should call ClientConn.Close // to terminate all the pending operations after this function returns. -// This is the EXPERIMENTAL API. func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) { cc := &ClientConn{ target: target, + csMgr: &connectivityStateManager{}, conns: make(map[Address]*addrConn), } + cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr} cc.ctx, cc.cancel = context.WithCancel(context.Background()) - cc.dopts.maxMsgSize = defaultClientMaxMsgSize + for _, opt := range opts { opt(&cc.dopts) } @@ -343,15 +355,16 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } }() + scSet := false if cc.dopts.scChan != nil { - // Wait for the initial service config. + // Try to get an initial service config. select { case sc, ok := <-cc.dopts.scChan: if ok { cc.sc = sc + scSet = true } - case <-ctx.Done(): - return nil, ctx.Err() + default: } } // Set defaults. @@ -382,6 +395,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } config := BalancerConfig{ DialCreds: credsClone, + Dialer: cc.dopts.copts.Dialer, } if err := cc.dopts.balancer.Start(target, config); err != nil { waitC <- err @@ -413,7 +427,17 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * return nil, err } } - + if cc.dopts.scChan != nil && !scSet { + // Blocking wait for the initial service config. + select { + case sc, ok := <-cc.dopts.scChan: + if ok { + cc.sc = sc + } + case <-ctx.Done(): + return nil, ctx.Err() + } + } if cc.dopts.scChan != nil { go cc.scWatcher() } @@ -421,39 +445,97 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * return cc, nil } -// ConnectivityState indicates the state of a client connection. -type ConnectivityState int +// connectivityStateEvaluator gets updated by addrConns when their +// states transition, based on which it evaluates the state of +// ClientConn. +// Note: This code will eventually sit in the balancer in the new design. +type connectivityStateEvaluator struct { + csMgr *connectivityStateManager + mu sync.Mutex + numReady uint64 // Number of addrConns in ready state. + numConnecting uint64 // Number of addrConns in connecting state. + numTransientFailure uint64 // Number of addrConns in transientFailure. +} -const ( - // Idle indicates the ClientConn is idle. - Idle ConnectivityState = iota - // Connecting indicates the ClienConn is connecting. - Connecting - // Ready indicates the ClientConn is ready for work. - Ready - // TransientFailure indicates the ClientConn has seen a failure but expects to recover. - TransientFailure - // Shutdown indicates the ClientConn has started shutting down. - Shutdown -) +// recordTransition records state change happening in every addrConn and based on +// that it evaluates what state the ClientConn is in. +// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states, +// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection +// before any addrConn is created ClientConn is in idle state. In the end when ClientConn +// closes it is in connectivity.Shutdown state. +// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state. +func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) { + cse.mu.Lock() + defer cse.mu.Unlock() + + // Update counters. + for idx, state := range []connectivity.State{oldState, newState} { + updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. + switch state { + case connectivity.Ready: + cse.numReady += updateVal + case connectivity.Connecting: + cse.numConnecting += updateVal + case connectivity.TransientFailure: + cse.numTransientFailure += updateVal + } + } + + // Evaluate. + if cse.numReady > 0 { + cse.csMgr.updateState(connectivity.Ready) + return + } + if cse.numConnecting > 0 { + cse.csMgr.updateState(connectivity.Connecting) + return + } + cse.csMgr.updateState(connectivity.TransientFailure) +} -func (s ConnectivityState) String() string { - switch s { - case Idle: - return "IDLE" - case Connecting: - return "CONNECTING" - case Ready: - return "READY" - case TransientFailure: - return "TRANSIENT_FAILURE" - case Shutdown: - return "SHUTDOWN" - default: - panic(fmt.Sprintf("unknown connectivity state: %d", s)) +// connectivityStateManager keeps the connectivity.State of ClientConn. +// This struct will eventually be exported so the balancers can access it. +type connectivityStateManager struct { + mu sync.Mutex + state connectivity.State + notifyChan chan struct{} +} + +// updateState updates the connectivity.State of ClientConn. +// If there's a change it notifies goroutines waiting on state change to +// happen. +func (csm *connectivityStateManager) updateState(state connectivity.State) { + csm.mu.Lock() + defer csm.mu.Unlock() + if csm.state == connectivity.Shutdown { + return + } + if csm.state == state { + return + } + csm.state = state + if csm.notifyChan != nil { + // There are other goroutines waiting on this channel. + close(csm.notifyChan) + csm.notifyChan = nil } } +func (csm *connectivityStateManager) getState() connectivity.State { + csm.mu.Lock() + defer csm.mu.Unlock() + return csm.state +} + +func (csm *connectivityStateManager) getNotifyChan() <-chan struct{} { + csm.mu.Lock() + defer csm.mu.Unlock() + if csm.notifyChan == nil { + csm.notifyChan = make(chan struct{}) + } + return csm.notifyChan +} + // ClientConn represents a client connection to an RPC server. type ClientConn struct { ctx context.Context @@ -462,14 +544,38 @@ type ClientConn struct { target string authority string dopts dialOptions + csMgr *connectivityStateManager + csEvltr *connectivityStateEvaluator // This will eventually be part of balancer. mu sync.RWMutex sc ServiceConfig conns map[Address]*addrConn - // Keepalive parameter can be udated if a GoAway is received. + // Keepalive parameter can be updated if a GoAway is received. mkp keepalive.ClientParameters } +// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or +// ctx expires. A true value is returned in former case and false in latter. +// This is an EXPERIMENTAL API. +func (cc *ClientConn) WaitForStateChange(ctx context.Context, sourceState connectivity.State) bool { + ch := cc.csMgr.getNotifyChan() + if cc.csMgr.getState() != sourceState { + return true + } + select { + case <-ctx.Done(): + return false + case <-ch: + return true + } +} + +// GetState returns the connectivity.State of ClientConn. +// This is an EXPERIMENTAL API. +func (cc *ClientConn) GetState() connectivity.State { + return cc.csMgr.getState() +} + // lbWatcher watches the Notify channel of the balancer in cc and manages // connections accordingly. If doneChan is not nil, it is closed after the // first successfull connection is made. @@ -500,14 +606,18 @@ func (cc *ClientConn) lbWatcher(doneChan chan struct{}) { } cc.mu.Unlock() for _, a := range add { + var err error if doneChan != nil { - err := cc.resetAddrConn(a, true, nil) + err = cc.resetAddrConn(a, true, nil) if err == nil { close(doneChan) doneChan = nil } } else { - cc.resetAddrConn(a, false, nil) + err = cc.resetAddrConn(a, false, nil) + } + if err != nil { + grpclog.Warningf("Error creating connection to %v. Err: %v", a, err) } } for _, c := range del { @@ -537,17 +647,18 @@ func (cc *ClientConn) scWatcher() { // resetAddrConn creates an addrConn for addr and adds it to cc.conns. // If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason. // If tearDownErr is nil, errConnDrain will be used instead. +// +// We should never need to replace an addrConn with a new one. This function is only used +// as newAddrConn to create new addrConn. +// TODO rename this function and clean up the code. func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error { ac := &addrConn{ cc: cc, addr: addr, dopts: cc.dopts, } - cc.mu.RLock() - ac.dopts.copts.KeepaliveParams = cc.mkp - cc.mu.RUnlock() ac.ctx, ac.cancel = context.WithCancel(cc.ctx) - ac.stateCV = sync.NewCond(&ac.mu) + ac.csEvltr = cc.csEvltr if EnableTracing { ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) } @@ -576,10 +687,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) cc.mu.Unlock() if stale != nil { // There is an addrConn alive on ac.addr already. This could be due to - // 1) a buggy Balancer notifies duplicated Addresses; - // 2) goaway was received, a new ac will replace the old ac. - // The old ac should be deleted from cc.conns, but the - // underlying transport should drain rather than close. + // a buggy Balancer that reports duplicated Addresses. if tearDownErr == nil { // tearDownErr is nil if resetAddrConn is called by // 1) Dial @@ -610,7 +718,7 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) // Start a goroutine connecting to the server asynchronously. go func() { if err := ac.resetTransport(false); err != nil { - grpclog.Printf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) + grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err) if err != errConnClosing { // Keep this ac in cc.conns, to get the reason it's torn down. ac.tearDown(err) @@ -623,12 +731,23 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) return nil } -// TODO: Avoid the locking here. -func (cc *ClientConn) getMethodConfig(method string) (m MethodConfig, ok bool) { +// GetMethodConfig gets the method config of the input method. +// If there's an exact match for input method (i.e. /service/method), we return +// the corresponding MethodConfig. +// If there isn't an exact match for the input method, we look for the default config +// under the service (i.e /service/). If there is a default MethodConfig for +// the serivce, we return it. +// Otherwise, we return an empty MethodConfig. +func (cc *ClientConn) GetMethodConfig(method string) MethodConfig { + // TODO: Avoid the locking here. cc.mu.RLock() defer cc.mu.RUnlock() - m, ok = cc.sc.Methods[method] - return + m, ok := cc.sc.Methods[method] + if !ok { + i := strings.LastIndex(method, "/") + m, _ = cc.sc.Methods[method[:i+1]] + } + return m } func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) { @@ -696,6 +815,7 @@ func (cc *ClientConn) Close() error { } conns := cc.conns cc.conns = nil + cc.csMgr.updateState(connectivity.Shutdown) cc.mu.Unlock() if cc.dopts.balancer != nil { cc.dopts.balancer.Close() @@ -716,10 +836,11 @@ type addrConn struct { dopts dialOptions events trace.EventLog - mu sync.Mutex - state ConnectivityState - stateCV *sync.Cond - down func(error) // the handler called when a connection is down. + csEvltr *connectivityStateEvaluator + + mu sync.Mutex + state connectivity.State + down func(error) // the handler called when a connection is down. // ready is closed and becomes nil when a new transport is up or failed // due to timeout. ready chan struct{} @@ -759,62 +880,41 @@ func (ac *addrConn) errorf(format string, a ...interface{}) { } } -// getState returns the connectivity state of the Conn -func (ac *addrConn) getState() ConnectivityState { - ac.mu.Lock() - defer ac.mu.Unlock() - return ac.state -} - -// waitForStateChange blocks until the state changes to something other than the sourceState. -func (ac *addrConn) waitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) { +// resetTransport recreates a transport to the address for ac. +// For the old transport: +// - if drain is true, it will be gracefully closed. +// - otherwise, it will be closed. +func (ac *addrConn) resetTransport(drain bool) error { ac.mu.Lock() - defer ac.mu.Unlock() - if sourceState != ac.state { - return ac.state, nil + if ac.state == connectivity.Shutdown { + ac.mu.Unlock() + return errConnClosing } - done := make(chan struct{}) - var err error - go func() { - select { - case <-ctx.Done(): - ac.mu.Lock() - err = ctx.Err() - ac.stateCV.Broadcast() - ac.mu.Unlock() - case <-done: - } - }() - defer close(done) - for sourceState == ac.state { - ac.stateCV.Wait() - if err != nil { - return ac.state, err - } + ac.printf("connecting") + if ac.down != nil { + ac.down(downErrorf(false, true, "%v", errNetworkIO)) + ac.down = nil } - return ac.state, nil -} - -func (ac *addrConn) resetTransport(closeTransport bool) error { + oldState := ac.state + ac.state = connectivity.Connecting + ac.csEvltr.recordTransition(oldState, ac.state) + t := ac.transport + ac.transport = nil + ac.mu.Unlock() + if t != nil && !drain { + t.Close() + } + ac.cc.mu.RLock() + ac.dopts.copts.KeepaliveParams = ac.cc.mkp + ac.cc.mu.RUnlock() for retries := 0; ; retries++ { ac.mu.Lock() - ac.printf("connecting") - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() return errConnClosing } - if ac.down != nil { - ac.down(downErrorf(false, true, "%v", errNetworkIO)) - ac.down = nil - } - ac.state = Connecting - ac.stateCV.Broadcast() - t := ac.transport ac.mu.Unlock() - if closeTransport && t != nil { - t.Close() - } sleepTime := ac.dopts.bs.backoff(retries) timeout := minConnectTimeout if timeout < sleepTime { @@ -835,39 +935,43 @@ func (ac *addrConn) resetTransport(closeTransport bool) error { if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() { return err } - grpclog.Printf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) + grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr) ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() return errConnClosing } ac.errorf("transient failure: %v", err) - ac.state = TransientFailure - ac.stateCV.Broadcast() + oldState = ac.state + ac.state = connectivity.TransientFailure + ac.csEvltr.recordTransition(oldState, ac.state) if ac.ready != nil { close(ac.ready) ac.ready = nil } ac.mu.Unlock() - closeTransport = false + timer := time.NewTimer(sleepTime - time.Since(connectTime)) select { - case <-time.After(sleepTime - time.Since(connectTime)): + case <-timer.C: case <-ac.ctx.Done(): + timer.Stop() return ac.ctx.Err() } + timer.Stop() continue } ac.mu.Lock() ac.printf("ready") - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() newTransport.Close() return errConnClosing } - ac.state = Ready - ac.stateCV.Broadcast() + oldState = ac.state + ac.state = connectivity.Ready + ac.csEvltr.recordTransition(oldState, ac.state) ac.transport = newTransport if ac.ready != nil { close(ac.ready) @@ -900,19 +1004,25 @@ func (ac *addrConn) transportMonitor() { return case <-t.GoAway(): ac.adjustParams(t.GetGoAwayReason()) - // If GoAway happens without any network I/O error, ac is closed without shutting down the - // underlying transport (the transport will be closed when all the pending RPCs finished or - // failed.). - // If GoAway and some network I/O error happen concurrently, ac and its underlying transport - // are closed. - // In both cases, a new ac is created. + // If GoAway happens without any network I/O error, the underlying transport + // will be gracefully closed, and a new transport will be created. + // (The transport will be closed when all the pending RPCs finished or failed.) + // If GoAway and some network I/O error happen concurrently, the underlying transport + // will be closed, and a new transport will be created. + var drain bool select { case <-t.Error(): - ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) default: - ac.cc.resetAddrConn(ac.addr, false, errConnDrain) + drain = true + } + if err := ac.resetTransport(drain); err != nil { + grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) + if err != errConnClosing { + // Keep this ac in cc.conns, to get the reason it's torn down. + ac.tearDown(err) + } + return } - return case <-t.Error(): select { case <-ac.ctx.Done(): @@ -920,24 +1030,32 @@ func (ac *addrConn) transportMonitor() { return case <-t.GoAway(): ac.adjustParams(t.GetGoAwayReason()) - ac.cc.resetAddrConn(ac.addr, false, errNetworkIO) - return + if err := ac.resetTransport(false); err != nil { + grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) + if err != errConnClosing { + // Keep this ac in cc.conns, to get the reason it's torn down. + ac.tearDown(err) + } + return + } default: } ac.mu.Lock() - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { // ac has been shutdown. ac.mu.Unlock() return } - ac.state = TransientFailure - ac.stateCV.Broadcast() + oldState := ac.state + ac.state = connectivity.TransientFailure + ac.csEvltr.recordTransition(oldState, ac.state) ac.mu.Unlock() - if err := ac.resetTransport(true); err != nil { + if err := ac.resetTransport(false); err != nil { + grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err) ac.mu.Lock() ac.printf("transport exiting: %v", err) ac.mu.Unlock() - grpclog.Printf("grpc: addrConn.transportMonitor exits due to: %v", err) + grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err) if err != errConnClosing { // Keep this ac in cc.conns, to get the reason it's torn down. ac.tearDown(err) @@ -949,12 +1067,12 @@ func (ac *addrConn) transportMonitor() { } // wait blocks until i) the new transport is up or ii) ctx is done or iii) ac is closed or -// iv) transport is in TransientFailure and there is a balancer/failfast is true. +// iv) transport is in connectivity.TransientFailure and there is a balancer/failfast is true. func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (transport.ClientTransport, error) { for { ac.mu.Lock() switch { - case ac.state == Shutdown: + case ac.state == connectivity.Shutdown: if failfast || !hasBalancer { // RPC is failfast or balancer is nil. This RPC should fail with ac.tearDownErr. err := ac.tearDownErr @@ -963,11 +1081,11 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans } ac.mu.Unlock() return nil, errConnClosing - case ac.state == Ready: + case ac.state == connectivity.Ready: ct := ac.transport ac.mu.Unlock() return ct, nil - case ac.state == TransientFailure: + case ac.state == connectivity.TransientFailure: if failfast || hasBalancer { ac.mu.Unlock() return nil, errConnUnavailable @@ -1009,12 +1127,13 @@ func (ac *addrConn) tearDown(err error) { // address removal and GoAway. ac.transport.GracefulClose() } - if ac.state == Shutdown { + if ac.state == connectivity.Shutdown { return } - ac.state = Shutdown + oldState := ac.state + ac.state = connectivity.Shutdown ac.tearDownErr = err - ac.stateCV.Broadcast() + ac.csEvltr.recordTransition(oldState, ac.state) if ac.events != nil { ac.events.Finish() ac.events = nil |