aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/stream.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/google.golang.org/grpc/stream.go')
-rw-r--r--vendor/google.golang.org/grpc/stream.go31
1 files changed, 19 insertions, 12 deletions
diff --git a/vendor/google.golang.org/grpc/stream.go b/vendor/google.golang.org/grpc/stream.go
index 9eeaafef8..f91381995 100644
--- a/vendor/google.golang.org/grpc/stream.go
+++ b/vendor/google.golang.org/grpc/stream.go
@@ -163,7 +163,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
if ct != encoding.Identity {
comp = encoding.GetCompressor(ct)
if comp == nil {
- return nil, Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
+ return nil, status.Errorf(codes.Internal, "grpc: Compressor is not installed for requested grpc-encoding %q", ct)
}
}
} else if cc.dopts.cp != nil {
@@ -232,7 +232,14 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
s, err = t.NewStream(ctx, callHdr)
if err != nil {
if done != nil {
- done(balancer.DoneInfo{Err: err})
+ doneInfo := balancer.DoneInfo{Err: err}
+ if _, ok := err.(transport.ConnectionError); ok {
+ // If error is connection error, transport was sending data on wire,
+ // and we are not sure if anything has been sent on wire.
+ // If error is not connection error, we are sure nothing has been sent.
+ doneInfo.BytesSent = true
+ }
+ done(doneInfo)
done = nil
}
// In the event of any error from NewStream, we never attempted to write
@@ -393,10 +400,10 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
return err
}
if cs.c.maxSendMessageSize == nil {
- return Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
+ return status.Errorf(codes.Internal, "callInfo maxSendMessageSize field uninitialized(nil)")
}
if len(data) > *cs.c.maxSendMessageSize {
- return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize)
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), *cs.c.maxSendMessageSize)
}
err = cs.t.Write(cs.s, hdr, data, &transport.Options{Last: false})
if err == nil && outPayload != nil {
@@ -414,7 +421,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
}
}
if cs.c.maxReceiveMessageSize == nil {
- return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
+ return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
if !cs.decompSet {
// Block until we receive headers containing received message encoding.
@@ -456,7 +463,7 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
// Special handling for client streaming rpc.
// This recv expects EOF or errors, so we don't collect inPayload.
if cs.c.maxReceiveMessageSize == nil {
- return Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
+ return status.Errorf(codes.Internal, "callInfo maxReceiveMessageSize field uninitialized(nil)")
}
err = recv(cs.p, cs.codec, cs.s, cs.dc, m, *cs.c.maxReceiveMessageSize, nil, cs.decomp)
cs.closeTransportStream(err)
@@ -529,11 +536,11 @@ func (cs *clientStream) finish(err error) {
o.after(cs.c)
}
if cs.done != nil {
- updateRPCInfoInContext(cs.s.Context(), rpcInfo{
- bytesSent: true,
- bytesReceived: cs.s.BytesReceived(),
+ cs.done(balancer.DoneInfo{
+ Err: err,
+ BytesSent: true,
+ BytesReceived: cs.s.BytesReceived(),
})
- cs.done(balancer.DoneInfo{Err: err})
cs.done = nil
}
if cs.statsHandler != nil {
@@ -653,7 +660,7 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
return err
}
if len(data) > ss.maxSendMessageSize {
- return Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize)
+ return status.Errorf(codes.ResourceExhausted, "trying to send message larger than max (%d vs. %d)", len(data), ss.maxSendMessageSize)
}
if err := ss.t.Write(ss.s, hdr, data, &transport.Options{Last: false}); err != nil {
return toRPCErr(err)
@@ -693,7 +700,7 @@ func (ss *serverStream) RecvMsg(m interface{}) (err error) {
return err
}
if err == io.ErrUnexpectedEOF {
- err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+ err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
return toRPCErr(err)
}