aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go743
1 files changed, 368 insertions, 375 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index e3e3140f1..886bead9d 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -20,17 +20,22 @@ package grpc
import (
"errors"
+ "fmt"
+ "math"
"net"
+ "reflect"
"strings"
"sync"
"time"
"golang.org/x/net/context"
"golang.org/x/net/trace"
+ "google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
+ "google.golang.org/grpc/resolver"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -78,23 +83,40 @@ type dialOptions struct {
cp Compressor
dc Decompressor
bs backoffStrategy
- balancer Balancer
block bool
insecure bool
timeout time.Duration
scChan <-chan ServiceConfig
copts transport.ConnectOptions
callOptions []CallOption
+ // This is to support v1 balancer.
+ balancerBuilder balancer.Builder
}
const (
defaultClientMaxReceiveMessageSize = 1024 * 1024 * 4
- defaultClientMaxSendMessageSize = 1024 * 1024 * 4
+ defaultClientMaxSendMessageSize = math.MaxInt32
)
// DialOption configures how we set up the connection.
type DialOption func(*dialOptions)
+// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched
+// before doing a write on the wire.
+func WithWriteBufferSize(s int) DialOption {
+ return func(o *dialOptions) {
+ o.copts.WriteBufferSize = s
+ }
+}
+
+// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most
+// for each read syscall.
+func WithReadBufferSize(s int) DialOption {
+ return func(o *dialOptions) {
+ o.copts.ReadBufferSize = s
+ }
+}
+
// WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream.
// The lower bound for window size is 64K and any value smaller than that will be ignored.
func WithInitialWindowSize(s int32) DialOption {
@@ -146,10 +168,23 @@ func WithDecompressor(dc Decompressor) DialOption {
}
}
-// WithBalancer returns a DialOption which sets a load balancer.
+// WithBalancer returns a DialOption which sets a load balancer with the v1 API.
+// Name resolver will be ignored if this DialOption is specified.
+// Deprecated: use the new balancer APIs in balancer package instead.
func WithBalancer(b Balancer) DialOption {
return func(o *dialOptions) {
- o.balancer = b
+ o.balancerBuilder = &balancerWrapperBuilder{
+ b: b,
+ }
+ }
+}
+
+// WithBalancerBuilder is for testing only. Users using custom balancers should
+// register their balancer and use service config to choose the balancer to use.
+func WithBalancerBuilder(b balancer.Builder) DialOption {
+ // TODO(bar) remove this when switching balancer is done.
+ return func(o *dialOptions) {
+ o.balancerBuilder = b
}
}
@@ -270,7 +305,7 @@ func WithUserAgent(s string) DialOption {
}
}
-// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
+// WithKeepaliveParams returns a DialOption that specifies keepalive parameters for the client transport.
func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption {
return func(o *dialOptions) {
o.copts.KeepaliveParams = kp
@@ -313,20 +348,37 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
cc := &ClientConn{
target: target,
csMgr: &connectivityStateManager{},
- conns: make(map[Address]*addrConn),
+ conns: make(map[*addrConn]struct{}),
+
+ blockingpicker: newPickerWrapper(),
}
- cc.csEvltr = &connectivityStateEvaluator{csMgr: cc.csMgr}
cc.ctx, cc.cancel = context.WithCancel(context.Background())
for _, opt := range opts {
opt(&cc.dopts)
}
+
+ if !cc.dopts.insecure {
+ if cc.dopts.copts.TransportCredentials == nil {
+ return nil, errNoTransportSecurity
+ }
+ } else {
+ if cc.dopts.copts.TransportCredentials != nil {
+ return nil, errCredentialsConflict
+ }
+ for _, cd := range cc.dopts.copts.PerRPCCredentials {
+ if cd.RequireTransportSecurity() {
+ return nil, errTransportCredentialsMissing
+ }
+ }
+ }
+
cc.mkp = cc.dopts.copts.KeepaliveParams
if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
- return dialContext(ctx, "tcp", addr)
+ return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
},
)
}
@@ -382,49 +434,41 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
} else {
cc.authority = target
}
- waitC := make(chan error, 1)
- go func() {
- defer close(waitC)
- if cc.dopts.balancer == nil && cc.sc.LB != nil {
- cc.dopts.balancer = cc.sc.LB
+
+ if cc.dopts.balancerBuilder != nil {
+ var credsClone credentials.TransportCredentials
+ if creds != nil {
+ credsClone = creds.Clone()
}
- if cc.dopts.balancer != nil {
- var credsClone credentials.TransportCredentials
- if creds != nil {
- credsClone = creds.Clone()
- }
- config := BalancerConfig{
- DialCreds: credsClone,
- Dialer: cc.dopts.copts.Dialer,
- }
- if err := cc.dopts.balancer.Start(target, config); err != nil {
+ buildOpts := balancer.BuildOptions{
+ DialCreds: credsClone,
+ Dialer: cc.dopts.copts.Dialer,
+ }
+ // Build should not take long time. So it's ok to not have a goroutine for it.
+ // TODO(bar) init balancer after first resolver result to support service config balancer.
+ cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, buildOpts)
+ } else {
+ waitC := make(chan error, 1)
+ go func() {
+ defer close(waitC)
+ // No balancer, or no resolver within the balancer. Connect directly.
+ ac, err := cc.newAddrConn([]resolver.Address{{Addr: target}})
+ if err != nil {
waitC <- err
return
}
- ch := cc.dopts.balancer.Notify()
- if ch != nil {
- if cc.dopts.block {
- doneChan := make(chan struct{})
- go cc.lbWatcher(doneChan)
- <-doneChan
- } else {
- go cc.lbWatcher(nil)
- }
+ if err := ac.connect(cc.dopts.block); err != nil {
+ waitC <- err
return
}
- }
- // No balancer, or no resolver within the balancer. Connect directly.
- if err := cc.resetAddrConn(Address{Addr: target}, cc.dopts.block, nil); err != nil {
- waitC <- err
- return
- }
- }()
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case err := <-waitC:
- if err != nil {
- return nil, err
+ }()
+ select {
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ case err := <-waitC:
+ if err != nil {
+ return nil, err
+ }
}
}
if cc.dopts.scChan != nil && !scSet {
@@ -442,55 +486,35 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
go cc.scWatcher()
}
- return cc, nil
-}
+ // Build the resolver.
+ cc.resolverWrapper, err = newCCResolverWrapper(cc)
+ if err != nil {
+ return nil, fmt.Errorf("failed to build resolver: %v", err)
+ }
-// connectivityStateEvaluator gets updated by addrConns when their
-// states transition, based on which it evaluates the state of
-// ClientConn.
-// Note: This code will eventually sit in the balancer in the new design.
-type connectivityStateEvaluator struct {
- csMgr *connectivityStateManager
- mu sync.Mutex
- numReady uint64 // Number of addrConns in ready state.
- numConnecting uint64 // Number of addrConns in connecting state.
- numTransientFailure uint64 // Number of addrConns in transientFailure.
-}
+ if cc.balancerWrapper != nil && cc.resolverWrapper == nil {
+ // TODO(bar) there should always be a resolver (DNS as the default).
+ // Unblock balancer initialization with a fake resolver update if there's no resolver.
+ // The balancer wrapper will not read the addresses, so an empty list works.
+ // TODO(bar) remove this after the real resolver is started.
+ cc.balancerWrapper.handleResolvedAddrs([]resolver.Address{}, nil)
+ }
-// recordTransition records state change happening in every addrConn and based on
-// that it evaluates what state the ClientConn is in.
-// It can only transition between connectivity.Ready, connectivity.Connecting and connectivity.TransientFailure. Other states,
-// Idle and connectivity.Shutdown are transitioned into by ClientConn; in the begining of the connection
-// before any addrConn is created ClientConn is in idle state. In the end when ClientConn
-// closes it is in connectivity.Shutdown state.
-// TODO Note that in later releases, a ClientConn with no activity will be put into an Idle state.
-func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) {
- cse.mu.Lock()
- defer cse.mu.Unlock()
-
- // Update counters.
- for idx, state := range []connectivity.State{oldState, newState} {
- updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
- switch state {
- case connectivity.Ready:
- cse.numReady += updateVal
- case connectivity.Connecting:
- cse.numConnecting += updateVal
- case connectivity.TransientFailure:
- cse.numTransientFailure += updateVal
+ // A blocking dial blocks until the clientConn is ready.
+ if cc.dopts.block {
+ for {
+ s := cc.GetState()
+ if s == connectivity.Ready {
+ break
+ }
+ if !cc.WaitForStateChange(ctx, s) {
+ // ctx got timeout or canceled.
+ return nil, ctx.Err()
+ }
}
}
- // Evaluate.
- if cse.numReady > 0 {
- cse.csMgr.updateState(connectivity.Ready)
- return
- }
- if cse.numConnecting > 0 {
- cse.csMgr.updateState(connectivity.Connecting)
- return
- }
- cse.csMgr.updateState(connectivity.TransientFailure)
+ return cc, nil
}
// connectivityStateManager keeps the connectivity.State of ClientConn.
@@ -545,11 +569,15 @@ type ClientConn struct {
authority string
dopts dialOptions
csMgr *connectivityStateManager
- csEvltr *connectivityStateEvaluator // This will eventually be part of balancer.
+
+ balancerWrapper *ccBalancerWrapper
+ resolverWrapper *ccResolverWrapper
+
+ blockingpicker *pickerWrapper
mu sync.RWMutex
sc ServiceConfig
- conns map[Address]*addrConn
+ conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
mkp keepalive.ClientParameters
}
@@ -576,56 +604,6 @@ func (cc *ClientConn) GetState() connectivity.State {
return cc.csMgr.getState()
}
-// lbWatcher watches the Notify channel of the balancer in cc and manages
-// connections accordingly. If doneChan is not nil, it is closed after the
-// first successfull connection is made.
-func (cc *ClientConn) lbWatcher(doneChan chan struct{}) {
- for addrs := range cc.dopts.balancer.Notify() {
- var (
- add []Address // Addresses need to setup connections.
- del []*addrConn // Connections need to tear down.
- )
- cc.mu.Lock()
- for _, a := range addrs {
- if _, ok := cc.conns[a]; !ok {
- add = append(add, a)
- }
- }
- for k, c := range cc.conns {
- var keep bool
- for _, a := range addrs {
- if k == a {
- keep = true
- break
- }
- }
- if !keep {
- del = append(del, c)
- delete(cc.conns, c.addr)
- }
- }
- cc.mu.Unlock()
- for _, a := range add {
- var err error
- if doneChan != nil {
- err = cc.resetAddrConn(a, true, nil)
- if err == nil {
- close(doneChan)
- doneChan = nil
- }
- } else {
- err = cc.resetAddrConn(a, false, nil)
- }
- if err != nil {
- grpclog.Warningf("Error creating connection to %v. Err: %v", a, err)
- }
- }
- for _, c := range del {
- c.tearDown(errConnDrain)
- }
- }
-}
-
func (cc *ClientConn) scWatcher() {
for {
select {
@@ -644,67 +622,64 @@ func (cc *ClientConn) scWatcher() {
}
}
-// resetAddrConn creates an addrConn for addr and adds it to cc.conns.
-// If there is an old addrConn for addr, it will be torn down, using tearDownErr as the reason.
-// If tearDownErr is nil, errConnDrain will be used instead.
-//
-// We should never need to replace an addrConn with a new one. This function is only used
-// as newAddrConn to create new addrConn.
-// TODO rename this function and clean up the code.
-func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error) error {
+// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
+func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
ac := &addrConn{
cc: cc,
- addr: addr,
+ addrs: addrs,
dopts: cc.dopts,
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
- ac.csEvltr = cc.csEvltr
- if EnableTracing {
- ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr)
- }
- if !ac.dopts.insecure {
- if ac.dopts.copts.TransportCredentials == nil {
- return errNoTransportSecurity
- }
- } else {
- if ac.dopts.copts.TransportCredentials != nil {
- return errCredentialsConflict
- }
- for _, cd := range ac.dopts.copts.PerRPCCredentials {
- if cd.RequireTransportSecurity() {
- return errTransportCredentialsMissing
- }
- }
- }
// Track ac in cc. This needs to be done before any getTransport(...) is called.
cc.mu.Lock()
if cc.conns == nil {
cc.mu.Unlock()
- return ErrClientConnClosing
+ return nil, ErrClientConnClosing
}
- stale := cc.conns[ac.addr]
- cc.conns[ac.addr] = ac
+ cc.conns[ac] = struct{}{}
cc.mu.Unlock()
- if stale != nil {
- // There is an addrConn alive on ac.addr already. This could be due to
- // a buggy Balancer that reports duplicated Addresses.
- if tearDownErr == nil {
- // tearDownErr is nil if resetAddrConn is called by
- // 1) Dial
- // 2) lbWatcher
- // In both cases, the stale ac should drain, not close.
- stale.tearDown(errConnDrain)
- } else {
- stale.tearDown(tearDownErr)
- }
+ return ac, nil
+}
+
+// removeAddrConn removes the addrConn in the subConn from clientConn.
+// It also tears down the ac with the given error.
+func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return
+ }
+ delete(cc.conns, ac)
+ cc.mu.Unlock()
+ ac.tearDown(err)
+}
+
+// connect starts to creating transport and also starts the transport monitor
+// goroutine for this ac.
+// It does nothing if the ac is not IDLE.
+// TODO(bar) Move this to the addrConn section.
+// This was part of resetAddrConn, keep it here to make the diff look clean.
+func (ac *addrConn) connect(block bool) error {
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ ac.mu.Unlock()
+ return errConnClosing
+ }
+ if ac.state != connectivity.Idle {
+ ac.mu.Unlock()
+ return nil
+ }
+ ac.state = connectivity.Connecting
+ if ac.cc.balancerWrapper != nil {
+ ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
+ } else {
+ ac.cc.csMgr.updateState(ac.state)
}
+ ac.mu.Unlock()
+
if block {
- if err := ac.resetTransport(false); err != nil {
+ if err := ac.resetTransport(); err != nil {
if err != errConnClosing {
- // Tear down ac and delete it from cc.conns.
- cc.mu.Lock()
- delete(cc.conns, ac.addr)
- cc.mu.Unlock()
ac.tearDown(err)
}
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
@@ -717,8 +692,8 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
} else {
// Start a goroutine connecting to the server asynchronously.
go func() {
- if err := ac.resetTransport(false); err != nil {
- grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addr.Addr, err)
+ if err := ac.resetTransport(); err != nil {
+ grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
// Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
@@ -731,6 +706,36 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
return nil
}
+// tryUpdateAddrs tries to update ac.addrs with the new addresses list.
+//
+// It checks whether current connected address of ac is in the new addrs list.
+// - If true, it updates ac.addrs and returns true. The ac will keep using
+// the existing connection.
+// - If false, it does nothing and returns false.
+func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ grpclog.Infof("addrConn: tryUpdateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+ if ac.state == connectivity.Shutdown {
+ ac.addrs = addrs
+ return true
+ }
+
+ var curAddrFound bool
+ for _, a := range addrs {
+ if reflect.DeepEqual(ac.curAddr, a) {
+ curAddrFound = true
+ break
+ }
+ }
+ grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
+ if curAddrFound {
+ ac.addrs = addrs
+ }
+
+ return curAddrFound
+}
+
// GetMethodConfig gets the method config of the input method.
// If there's an exact match for input method (i.e. /service/method), we return
// the corresponding MethodConfig.
@@ -750,58 +755,37 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
return m
}
-func (cc *ClientConn) getTransport(ctx context.Context, opts BalancerGetOptions) (transport.ClientTransport, func(), error) {
- var (
- ac *addrConn
- ok bool
- put func()
- )
- if cc.dopts.balancer == nil {
+func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
+ if cc.balancerWrapper == nil {
// If balancer is nil, there should be only one addrConn available.
cc.mu.RLock()
if cc.conns == nil {
cc.mu.RUnlock()
+ // TODO this function returns toRPCErr and non-toRPCErr. Clean up
+ // the errors in ClientConn.
return nil, nil, toRPCErr(ErrClientConnClosing)
}
- for _, ac = range cc.conns {
+ var ac *addrConn
+ for ac = range cc.conns {
// Break after the first iteration to get the first addrConn.
- ok = true
break
}
cc.mu.RUnlock()
- } else {
- var (
- addr Address
- err error
- )
- addr, put, err = cc.dopts.balancer.Get(ctx, opts)
- if err != nil {
- return nil, nil, toRPCErr(err)
- }
- cc.mu.RLock()
- if cc.conns == nil {
- cc.mu.RUnlock()
- return nil, nil, toRPCErr(ErrClientConnClosing)
+ if ac == nil {
+ return nil, nil, errConnClosing
}
- ac, ok = cc.conns[addr]
- cc.mu.RUnlock()
- }
- if !ok {
- if put != nil {
- updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
- put()
+ t, err := ac.wait(ctx, false /*hasBalancer*/, failfast)
+ if err != nil {
+ return nil, nil, err
}
- return nil, nil, errConnClosing
+ return t, nil, nil
}
- t, err := ac.wait(ctx, cc.dopts.balancer != nil, !opts.BlockingWait)
+
+ t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
if err != nil {
- if put != nil {
- updateRPCInfoInContext(ctx, rpcInfo{bytesSent: false, bytesReceived: false})
- put()
- }
- return nil, nil, err
+ return nil, nil, toRPCErr(err)
}
- return t, put, nil
+ return t, done, nil
}
// Close tears down the ClientConn and all underlying connections.
@@ -817,10 +801,14 @@ func (cc *ClientConn) Close() error {
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
cc.mu.Unlock()
- if cc.dopts.balancer != nil {
- cc.dopts.balancer.Close()
+ cc.blockingpicker.close()
+ if cc.resolverWrapper != nil {
+ cc.resolverWrapper.close()
+ }
+ if cc.balancerWrapper != nil {
+ cc.balancerWrapper.close()
}
- for _, ac := range conns {
+ for ac := range conns {
ac.tearDown(ErrClientConnClosing)
}
return nil
@@ -831,16 +819,15 @@ type addrConn struct {
ctx context.Context
cancel context.CancelFunc
- cc *ClientConn
- addr Address
- dopts dialOptions
- events trace.EventLog
-
- csEvltr *connectivityStateEvaluator
+ cc *ClientConn
+ curAddr resolver.Address
+ addrs []resolver.Address
+ dopts dialOptions
+ events trace.EventLog
+ acbw balancer.SubConn
mu sync.Mutex
state connectivity.State
- down func(error) // the handler called when a connection is down.
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
@@ -880,108 +867,127 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
}
}
-// resetTransport recreates a transport to the address for ac.
-// For the old transport:
-// - if drain is true, it will be gracefully closed.
-// - otherwise, it will be closed.
-func (ac *addrConn) resetTransport(drain bool) error {
+// resetTransport recreates a transport to the address for ac. The old
+// transport will close itself on error or when the clientconn is closed.
+//
+// TODO(bar) make sure all state transitions are valid.
+func (ac *addrConn) resetTransport() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
- ac.printf("connecting")
- if ac.down != nil {
- ac.down(downErrorf(false, true, "%v", errNetworkIO))
- ac.down = nil
+ if ac.ready != nil {
+ close(ac.ready)
+ ac.ready = nil
}
- oldState := ac.state
- ac.state = connectivity.Connecting
- ac.csEvltr.recordTransition(oldState, ac.state)
- t := ac.transport
ac.transport = nil
+ ac.curAddr = resolver.Address{}
ac.mu.Unlock()
- if t != nil && !drain {
- t.Close()
- }
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
for retries := 0; ; retries++ {
+ sleepTime := ac.dopts.bs.backoff(retries)
+ timeout := minConnectTimeout
ac.mu.Lock()
+ if timeout < time.Duration(int(sleepTime)/len(ac.addrs)) {
+ timeout = time.Duration(int(sleepTime) / len(ac.addrs))
+ }
+ connectTime := time.Now()
if ac.state == connectivity.Shutdown {
- // ac.tearDown(...) has been invoked.
ac.mu.Unlock()
return errConnClosing
}
- ac.mu.Unlock()
- sleepTime := ac.dopts.bs.backoff(retries)
- timeout := minConnectTimeout
- if timeout < sleepTime {
- timeout = sleepTime
- }
- ctx, cancel := context.WithTimeout(ac.ctx, timeout)
- connectTime := time.Now()
- sinfo := transport.TargetInfo{
- Addr: ac.addr.Addr,
- Metadata: ac.addr.Metadata,
+ ac.printf("connecting")
+ if ac.state != connectivity.Connecting {
+ ac.state = connectivity.Connecting
+ // TODO(bar) remove condition once we always have a balancer.
+ if ac.cc.balancerWrapper != nil {
+ ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
+ } else {
+ ac.cc.csMgr.updateState(ac.state)
+ }
}
- newTransport, err := transport.NewClientTransport(ctx, sinfo, ac.dopts.copts)
- // Don't call cancel in success path due to a race in Go 1.6:
- // https://github.com/golang/go/issues/15078.
- if err != nil {
- cancel()
-
- if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
- return err
+ // copy ac.addrs in case of race
+ addrsIter := make([]resolver.Address, len(ac.addrs))
+ copy(addrsIter, ac.addrs)
+ copts := ac.dopts.copts
+ ac.mu.Unlock()
+ for _, addr := range addrsIter {
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ // ac.tearDown(...) has been invoked.
+ ac.mu.Unlock()
+ return errConnClosing
+ }
+ ac.mu.Unlock()
+ sinfo := transport.TargetInfo{
+ Addr: addr.Addr,
+ Metadata: addr.Metadata,
+ }
+ newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout)
+ if err != nil {
+ if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
+ return err
+ }
+ grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
+ ac.mu.Lock()
+ if ac.state == connectivity.Shutdown {
+ // ac.tearDown(...) has been invoked.
+ ac.mu.Unlock()
+ return errConnClosing
+ }
+ ac.mu.Unlock()
+ continue
}
- grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, ac.addr)
ac.mu.Lock()
+ ac.printf("ready")
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
+ newTransport.Close()
return errConnClosing
}
- ac.errorf("transient failure: %v", err)
- oldState = ac.state
- ac.state = connectivity.TransientFailure
- ac.csEvltr.recordTransition(oldState, ac.state)
+ ac.state = connectivity.Ready
+ if ac.cc.balancerWrapper != nil {
+ ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
+ } else {
+ ac.cc.csMgr.updateState(ac.state)
+ }
+ t := ac.transport
+ ac.transport = newTransport
+ if t != nil {
+ t.Close()
+ }
+ ac.curAddr = addr
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
ac.mu.Unlock()
- timer := time.NewTimer(sleepTime - time.Since(connectTime))
- select {
- case <-timer.C:
- case <-ac.ctx.Done():
- timer.Stop()
- return ac.ctx.Err()
- }
- timer.Stop()
- continue
+ return nil
}
ac.mu.Lock()
- ac.printf("ready")
- if ac.state == connectivity.Shutdown {
- // ac.tearDown(...) has been invoked.
- ac.mu.Unlock()
- newTransport.Close()
- return errConnClosing
+ ac.state = connectivity.TransientFailure
+ if ac.cc.balancerWrapper != nil {
+ ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
+ } else {
+ ac.cc.csMgr.updateState(ac.state)
}
- oldState = ac.state
- ac.state = connectivity.Ready
- ac.csEvltr.recordTransition(oldState, ac.state)
- ac.transport = newTransport
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
}
- if ac.cc.dopts.balancer != nil {
- ac.down = ac.cc.dopts.balancer.Up(ac.addr)
- }
ac.mu.Unlock()
- return nil
+ timer := time.NewTimer(sleepTime - time.Since(connectTime))
+ select {
+ case <-timer.C:
+ case <-ac.ctx.Done():
+ timer.Stop()
+ return ac.ctx.Err()
+ }
+ timer.Stop()
}
}
@@ -992,76 +998,39 @@ func (ac *addrConn) transportMonitor() {
ac.mu.Lock()
t := ac.transport
ac.mu.Unlock()
+ // Block until we receive a goaway or an error occurs.
select {
- // This is needed to detect the teardown when
- // the addrConn is idle (i.e., no RPC in flight).
- case <-ac.ctx.Done():
- select {
- case <-t.Error():
- t.Close()
- default:
- }
- return
case <-t.GoAway():
- ac.adjustParams(t.GetGoAwayReason())
- // If GoAway happens without any network I/O error, the underlying transport
- // will be gracefully closed, and a new transport will be created.
- // (The transport will be closed when all the pending RPCs finished or failed.)
- // If GoAway and some network I/O error happen concurrently, the underlying transport
- // will be closed, and a new transport will be created.
- var drain bool
- select {
- case <-t.Error():
- default:
- drain = true
- }
- if err := ac.resetTransport(drain); err != nil {
- grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
- }
case <-t.Error():
- select {
- case <-ac.ctx.Done():
- t.Close()
- return
- case <-t.GoAway():
- ac.adjustParams(t.GetGoAwayReason())
- if err := ac.resetTransport(false); err != nil {
- grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
- }
- default:
- }
+ }
+ // If a GoAway happened, regardless of error, adjust our keepalive
+ // parameters as appropriate.
+ select {
+ case <-t.GoAway():
+ ac.adjustParams(t.GetGoAwayReason())
+ default:
+ }
+ ac.mu.Lock()
+ // Set connectivity state to TransientFailure before calling
+ // resetTransport. Transition READY->CONNECTING is not valid.
+ ac.state = connectivity.TransientFailure
+ if ac.cc.balancerWrapper != nil {
+ ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
+ } else {
+ ac.cc.csMgr.updateState(ac.state)
+ }
+ ac.curAddr = resolver.Address{}
+ ac.mu.Unlock()
+ if err := ac.resetTransport(); err != nil {
ac.mu.Lock()
- if ac.state == connectivity.Shutdown {
- // ac has been shutdown.
- ac.mu.Unlock()
- return
- }
- oldState := ac.state
- ac.state = connectivity.TransientFailure
- ac.csEvltr.recordTransition(oldState, ac.state)
+ ac.printf("transport exiting: %v", err)
ac.mu.Unlock()
- if err := ac.resetTransport(false); err != nil {
- grpclog.Infof("get error from resetTransport %v, transportMonitor returning", err)
- ac.mu.Lock()
- ac.printf("transport exiting: %v", err)
- ac.mu.Unlock()
- grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
+ grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
+ if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
+ ac.tearDown(err)
}
+ return
}
}
}
@@ -1106,6 +1075,28 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
}
}
+// getReadyTransport returns the transport if ac's state is READY.
+// Otherwise it returns nil, false.
+// If ac's state is IDLE, it will trigger ac to connect.
+func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
+ ac.mu.Lock()
+ if ac.state == connectivity.Ready {
+ t := ac.transport
+ ac.mu.Unlock()
+ return t, true
+ }
+ var idle bool
+ if ac.state == connectivity.Idle {
+ idle = true
+ }
+ ac.mu.Unlock()
+ // Trigger idle ac to connect.
+ if idle {
+ ac.connect(false)
+ }
+ return nil, false
+}
+
// tearDown starts to tear down the addrConn.
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many addrConn's in a
@@ -1113,13 +1104,9 @@ func (ac *addrConn) wait(ctx context.Context, hasBalancer, failfast bool) (trans
// tearDown doesn't remove ac from ac.cc.conns.
func (ac *addrConn) tearDown(err error) {
ac.cancel()
-
ac.mu.Lock()
+ ac.curAddr = resolver.Address{}
defer ac.mu.Unlock()
- if ac.down != nil {
- ac.down(downErrorf(false, false, "%v", err))
- ac.down = nil
- }
if err == errConnDrain && ac.transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
@@ -1130,10 +1117,13 @@ func (ac *addrConn) tearDown(err error) {
if ac.state == connectivity.Shutdown {
return
}
- oldState := ac.state
ac.state = connectivity.Shutdown
ac.tearDownErr = err
- ac.csEvltr.recordTransition(oldState, ac.state)
+ if ac.cc.balancerWrapper != nil {
+ ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
+ } else {
+ ac.cc.csMgr.updateState(ac.state)
+ }
if ac.events != nil {
ac.events.Finish()
ac.events = nil
@@ -1142,8 +1132,11 @@ func (ac *addrConn) tearDown(err error) {
close(ac.ready)
ac.ready = nil
}
- if ac.transport != nil && err != errConnDrain {
- ac.transport.Close()
- }
return
}
+
+func (ac *addrConn) getState() connectivity.State {
+ ac.mu.Lock()
+ defer ac.mu.Unlock()
+ return ac.state
+}