aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/clientconn.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/clientconn.go')
-rw-r--r--vendor/google.golang.org/grpc/clientconn.go338
1 files changed, 182 insertions, 156 deletions
diff --git a/vendor/google.golang.org/grpc/clientconn.go b/vendor/google.golang.org/grpc/clientconn.go
index 5462062be..ae605bc32 100644
--- a/vendor/google.golang.org/grpc/clientconn.go
+++ b/vendor/google.golang.org/grpc/clientconn.go
@@ -31,11 +31,14 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/balancer"
+ _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin.
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/resolver"
+ _ "google.golang.org/grpc/resolver/dns" // To register dns resolver.
+ _ "google.golang.org/grpc/resolver/passthrough" // To register passthrough resolver.
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@@ -48,7 +51,20 @@ var (
// underlying connections within the specified timeout.
// DEPRECATED: Please use context.DeadlineExceeded instead.
ErrClientConnTimeout = errors.New("grpc: timed out when dialing")
+ // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
+ errConnDrain = errors.New("grpc: the connection is drained")
+ // errConnClosing indicates that the connection is closing.
+ errConnClosing = errors.New("grpc: the connection is closing")
+ // errConnUnavailable indicates that the connection is unavailable.
+ errConnUnavailable = errors.New("grpc: the connection is unavailable")
+ // errBalancerClosed indicates that the balancer is closed.
+ errBalancerClosed = errors.New("grpc: balancer is closed")
+ // minimum time to give a connection to complete
+ minConnectTimeout = 20 * time.Second
+)
+// The following errors are returned from Dial and DialContext
+var (
// errNoTransportSecurity indicates that there is no transport security
// being set for ClientConn. Users should either set one or explicitly
// call WithInsecure DialOption to disable security.
@@ -62,16 +78,6 @@ var (
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
// errNetworkIO indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
- // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
- errConnDrain = errors.New("grpc: the connection is drained")
- // errConnClosing indicates that the connection is closing.
- errConnClosing = errors.New("grpc: the connection is closing")
- // errConnUnavailable indicates that the connection is unavailable.
- errConnUnavailable = errors.New("grpc: the connection is unavailable")
- // errBalancerClosed indicates that the balancer is closed.
- errBalancerClosed = errors.New("grpc: balancer is closed")
- // minimum time to give a connection to complete
- minConnectTimeout = 20 * time.Second
)
// dialOptions configure a Dial call. dialOptions are set by the DialOption
@@ -152,16 +158,26 @@ func WithCodec(c Codec) DialOption {
}
}
-// WithCompressor returns a DialOption which sets a CompressorGenerator for generating message
-// compressor.
+// WithCompressor returns a DialOption which sets a Compressor to use for
+// message compression. It has lower priority than the compressor set by
+// the UseCompressor CallOption.
+//
+// Deprecated: use UseCompressor instead.
func WithCompressor(cp Compressor) DialOption {
return func(o *dialOptions) {
o.cp = cp
}
}
-// WithDecompressor returns a DialOption which sets a DecompressorGenerator for generating
-// message decompressor.
+// WithDecompressor returns a DialOption which sets a Decompressor to use for
+// incoming message decompression. If incoming response messages are encoded
+// using the decompressor's Type(), it will be used. Otherwise, the message
+// encoding will be used to look up the compressor registered via
+// encoding.RegisterCompressor, which will then be used to decompress the
+// message. If no compressor is registered for the encoding, an Unimplemented
+// status error will be returned.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
func WithDecompressor(dc Decompressor) DialOption {
return func(o *dialOptions) {
o.dc = dc
@@ -189,6 +205,8 @@ func WithBalancerBuilder(b balancer.Builder) DialOption {
}
// WithServiceConfig returns a DialOption which has a channel to read the service configuration.
+// DEPRECATED: service config should be received through name resolver, as specified here.
+// https://github.com/grpc/grpc/blob/master/doc/service_config.md
func WithServiceConfig(c <-chan ServiceConfig) DialOption {
return func(o *dialOptions) {
o.scChan = c
@@ -378,7 +396,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if cc.dopts.copts.Dialer == nil {
cc.dopts.copts.Dialer = newProxyDialer(
func(ctx context.Context, addr string) (net.Conn, error) {
- return (&net.Dialer{}).DialContext(ctx, "tcp", addr)
+ return dialContext(ctx, "tcp", addr)
},
)
}
@@ -426,51 +444,18 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
if cc.dopts.bs == nil {
cc.dopts.bs = DefaultBackoffConfig
}
+ cc.parsedTarget = parseTarget(cc.target)
creds := cc.dopts.copts.TransportCredentials
if creds != nil && creds.Info().ServerName != "" {
cc.authority = creds.Info().ServerName
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
cc.authority = cc.dopts.copts.Authority
} else {
- cc.authority = target
+ // Use endpoint from "scheme://authority/endpoint" as the default
+ // authority for ClientConn.
+ cc.authority = cc.parsedTarget.Endpoint
}
- if cc.dopts.balancerBuilder != nil {
- var credsClone credentials.TransportCredentials
- if creds != nil {
- credsClone = creds.Clone()
- }
- buildOpts := balancer.BuildOptions{
- DialCreds: credsClone,
- Dialer: cc.dopts.copts.Dialer,
- }
- // Build should not take long time. So it's ok to not have a goroutine for it.
- // TODO(bar) init balancer after first resolver result to support service config balancer.
- cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, buildOpts)
- } else {
- waitC := make(chan error, 1)
- go func() {
- defer close(waitC)
- // No balancer, or no resolver within the balancer. Connect directly.
- ac, err := cc.newAddrConn([]resolver.Address{{Addr: target}})
- if err != nil {
- waitC <- err
- return
- }
- if err := ac.connect(cc.dopts.block); err != nil {
- waitC <- err
- return
- }
- }()
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case err := <-waitC:
- if err != nil {
- return nil, err
- }
- }
- }
if cc.dopts.scChan != nil && !scSet {
// Blocking wait for the initial service config.
select {
@@ -486,20 +471,27 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
go cc.scWatcher()
}
+ var credsClone credentials.TransportCredentials
+ if creds := cc.dopts.copts.TransportCredentials; creds != nil {
+ credsClone = creds.Clone()
+ }
+ cc.balancerBuildOpts = balancer.BuildOptions{
+ DialCreds: credsClone,
+ Dialer: cc.dopts.copts.Dialer,
+ }
+
+ if cc.dopts.balancerBuilder != nil {
+ cc.customBalancer = true
+ // Build should not take long time. So it's ok to not have a goroutine for it.
+ cc.balancerWrapper = newCCBalancerWrapper(cc, cc.dopts.balancerBuilder, cc.balancerBuildOpts)
+ }
+
// Build the resolver.
cc.resolverWrapper, err = newCCResolverWrapper(cc)
if err != nil {
return nil, fmt.Errorf("failed to build resolver: %v", err)
}
- if cc.balancerWrapper != nil && cc.resolverWrapper == nil {
- // TODO(bar) there should always be a resolver (DNS as the default).
- // Unblock balancer initialization with a fake resolver update if there's no resolver.
- // The balancer wrapper will not read the addresses, so an empty list works.
- // TODO(bar) remove this after the real resolver is started.
- cc.balancerWrapper.handleResolvedAddrs([]resolver.Address{}, nil)
- }
-
// A blocking dial blocks until the clientConn is ready.
if cc.dopts.block {
for {
@@ -565,21 +557,26 @@ type ClientConn struct {
ctx context.Context
cancel context.CancelFunc
- target string
- authority string
- dopts dialOptions
- csMgr *connectivityStateManager
-
- balancerWrapper *ccBalancerWrapper
- resolverWrapper *ccResolverWrapper
+ target string
+ parsedTarget resolver.Target
+ authority string
+ dopts dialOptions
+ csMgr *connectivityStateManager
- blockingpicker *pickerWrapper
+ customBalancer bool // If this is true, switching balancer will be disabled.
+ balancerBuildOpts balancer.BuildOptions
+ resolverWrapper *ccResolverWrapper
+ blockingpicker *pickerWrapper
mu sync.RWMutex
sc ServiceConfig
+ scRaw string
conns map[*addrConn]struct{}
// Keepalive parameter can be updated if a GoAway is received.
- mkp keepalive.ClientParameters
+ mkp keepalive.ClientParameters
+ curBalancerName string
+ curAddresses []resolver.Address
+ balancerWrapper *ccBalancerWrapper
}
// WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or
@@ -615,6 +612,7 @@ func (cc *ClientConn) scWatcher() {
// TODO: load balance policy runtime change is ignored.
// We may revist this decision in the future.
cc.sc = sc
+ cc.scRaw = ""
cc.mu.Unlock()
case <-cc.ctx.Done():
return
@@ -622,6 +620,69 @@ func (cc *ClientConn) scWatcher() {
}
}
+func (cc *ClientConn) handleResolvedAddrs(addrs []resolver.Address, err error) {
+ cc.mu.Lock()
+ defer cc.mu.Unlock()
+ if cc.conns == nil {
+ return
+ }
+
+ // TODO(bar switching) when grpclb is submitted, check address type and start grpclb.
+ if !cc.customBalancer && cc.balancerWrapper == nil {
+ // No customBalancer was specified by DialOption, and this is the first
+ // time handling resolved addresses, create a pickfirst balancer.
+ builder := newPickfirstBuilder()
+ cc.curBalancerName = builder.Name()
+ cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
+ }
+
+ // TODO(bar switching) compare addresses, if there's no update, don't notify balancer.
+ cc.curAddresses = addrs
+ cc.balancerWrapper.handleResolvedAddrs(addrs, nil)
+}
+
+// switchBalancer starts the switching from current balancer to the balancer with name.
+func (cc *ClientConn) switchBalancer(name string) {
+ if cc.conns == nil {
+ return
+ }
+ grpclog.Infof("ClientConn switching balancer to %q", name)
+
+ if cc.customBalancer {
+ grpclog.Infoln("ignoring service config balancer configuration: WithBalancer DialOption used instead")
+ return
+ }
+
+ if cc.curBalancerName == name {
+ return
+ }
+
+ // TODO(bar switching) change this to two steps: drain and close.
+ // Keep track of sc in wrapper.
+ cc.balancerWrapper.close()
+
+ builder := balancer.Get(name)
+ if builder == nil {
+ grpclog.Infof("failed to get balancer builder for: %v (this should never happen...)", name)
+ builder = newPickfirstBuilder()
+ }
+ cc.curBalancerName = builder.Name()
+ cc.balancerWrapper = newCCBalancerWrapper(cc, builder, cc.balancerBuildOpts)
+ cc.balancerWrapper.handleResolvedAddrs(cc.curAddresses, nil)
+}
+
+func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State) {
+ cc.mu.Lock()
+ if cc.conns == nil {
+ cc.mu.Unlock()
+ return
+ }
+ // TODO(bar switching) send updates to all balancer wrappers when balancer
+ // gracefully switching is supported.
+ cc.balancerWrapper.handleSubConnStateChange(sc, s)
+ cc.mu.Unlock()
+}
+
// newAddrConn creates an addrConn for addrs and adds it to cc.conns.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
ac := &addrConn{
@@ -659,7 +720,7 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) {
// It does nothing if the ac is not IDLE.
// TODO(bar) Move this to the addrConn section.
// This was part of resetAddrConn, keep it here to make the diff look clean.
-func (ac *addrConn) connect(block bool) error {
+func (ac *addrConn) connect() error {
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
@@ -670,39 +731,21 @@ func (ac *addrConn) connect(block bool) error {
return nil
}
ac.state = connectivity.Connecting
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()
- if block {
+ // Start a goroutine connecting to the server asynchronously.
+ go func() {
if err := ac.resetTransport(); err != nil {
+ grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
if err != errConnClosing {
+ // Keep this ac in cc.conns, to get the reason it's torn down.
ac.tearDown(err)
}
- if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
- return e.Origin()
- }
- return err
+ return
}
- // Start to monitor the error status of transport.
- go ac.transportMonitor()
- } else {
- // Start a goroutine connecting to the server asynchronously.
- go func() {
- if err := ac.resetTransport(); err != nil {
- grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
- if err != errConnClosing {
- // Keep this ac in cc.conns, to get the reason it's torn down.
- ac.tearDown(err)
- }
- return
- }
- ac.transportMonitor()
- }()
- }
+ ac.transportMonitor()
+ }()
return nil
}
@@ -756,31 +799,6 @@ func (cc *ClientConn) GetMethodConfig(method string) MethodConfig {
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transport.ClientTransport, func(balancer.DoneInfo), error) {
- if cc.balancerWrapper == nil {
- // If balancer is nil, there should be only one addrConn available.
- cc.mu.RLock()
- if cc.conns == nil {
- cc.mu.RUnlock()
- // TODO this function returns toRPCErr and non-toRPCErr. Clean up
- // the errors in ClientConn.
- return nil, nil, toRPCErr(ErrClientConnClosing)
- }
- var ac *addrConn
- for ac = range cc.conns {
- // Break after the first iteration to get the first addrConn.
- break
- }
- cc.mu.RUnlock()
- if ac == nil {
- return nil, nil, errConnClosing
- }
- t, err := ac.wait(ctx, false /*hasBalancer*/, failfast)
- if err != nil {
- return nil, nil, err
- }
- return t, nil, nil
- }
-
t, done, err := cc.blockingpicker.pick(ctx, failfast, balancer.PickOptions{})
if err != nil {
return nil, nil, toRPCErr(err)
@@ -788,6 +806,23 @@ func (cc *ClientConn) getTransport(ctx context.Context, failfast bool) (transpor
return t, done, nil
}
+// handleServiceConfig parses the service config string in JSON format to Go native
+// struct ServiceConfig, and store both the struct and the JSON string in ClientConn.
+func (cc *ClientConn) handleServiceConfig(js string) error {
+ sc, err := parseServiceConfig(js)
+ if err != nil {
+ return err
+ }
+ cc.mu.Lock()
+ cc.scRaw = js
+ cc.sc = sc
+ if sc.LB != nil {
+ cc.switchBalancer(*sc.LB)
+ }
+ cc.mu.Unlock()
+ return nil
+}
+
// Close tears down the ClientConn and all underlying connections.
func (cc *ClientConn) Close() error {
cc.cancel()
@@ -800,13 +835,18 @@ func (cc *ClientConn) Close() error {
conns := cc.conns
cc.conns = nil
cc.csMgr.updateState(connectivity.Shutdown)
+
+ rWrapper := cc.resolverWrapper
+ cc.resolverWrapper = nil
+ bWrapper := cc.balancerWrapper
+ cc.balancerWrapper = nil
cc.mu.Unlock()
cc.blockingpicker.close()
- if cc.resolverWrapper != nil {
- cc.resolverWrapper.close()
+ if rWrapper != nil {
+ rWrapper.close()
}
- if cc.balancerWrapper != nil {
- cc.balancerWrapper.close()
+ if bWrapper != nil {
+ bWrapper.close()
}
for ac := range conns {
ac.tearDown(ErrClientConnClosing)
@@ -841,7 +881,7 @@ type addrConn struct {
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
switch r {
- case transport.TooManyPings:
+ case transport.GoAwayTooManyPings:
v := 2 * ac.dopts.copts.KeepaliveParams.Time
ac.cc.mu.Lock()
if v > ac.cc.mkp.Time {
@@ -902,12 +942,7 @@ func (ac *addrConn) resetTransport() error {
ac.printf("connecting")
if ac.state != connectivity.Connecting {
ac.state = connectivity.Connecting
- // TODO(bar) remove condition once we always have a balancer.
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
// copy ac.addrs in case of race
addrsIter := make([]resolver.Address, len(ac.addrs))
@@ -923,12 +958,19 @@ func (ac *addrConn) resetTransport() error {
}
ac.mu.Unlock()
sinfo := transport.TargetInfo{
- Addr: addr.Addr,
- Metadata: addr.Metadata,
+ Addr: addr.Addr,
+ Metadata: addr.Metadata,
+ Authority: ac.cc.authority,
}
newTransport, err := transport.NewClientTransport(ac.cc.ctx, sinfo, copts, timeout)
if err != nil {
if e, ok := err.(transport.ConnectionError); ok && !e.Temporary() {
+ ac.mu.Lock()
+ if ac.state != connectivity.Shutdown {
+ ac.state = connectivity.TransientFailure
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+ }
+ ac.mu.Unlock()
return err
}
grpclog.Warningf("grpc: addrConn.resetTransport failed to create client transport: %v; Reconnecting to %v", err, addr)
@@ -950,11 +992,7 @@ func (ac *addrConn) resetTransport() error {
return errConnClosing
}
ac.state = connectivity.Ready
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
t := ac.transport
ac.transport = newTransport
if t != nil {
@@ -970,11 +1008,7 @@ func (ac *addrConn) resetTransport() error {
}
ac.mu.Lock()
ac.state = connectivity.TransientFailure
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
if ac.ready != nil {
close(ac.ready)
ac.ready = nil
@@ -1018,11 +1052,7 @@ func (ac *addrConn) transportMonitor() {
// Set connectivity state to TransientFailure before calling
// resetTransport. Transition READY->CONNECTING is not valid.
ac.state = connectivity.TransientFailure
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
ac.mu.Unlock()
if err := ac.resetTransport(); err != nil {
@@ -1096,7 +1126,7 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
ac.mu.Unlock()
// Trigger idle ac to connect.
if idle {
- ac.connect(false)
+ ac.connect()
}
return nil, false
}
@@ -1109,8 +1139,8 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
func (ac *addrConn) tearDown(err error) {
ac.cancel()
ac.mu.Lock()
- ac.curAddr = resolver.Address{}
defer ac.mu.Unlock()
+ ac.curAddr = resolver.Address{}
if err == errConnDrain && ac.transport != nil {
// GracefulClose(...) may be executed multiple times when
// i) receiving multiple GoAway frames from the server; or
@@ -1123,11 +1153,7 @@ func (ac *addrConn) tearDown(err error) {
}
ac.state = connectivity.Shutdown
ac.tearDownErr = err
- if ac.cc.balancerWrapper != nil {
- ac.cc.balancerWrapper.handleSubConnStateChange(ac.acbw, ac.state)
- } else {
- ac.cc.csMgr.updateState(ac.state)
- }
+ ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
if ac.events != nil {
ac.events.Finish()
ac.events = nil