diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/call.go')
-rw-r--r-- | vendor/google.golang.org/grpc/call.go | 163 |
1 files changed, 100 insertions, 63 deletions
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go index 1ef2507c3..0854f84b9 100644 --- a/vendor/google.golang.org/grpc/call.go +++ b/vendor/google.golang.org/grpc/call.go @@ -19,7 +19,6 @@ package grpc import ( - "bytes" "io" "time" @@ -27,9 +26,9 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" + "google.golang.org/grpc/encoding" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" - "google.golang.org/grpc/status" "google.golang.org/grpc/transport" ) @@ -62,7 +61,17 @@ func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTran if c.maxReceiveMessageSize == nil { return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") } - if err = recv(p, dopts.codec, stream, dopts.dc, reply, *c.maxReceiveMessageSize, inPayload); err != nil { + + // Set dc if it exists and matches the message compression type used, + // otherwise set comp if a registered compressor exists for it. + var comp encoding.Compressor + var dc Decompressor + if rc := stream.RecvCompress(); dopts.dc != nil && dopts.dc.Type() == rc { + dc = dopts.dc + } else if rc != "" && rc != encoding.Identity { + comp = encoding.GetCompressor(rc) + } + if err = recv(p, dopts.codec, stream, dc, reply, *c.maxReceiveMessageSize, inPayload, comp); err != nil { if err == io.EOF { break } @@ -89,18 +98,25 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, } }() var ( - cbuf *bytes.Buffer outPayload *stats.OutPayload ) - if compressor != nil { - cbuf = new(bytes.Buffer) - } if dopts.copts.StatsHandler != nil { outPayload = &stats.OutPayload{ Client: true, } } - hdr, data, err := encode(dopts.codec, args, compressor, cbuf, outPayload) + // Set comp and clear compressor if a registered compressor matches the type + // specified via UseCompressor. (And error if a matching compressor is not + // registered.) + var comp encoding.Compressor + if ct := c.compressorType; ct != "" && ct != encoding.Identity { + compressor = nil // Disable the legacy compressor. + comp = encoding.GetCompressor(ct) + if comp == nil { + return Errorf(codes.Internal, "grpc: Compressor is not installed for grpc-encoding %q", ct) + } + } + hdr, data, err := encode(dopts.codec, args, compressor, outPayload, comp) if err != nil { return err } @@ -125,16 +141,23 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor, return nil } -// Invoke sends the RPC request on the wire and returns after response is received. -// Invoke is called by generated code. Also users can call Invoke directly when it -// is really needed in their use cases. -func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error { +// Invoke sends the RPC request on the wire and returns after response is +// received. This is typically called by generated code. +func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts ...CallOption) error { if cc.dopts.unaryInt != nil { return cc.dopts.unaryInt(ctx, method, args, reply, cc, invoke, opts...) } return invoke(ctx, method, args, reply, cc, opts...) } +// Invoke sends the RPC request on the wire and returns after response is +// received. This is typically called by generated code. +// +// DEPRECATED: Use ClientConn.Invoke instead. +func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) error { + return cc.Invoke(ctx, method, args, reply, opts...) +} + func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) { c := defaultCallInfo() mc := cc.GetMethodConfig(method) @@ -202,57 +225,45 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli Last: true, Delay: false, } + callHdr := &transport.CallHdr{ + Host: cc.authority, + Method: method, + } + if c.creds != nil { + callHdr.Creds = c.creds + } + if c.compressorType != "" { + callHdr.SendCompress = c.compressorType + } else if cc.dopts.cp != nil { + callHdr.SendCompress = cc.dopts.cp.Type() + } + firstAttempt := true + for { - var ( - err error - t transport.ClientTransport - stream *transport.Stream - // Record the done handler from Balancer.Get(...). It is called once the - // RPC has completed or failed. - done func(balancer.DoneInfo) - ) - // TODO(zhaoq): Need a formal spec of fail-fast. - callHdr := &transport.CallHdr{ - Host: cc.authority, - Method: method, - } - if cc.dopts.cp != nil { - callHdr.SendCompress = cc.dopts.cp.Type() - } - if c.creds != nil { - callHdr.Creds = c.creds + // Check to make sure the context has expired. This will prevent us from + // looping forever if an error occurs for wait-for-ready RPCs where no data + // is sent on the wire. + select { + case <-ctx.Done(): + return toRPCErr(ctx.Err()) + default: } - t, done, err = cc.getTransport(ctx, c.failFast) + // Record the done handler from Balancer.Get(...). It is called once the + // RPC has completed or failed. + t, done, err := cc.getTransport(ctx, c.failFast) if err != nil { - // TODO(zhaoq): Probably revisit the error handling. - if _, ok := status.FromError(err); ok { - return err - } - if err == errConnClosing || err == errConnUnavailable { - if c.failFast { - return Errorf(codes.Unavailable, "%v", err) - } - continue - } - // All the other errors are treated as Internal errors. - return Errorf(codes.Internal, "%v", err) + return err } - if c.traceInfo.tr != nil { - c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) - } - stream, err = t.NewStream(ctx, callHdr) + stream, err := t.NewStream(ctx, callHdr) if err != nil { if done != nil { - if _, ok := err.(transport.ConnectionError); ok { - // If error is connection error, transport was sending data on wire, - // and we are not sure if anything has been sent on wire. - // If error is not connection error, we are sure nothing has been sent. - updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false}) - } done(balancer.DoneInfo{Err: err}) } - if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { + // In the event of any error from NewStream, we never attempted to write + // anything to the wire, so we can retry indefinitely for non-fail-fast + // RPCs. + if !c.failFast { continue } return toRPCErr(err) @@ -260,20 +271,30 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if peer, ok := peer.FromContext(stream.Context()); ok { c.peer = peer } + if c.traceInfo.tr != nil { + c.traceInfo.tr.LazyLog(&payload{sent: true, msg: args}, true) + } err = sendRequest(ctx, cc.dopts, cc.dopts.cp, c, callHdr, stream, t, args, topts) if err != nil { if done != nil { updateRPCInfoInContext(ctx, rpcInfo{ - bytesSent: stream.BytesSent(), + bytesSent: true, bytesReceived: stream.BytesReceived(), }) done(balancer.DoneInfo{Err: err}) } // Retry a non-failfast RPC when - // i) there is a connection error; or - // ii) the server started to drain before this RPC was initiated. - if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { - continue + // i) the server started to drain before this RPC was initiated. + // ii) the server refused the stream. + if !c.failFast && stream.Unprocessed() { + // In this case, the server did not receive the data, but we still + // created wire traffic, so we should not retry indefinitely. + if firstAttempt { + // TODO: Add a field to header for grpc-transparent-retry-attempts + firstAttempt = false + continue + } + // Otherwise, give up and return an error anyway. } return toRPCErr(err) } @@ -281,13 +302,20 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli if err != nil { if done != nil { updateRPCInfoInContext(ctx, rpcInfo{ - bytesSent: stream.BytesSent(), + bytesSent: true, bytesReceived: stream.BytesReceived(), }) done(balancer.DoneInfo{Err: err}) } - if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast { - continue + if !c.failFast && stream.Unprocessed() { + // In these cases, the server did not receive the data, but we still + // created wire traffic, so we should not retry indefinitely. + if firstAttempt { + // TODO: Add a field to header for grpc-transparent-retry-attempts + firstAttempt = false + continue + } + // Otherwise, give up and return an error anyway. } return toRPCErr(err) } @@ -297,11 +325,20 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli t.CloseStream(stream, nil) if done != nil { updateRPCInfoInContext(ctx, rpcInfo{ - bytesSent: stream.BytesSent(), + bytesSent: true, bytesReceived: stream.BytesReceived(), }) done(balancer.DoneInfo{Err: err}) } + if !c.failFast && stream.Unprocessed() { + // In these cases, the server did not receive the data, but we still + // created wire traffic, so we should not retry indefinitely. + if firstAttempt { + // TODO: Add a field to header for grpc-transparent-retry-attempts + firstAttempt = false + continue + } + } return stream.Status().Err() } } |