diff options
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r-- | vendor/google.golang.org/grpc/stream.go | 31 |
1 files changed, 19 insertions, 12 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go index 9eeaafef8..f91381995 100644 --- a/vendor/google.golang.org/grpc/stream.go +++ b/vendor/google.golang.org/grpc/stream.go @@ -163,7 +163,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth if ct != encoding.Identity { comp = encoding.GetCompressor(ct) if comp == nil { - return nil, Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) + return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct) } } } else if cc.dopts.cp != nil { @@ -232,7 +232,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth s, err = t.NewStream(ctx, callHdr) if err != nil { if done != nil { - done(balancer.DoneInfo{Err: err}) + doneInfo := balancer.DoneInfo{Err: err} + if _, ok := err.(transport.ConnectionError); ok { + // If error is connection error, transport was sending data on wire, + // and we are not sure if anything has been sent on wire. + // If error is not connection error, we are sure nothing has been sent. + doneInfo.BytesSent = true + } + done(doneInfo) done = nil } // In the event of any error from NewStream, we never attempted to write @@ -393,10 +400,10 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { return err } if cs.c.maxSendMessageSize == nil { - return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") + return status.Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)") } if len(data) > *cs.c.maxSendMessageSize { - return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize) + return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize) } err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: false}) if err == nil && outPayload != nil { @@ -414,7 +421,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { } } if cs.c.maxReceiveMessageSize == nil { - return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") } if !cs.decompSet { // Block until we receive headers containing received message encoding. @@ -456,7 +463,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { // Special handling for client streaming rpc. // This recv expects EOF or errors, so we don't collect inPayload. if cs.c.maxReceiveMessageSize == nil { - return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") + return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)") } err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp) cs.closeTransportStream(err) @@ -529,11 +536,11 @@ func (cs *clientStream) finish(err error) { o.after(cs.c) } if cs.done != nil { - updateRPCInfoInContext(cs.s.Context(), rpcInfo{ - bytesSent: true, - bytesReceived: cs.s.BytesReceived(), + cs.done(balancer.DoneInfo{ + Err: err, + BytesSent: true, + BytesReceived: cs.s.BytesReceived(), }) - cs.done(balancer.DoneInfo{Err: err}) cs.done = nil } if cs.statsHandler != nil { @@ -653,7 +660,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) { return err } if len(data) > ss.maxSendMessageSize { - return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize) + return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize) } if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil { return toRPCErr(err) @@ -693,7 +700,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) { return err } if err == io.ErrUnexpectedEOF { - err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) + err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error()) } return toRPCErr(err) } |