aboutsummaryrefslogtreecommitdiff
path: root/vendor/google.golang.org/grpc/server.go
diff options
context:
space:
mode:
authorGravatar Yong Tang <yong.tang.github@outlook.com> 2017-12-18 11:50:56 -0600
committerGravatar GitHub <noreply@github.com> 2017-12-18 11:50:56 -0600
commit4dd40a292c773aad5eee761940840de64ba9090a (patch)
treee7b872bed228fdf06b4ea91db60bf9b7cc8fec2a /vendor/google.golang.org/grpc/server.go
parentba4e77672c9f7b53a8c2f6e197d3f12ede1e46cd (diff)
downloadcoredns-4dd40a292c773aad5eee761940840de64ba9090a.tar.gz
coredns-4dd40a292c773aad5eee761940840de64ba9090a.tar.zst
coredns-4dd40a292c773aad5eee761940840de64ba9090a.zip
Update apache/thrift to 0.11.0 and remove pinning (#1317)
The `apache/thrift` recently released a new version of `0.11.0` several days ago. This release is compatible with other packages and as such, there is no need to pinning the `apache/thrift` to `master` anymore in Gopkg.toml. This fix removes the pinning of `apache/thrift` in Gopkg.toml, and updates all dependencies of coredns. Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--vendor/google.golang.org/grpc/server.go227
1 files changed, 166 insertions, 61 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index 7c882dbe6..e9737fc49 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -32,11 +32,14 @@ import (
"sync"
"time"
+ "io/ioutil"
+
"golang.org/x/net/context"
"golang.org/x/net/http2"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/encoding"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/keepalive"
@@ -96,6 +99,11 @@ type Server struct {
cv *sync.Cond
m map[string]*service // service name -> service info
events trace.EventLog
+
+ quit chan struct{}
+ done chan struct{}
+ quitOnce sync.Once
+ doneOnce sync.Once
}
type options struct {
@@ -118,11 +126,13 @@ type options struct {
initialConnWindowSize int32
writeBufferSize int
readBufferSize int
+ connectionTimeout time.Duration
}
var defaultServerOptions = options{
maxReceiveMessageSize: defaultServerMaxReceiveMessageSize,
maxSendMessageSize: defaultServerMaxSendMessageSize,
+ connectionTimeout: 120 * time.Second,
}
// A ServerOption sets options such as credentials, codec and keepalive parameters, etc.
@@ -181,14 +191,24 @@ func CustomCodec(codec Codec) ServerOption {
}
}
-// RPCCompressor returns a ServerOption that sets a compressor for outbound messages.
+// RPCCompressor returns a ServerOption that sets a compressor for outbound
+// messages. For backward compatibility, all outbound messages will be sent
+// using this compressor, regardless of incoming message compression. By
+// default, server messages will be sent using the same compressor with which
+// request messages were sent.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
func RPCCompressor(cp Compressor) ServerOption {
return func(o *options) {
o.cp = cp
}
}
-// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages.
+// RPCDecompressor returns a ServerOption that sets a decompressor for inbound
+// messages. It has higher priority than decompressors registered via
+// encoding.RegisterCompressor.
+//
+// Deprecated: use encoding.RegisterCompressor instead.
func RPCDecompressor(dc Decompressor) ServerOption {
return func(o *options) {
o.dc = dc
@@ -291,6 +311,18 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption {
}
}
+// ConnectionTimeout returns a ServerOption that sets the timeout for
+// connection establishment (up to and including HTTP/2 handshaking) for all
+// new connections. If this is not set, the default is 120 seconds. A zero or
+// negative value will result in an immediate timeout.
+//
+// This API is EXPERIMENTAL.
+func ConnectionTimeout(d time.Duration) ServerOption {
+ return func(o *options) {
+ o.connectionTimeout = d
+ }
+}
+
// NewServer creates a gRPC server which has no service registered and has not
// started to accept requests yet.
func NewServer(opt ...ServerOption) *Server {
@@ -307,6 +339,8 @@ func NewServer(opt ...ServerOption) *Server {
opts: opts,
conns: make(map[io.Closer]bool),
m: make(map[string]*service),
+ quit: make(chan struct{}),
+ done: make(chan struct{}),
}
s.cv = sync.NewCond(&s.mu)
s.ctx, s.cancel = context.WithCancel(context.Background())
@@ -418,11 +452,9 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo {
return ret
}
-var (
- // ErrServerStopped indicates that the operation is now illegal because of
- // the server being stopped.
- ErrServerStopped = errors.New("grpc: the server has been stopped")
-)
+// ErrServerStopped indicates that the operation is now illegal because of
+// the server being stopped.
+var ErrServerStopped = errors.New("grpc: the server has been stopped")
func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) {
if s.opts.creds == nil {
@@ -436,7 +468,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
// read gRPC requests and then call the registered handlers to reply to them.
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
// this method returns.
-// Serve always returns non-nil error.
+// Serve will return a non-nil error unless Stop or GracefulStop is called.
func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("serving")
@@ -487,6 +519,14 @@ func (s *Server) Serve(lis net.Listener) error {
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
+
+ // If Stop or GracefulStop is called, block until they are done and return nil
+ select {
+ case <-s.quit:
+ <-s.done
+ return nil
+ default:
+ }
return err
}
tempDelay = 0
@@ -499,16 +539,18 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn is run in its own goroutine and handles a just-accepted
// connection that has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
+ rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
if err != nil {
s.mu.Lock()
s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
s.mu.Unlock()
grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
- // If serverHandShake returns ErrConnDispatched, keep rawConn open.
+ // If serverHandshake returns ErrConnDispatched, keep rawConn open.
if err != credentials.ErrConnDispatched {
rawConn.Close()
}
+ rawConn.SetDeadline(time.Time{})
return
}
@@ -521,18 +563,21 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
s.mu.Unlock()
if s.opts.useHandlerImpl {
+ rawConn.SetDeadline(time.Time{})
s.serveUsingHandler(conn)
} else {
- s.serveHTTP2Transport(conn, authInfo)
+ st := s.newHTTP2Transport(conn, authInfo)
+ if st == nil {
+ return
+ }
+ rawConn.SetDeadline(time.Time{})
+ s.serveStreams(st)
}
}
-// serveHTTP2Transport sets up a http/2 transport (using the
-// gRPC http2 server transport in transport/http2_server.go) and
-// serves streams on it.
-// This is run in its own goroutine (it does network I/O in
-// transport.NewServerTransport).
-func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
+// newHTTP2Transport sets up a http/2 transport (using the
+// gRPC http2 server transport in transport/http2_server.go).
+func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
config := &transport.ServerConfig{
MaxStreams: s.opts.maxConcurrentStreams,
AuthInfo: authInfo,
@@ -552,13 +597,13 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
s.mu.Unlock()
c.Close()
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
- return
+ return nil
}
if !s.addConn(st) {
st.Close()
- return
+ return nil
}
- s.serveStreams(st)
+ return st
}
func (s *Server) serveStreams(st transport.ServerTransport) {
@@ -686,18 +731,14 @@ func (s *Server) removeConn(c io.Closer) {
}
}
-func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error {
+func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error {
var (
- cbuf *bytes.Buffer
outPayload *stats.OutPayload
)
- if cp != nil {
- cbuf = new(bytes.Buffer)
- }
if s.opts.statsHandler != nil {
outPayload = &stats.OutPayload{}
}
- hdr, data, err := encode(s.opts.codec, msg, cp, cbuf, outPayload)
+ hdr, data, err := encode(s.opts.codec, msg, cp, outPayload, comp)
if err != nil {
grpclog.Errorln("grpc: server failed to encode response: ", err)
return err
@@ -741,10 +782,43 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
}()
}
+
+ // comp and cp are used for compression. decomp and dc are used for
+ // decompression. If comp and decomp are both set, they are the same;
+ // however they are kept separate to ensure that at most one of the
+ // compressor/decompressor variable pairs are set for use later.
+ var comp, decomp encoding.Compressor
+ var cp Compressor
+ var dc Decompressor
+
+ // If dc is set and matches the stream's compression, use it. Otherwise, try
+ // to find a matching registered compressor for decomp.
+ if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
+ dc = s.opts.dc
+ } else if rc != "" && rc != encoding.Identity {
+ decomp = encoding.GetCompressor(rc)
+ if decomp == nil {
+ st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
+ t.WriteStatus(stream, st)
+ return st.Err()
+ }
+ }
+
+ // If cp is set, use it. Otherwise, attempt to compress the response using
+ // the incoming message compression method.
+ //
+ // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
if s.opts.cp != nil {
- // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
- stream.SetSendCompress(s.opts.cp.Type())
+ cp = s.opts.cp
+ stream.SetSendCompress(cp.Type())
+ } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
+ // Legacy compressor not specified; attempt to respond with same encoding.
+ comp = encoding.GetCompressor(rc)
+ if comp != nil {
+ stream.SetSendCompress(rc)
+ }
}
+
p := &parser{r: stream}
pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize)
if err == io.EOF {
@@ -773,19 +847,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
return err
}
-
- if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil {
- if st, ok := status.FromError(err); ok {
- if e := t.WriteStatus(stream, st); e != nil {
- grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
- }
- return err
- }
- if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil {
+ if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil {
+ if e := t.WriteStatus(stream, st); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
-
- // TODO checkRecvPayload always return RPC error. Add a return here if necessary.
+ return st.Err()
}
var inPayload *stats.InPayload
if sh != nil {
@@ -799,9 +865,17 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
if pf == compressionMade {
var err error
- req, err = s.opts.dc.Do(bytes.NewReader(req))
- if err != nil {
- return Errorf(codes.Internal, err.Error())
+ if dc != nil {
+ req, err = dc.Do(bytes.NewReader(req))
+ if err != nil {
+ return Errorf(codes.Internal, err.Error())
+ }
+ } else {
+ tmp, _ := decomp.Decompress(bytes.NewReader(req))
+ req, err = ioutil.ReadAll(tmp)
+ if err != nil {
+ return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ }
}
}
if len(req) > s.opts.maxReceiveMessageSize {
@@ -847,7 +921,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
Last: true,
Delay: false,
}
- if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil {
+
+ if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
if err == io.EOF {
// The entire stream is done (for unary RPC only).
return err
@@ -896,21 +971,45 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
sh.HandleRPC(stream.Context(), end)
}()
}
- if s.opts.cp != nil {
- stream.SetSendCompress(s.opts.cp.Type())
- }
ss := &serverStream{
t: t,
s: stream,
p: &parser{r: stream},
codec: s.opts.codec,
- cp: s.opts.cp,
- dc: s.opts.dc,
maxReceiveMessageSize: s.opts.maxReceiveMessageSize,
maxSendMessageSize: s.opts.maxSendMessageSize,
trInfo: trInfo,
statsHandler: sh,
}
+
+ // If dc is set and matches the stream's compression, use it. Otherwise, try
+ // to find a matching registered compressor for decomp.
+ if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc {
+ ss.dc = s.opts.dc
+ } else if rc != "" && rc != encoding.Identity {
+ ss.decomp = encoding.GetCompressor(rc)
+ if ss.decomp == nil {
+ st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc)
+ t.WriteStatus(ss.s, st)
+ return st.Err()
+ }
+ }
+
+ // If cp is set, use it. Otherwise, attempt to compress the response using
+ // the incoming message compression method.
+ //
+ // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686.
+ if s.opts.cp != nil {
+ ss.cp = s.opts.cp
+ stream.SetSendCompress(s.opts.cp.Type())
+ } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity {
+ // Legacy compressor not specified; attempt to respond with same encoding.
+ ss.comp = encoding.GetCompressor(rc)
+ if ss.comp != nil {
+ stream.SetSendCompress(rc)
+ }
+ }
+
if trInfo != nil {
trInfo.tr.LazyLog(&trInfo.firstLine, false)
defer func() {
@@ -1054,6 +1153,16 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
// pending RPCs on the client side will get notified by connection
// errors.
func (s *Server) Stop() {
+ s.quitOnce.Do(func() {
+ close(s.quit)
+ })
+
+ defer func() {
+ s.doneOnce.Do(func() {
+ close(s.done)
+ })
+ }()
+
s.mu.Lock()
listeners := s.lis
s.lis = nil
@@ -1083,6 +1192,16 @@ func (s *Server) Stop() {
// accepting new connections and RPCs and blocks until all the pending RPCs are
// finished.
func (s *Server) GracefulStop() {
+ s.quitOnce.Do(func() {
+ close(s.quit)
+ })
+
+ defer func() {
+ s.doneOnce.Do(func() {
+ close(s.done)
+ })
+ }()
+
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
@@ -1110,25 +1229,11 @@ func (s *Server) GracefulStop() {
}
func init() {
- internal.TestingCloseConns = func(arg interface{}) {
- arg.(*Server).testingCloseConns()
- }
internal.TestingUseHandlerImpl = func(arg interface{}) {
arg.(*Server).opts.useHandlerImpl = true
}
}
-// testingCloseConns closes all existing transports but keeps s.lis
-// accepting new connections.
-func (s *Server) testingCloseConns() {
- s.mu.Lock()
- for c := range s.conns {
- c.Close()
- delete(s.conns, c)
- }
- s.mu.Unlock()
-}
-
// SetHeader sets the header metadata.
// When called multiple times, all the provided metadata will be merged.
// All the metadata will be sent out when one of the following happens: