aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/call.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/call.go')
-rw-r--r--vendor/google.golang.org/grpc/call.go46
1 files changed, 22 insertions, 24 deletions
diff --git a/vendor/google.golang.org/grpc/call.go b/vendor/google.golang.org/grpc/call.go
index 797190f14..1ef2507c3 100644
--- a/vendor/google.golang.org/grpc/call.go
+++ b/vendor/google.golang.org/grpc/call.go
@@ -25,6 +25,7 @@ import (
"golang.org/x/net/context"
"golang.org/x/net/trace"
+ "google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@@ -99,17 +100,17 @@ func sendRequest(ctx context.Context, dopts dialOptions, compressor Compressor,
Client: true,
}
}
- outBuf, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
+ hdr, data, err := encode(dopts.codec, args, compressor, cbuf, outPayload)
if err != nil {
return err
}
if c.maxSendMessageSize == nil {
return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
- if len(outBuf) > *c.maxSendMessageSize {
- return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(outBuf), *c.maxSendMessageSize)
+ if len(data) > *c.maxSendMessageSize {
+ return Errorf(codes.ResourceExhausted, "grpc: trying to send message larger than max (%d vs. %d)", len(data), *c.maxSendMessageSize)
}
- err = t.Write(stream, outBuf, opts)
+ err = t.Write(stream, hdr, data, opts)
if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
dopts.copts.StatsHandler.HandleRPC(ctx, outPayload)
@@ -135,7 +136,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
func invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (e error) {
- c := defaultCallInfo
+ c := defaultCallInfo()
mc := cc.GetMethodConfig(method)
if mc.WaitForReady != nil {
c.failFast = !*mc.WaitForReady
@@ -149,13 +150,13 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
opts = append(cc.dopts.callOptions, opts...)
for _, o := range opts {
- if err := o.before(&c); err != nil {
+ if err := o.before(c); err != nil {
return toRPCErr(err)
}
}
defer func() {
for _, o := range opts {
- o.after(&c)
+ o.after(c)
}
}()
@@ -178,7 +179,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
}()
}
- ctx = newContextWithRPCInfo(ctx)
+ ctx = newContextWithRPCInfo(ctx, c.failFast)
sh := cc.dopts.copts.StatsHandler
if sh != nil {
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
@@ -206,9 +207,9 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
err error
t transport.ClientTransport
stream *transport.Stream
- // Record the put handler from Balancer.Get(...). It is called once the
+ // Record the done handler from Balancer.Get(...). It is called once the
// RPC has completed or failed.
- put func()
+ done func(balancer.DoneInfo)
)
// TODO(zhaoq): Need a formal spec of fail-fast.
callHdr := &transport.CallHdr{
@@ -222,10 +223,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
callHdr.Creds = c.creds
}
- gopts := BalancerGetOptions{
- BlockingWait: !c.failFast,
- }
- t, put, err = cc.getTransport(ctx, gopts)
+ t, done, err = cc.getTransport(ctx, c.failFast)
if err != nil {
// TODO(zhaoq): Probably revisit the error handling.
if _, ok := status.FromError(err); ok {
@@ -245,14 +243,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
stream, err = t.NewStream(ctx, callHdr)
if err != nil {
- if put != nil {
+ if done != nil {
if _, ok := err.(transport.ConnectionError); ok {
// If error is connection error, transport was sending data on wire,
// and we are not sure if anything has been sent on wire.
// If error is not connection error, we are sure nothing has been sent.
updateRPCInfoInContext(ctx, rpcInfo{bytesSent: true, bytesReceived: false})
}
- put()
+ done(balancer.DoneInfo{Err: err})
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
@@ -262,14 +260,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
if peer, ok := peer.FromContext(stream.Context()); ok {
c.peer = peer
}
- err = sendRequest(ctx, cc.dopts, cc.dopts.cp, &c, callHdr, stream, t, args, topts)
+ err = sendRequest(ctx, cc.dopts, cc.dopts.cp, c, callHdr, stream, t, args, topts)
if err != nil {
- if put != nil {
+ if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
- put()
+ done(balancer.DoneInfo{Err: err})
}
// Retry a non-failfast RPC when
// i) there is a connection error; or
@@ -279,14 +277,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
return toRPCErr(err)
}
- err = recvResponse(ctx, cc.dopts, t, &c, stream, reply)
+ err = recvResponse(ctx, cc.dopts, t, c, stream, reply)
if err != nil {
- if put != nil {
+ if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
- put()
+ done(balancer.DoneInfo{Err: err})
}
if _, ok := err.(transport.ConnectionError); (ok || err == transport.ErrStreamDrain) && !c.failFast {
continue
@@ -297,12 +295,12 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
c.traceInfo.tr.LazyLog(&payload{sent: false, msg: reply}, true)
}
t.CloseStream(stream, nil)
- if put != nil {
+ if done != nil {
updateRPCInfoInContext(ctx, rpcInfo{
bytesSent: stream.BytesSent(),
bytesReceived: stream.BytesReceived(),
})
- put()
+ done(balancer.DoneInfo{Err: err})
}
return stream.Status().Err()
}