diff options
author | 2017-12-18 11:50:56 -0600 | |
---|---|---|
committer | 2017-12-18 11:50:56 -0600 | |
commit | 4dd40a292c773aad5eee761940840de64ba9090a (patch) | |
tree | e7b872bed228fdf06b4ea91db60bf9b7cc8fec2a /vendor/google.golang.org/grpc/server.go | |
parent | ba4e77672c9f7b53a8c2f6e197d3f12ede1e46cd (diff) | |
download | coredns-4dd40a292c773aad5eee761940840de64ba9090a.tar.gz coredns-4dd40a292c773aad5eee761940840de64ba9090a.tar.zst coredns-4dd40a292c773aad5eee761940840de64ba9090a.zip |
Update apache/thrift to 0.11.0 and remove pinning (#1317)
The `apache/thrift` recently released a new version of `0.11.0`
several days ago. This release is compatible with other packages
and as such, there is no need to pinning the `apache/thrift`
to `master` anymore in Gopkg.toml.
This fix removes the pinning of `apache/thrift` in Gopkg.toml,
and updates all dependencies of coredns.
Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r-- | vendor/google.golang.org/grpc/server.go | 227 |
1 files changed, 166 insertions, 61 deletions
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go index 7c882dbe6..e9737fc49 100644 --- a/vendor/google.golang.org/grpc/server.go +++ b/vendor/google.golang.org/grpc/server.go @@ -32,11 +32,14 @@ import ( "sync" "time" + "io/ioutil" + "golang.org/x/net/context" "golang.org/x/net/http2" "golang.org/x/net/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/encoding" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" "google.golang.org/grpc/keepalive" @@ -96,6 +99,11 @@ type Server struct { cv *sync.Cond m map[string]*service // service name -> service info events trace.EventLog + + quit chan struct{} + done chan struct{} + quitOnce sync.Once + doneOnce sync.Once } type options struct { @@ -118,11 +126,13 @@ type options struct { initialConnWindowSize int32 writeBufferSize int readBufferSize int + connectionTimeout time.Duration } var defaultServerOptions = options{ maxReceiveMessageSize: defaultServerMaxReceiveMessageSize, maxSendMessageSize: defaultServerMaxSendMessageSize, + connectionTimeout: 120 * time.Second, } // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. @@ -181,14 +191,24 @@ func CustomCodec(codec Codec) ServerOption { } } -// RPCCompressor returns a ServerOption that sets a compressor for outbound messages. +// RPCCompressor returns a ServerOption that sets a compressor for outbound +// messages. For backward compatibility, all outbound messages will be sent +// using this compressor, regardless of incoming message compression. By +// default, server messages will be sent using the same compressor with which +// request messages were sent. +// +// Deprecated: use encoding.RegisterCompressor instead. func RPCCompressor(cp Compressor) ServerOption { return func(o *options) { o.cp = cp } } -// RPCDecompressor returns a ServerOption that sets a decompressor for inbound messages. +// RPCDecompressor returns a ServerOption that sets a decompressor for inbound +// messages. It has higher priority than decompressors registered via +// encoding.RegisterCompressor. +// +// Deprecated: use encoding.RegisterCompressor instead. func RPCDecompressor(dc Decompressor) ServerOption { return func(o *options) { o.dc = dc @@ -291,6 +311,18 @@ func UnknownServiceHandler(streamHandler StreamHandler) ServerOption { } } +// ConnectionTimeout returns a ServerOption that sets the timeout for +// connection establishment (up to and including HTTP/2 handshaking) for all +// new connections. If this is not set, the default is 120 seconds. A zero or +// negative value will result in an immediate timeout. +// +// This API is EXPERIMENTAL. +func ConnectionTimeout(d time.Duration) ServerOption { + return func(o *options) { + o.connectionTimeout = d + } +} + // NewServer creates a gRPC server which has no service registered and has not // started to accept requests yet. func NewServer(opt ...ServerOption) *Server { @@ -307,6 +339,8 @@ func NewServer(opt ...ServerOption) *Server { opts: opts, conns: make(map[io.Closer]bool), m: make(map[string]*service), + quit: make(chan struct{}), + done: make(chan struct{}), } s.cv = sync.NewCond(&s.mu) s.ctx, s.cancel = context.WithCancel(context.Background()) @@ -418,11 +452,9 @@ func (s *Server) GetServiceInfo() map[string]ServiceInfo { return ret } -var ( - // ErrServerStopped indicates that the operation is now illegal because of - // the server being stopped. - ErrServerStopped = errors.New("grpc: the server has been stopped") -) +// ErrServerStopped indicates that the operation is now illegal because of +// the server being stopped. +var ErrServerStopped = errors.New("grpc: the server has been stopped") func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { if s.opts.creds == nil { @@ -436,7 +468,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti // read gRPC requests and then call the registered handlers to reply to them. // Serve returns when lis.Accept fails with fatal errors. lis will be closed when // this method returns. -// Serve always returns non-nil error. +// Serve will return a non-nil error unless Stop or GracefulStop is called. func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() s.printf("serving") @@ -487,6 +519,14 @@ func (s *Server) Serve(lis net.Listener) error { s.mu.Lock() s.printf("done serving; Accept = %v", err) s.mu.Unlock() + + // If Stop or GracefulStop is called, block until they are done and return nil + select { + case <-s.quit: + <-s.done + return nil + default: + } return err } tempDelay = 0 @@ -499,16 +539,18 @@ func (s *Server) Serve(lis net.Listener) error { // handleRawConn is run in its own goroutine and handles a just-accepted // connection that has not had any I/O performed on it yet. func (s *Server) handleRawConn(rawConn net.Conn) { + rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout)) conn, authInfo, err := s.useTransportAuthenticator(rawConn) if err != nil { s.mu.Lock() s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) s.mu.Unlock() grpclog.Warningf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) - // If serverHandShake returns ErrConnDispatched, keep rawConn open. + // If serverHandshake returns ErrConnDispatched, keep rawConn open. if err != credentials.ErrConnDispatched { rawConn.Close() } + rawConn.SetDeadline(time.Time{}) return } @@ -521,18 +563,21 @@ func (s *Server) handleRawConn(rawConn net.Conn) { s.mu.Unlock() if s.opts.useHandlerImpl { + rawConn.SetDeadline(time.Time{}) s.serveUsingHandler(conn) } else { - s.serveHTTP2Transport(conn, authInfo) + st := s.newHTTP2Transport(conn, authInfo) + if st == nil { + return + } + rawConn.SetDeadline(time.Time{}) + s.serveStreams(st) } } -// serveHTTP2Transport sets up a http/2 transport (using the -// gRPC http2 server transport in transport/http2_server.go) and -// serves streams on it. -// This is run in its own goroutine (it does network I/O in -// transport.NewServerTransport). -func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) { +// newHTTP2Transport sets up a http/2 transport (using the +// gRPC http2 server transport in transport/http2_server.go). +func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport { config := &transport.ServerConfig{ MaxStreams: s.opts.maxConcurrentStreams, AuthInfo: authInfo, @@ -552,13 +597,13 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) s.mu.Unlock() c.Close() grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) - return + return nil } if !s.addConn(st) { st.Close() - return + return nil } - s.serveStreams(st) + return st } func (s *Server) serveStreams(st transport.ServerTransport) { @@ -686,18 +731,14 @@ func (s *Server) removeConn(c io.Closer) { } } -func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options) error { +func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { var ( - cbuf *bytes.Buffer outPayload *stats.OutPayload ) - if cp != nil { - cbuf = new(bytes.Buffer) - } if s.opts.statsHandler != nil { outPayload = &stats.OutPayload{} } - hdr, data, err := encode(s.opts.codec, msg, cp, cbuf, outPayload) + hdr, data, err := encode(s.opts.codec, msg, cp, outPayload, comp) if err != nil { grpclog.Errorln("grpc: server failed to encode response: ", err) return err @@ -741,10 +782,43 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } }() } + + // comp and cp are used for compression. decomp and dc are used for + // decompression. If comp and decomp are both set, they are the same; + // however they are kept separate to ensure that at most one of the + // compressor/decompressor variable pairs are set for use later. + var comp, decomp encoding.Compressor + var cp Compressor + var dc Decompressor + + // If dc is set and matches the stream's compression, use it. Otherwise, try + // to find a matching registered compressor for decomp. + if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { + dc = s.opts.dc + } else if rc != "" && rc != encoding.Identity { + decomp = encoding.GetCompressor(rc) + if decomp == nil { + st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) + t.WriteStatus(stream, st) + return st.Err() + } + } + + // If cp is set, use it. Otherwise, attempt to compress the response using + // the incoming message compression method. + // + // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. if s.opts.cp != nil { - // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. - stream.SetSendCompress(s.opts.cp.Type()) + cp = s.opts.cp + stream.SetSendCompress(cp.Type()) + } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { + // Legacy compressor not specified; attempt to respond with same encoding. + comp = encoding.GetCompressor(rc) + if comp != nil { + stream.SetSendCompress(rc) + } } + p := &parser{r: stream} pf, req, err := p.recvMsg(s.opts.maxReceiveMessageSize) if err == io.EOF { @@ -773,19 +847,11 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return err } - - if err := checkRecvPayload(pf, stream.RecvCompress(), s.opts.dc); err != nil { - if st, ok := status.FromError(err); ok { - if e := t.WriteStatus(stream, st); e != nil { - grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) - } - return err - } - if e := t.WriteStatus(stream, status.New(codes.Internal, err.Error())); e != nil { + if st := checkRecvPayload(pf, stream.RecvCompress(), dc != nil || decomp != nil); st != nil { + if e := t.WriteStatus(stream, st); e != nil { grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e) } - - // TODO checkRecvPayload always return RPC error. Add a return here if necessary. + return st.Err() } var inPayload *stats.InPayload if sh != nil { @@ -799,9 +865,17 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } if pf == compressionMade { var err error - req, err = s.opts.dc.Do(bytes.NewReader(req)) - if err != nil { - return Errorf(codes.Internal, err.Error()) + if dc != nil { + req, err = dc.Do(bytes.NewReader(req)) + if err != nil { + return Errorf(codes.Internal, err.Error()) + } + } else { + tmp, _ := decomp.Decompress(bytes.NewReader(req)) + req, err = ioutil.ReadAll(tmp) + if err != nil { + return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err) + } } } if len(req) > s.opts.maxReceiveMessageSize { @@ -847,7 +921,8 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. Last: true, Delay: false, } - if err := s.sendResponse(t, stream, reply, s.opts.cp, opts); err != nil { + + if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { if err == io.EOF { // The entire stream is done (for unary RPC only). return err @@ -896,21 +971,45 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp sh.HandleRPC(stream.Context(), end) }() } - if s.opts.cp != nil { - stream.SetSendCompress(s.opts.cp.Type()) - } ss := &serverStream{ t: t, s: stream, p: &parser{r: stream}, codec: s.opts.codec, - cp: s.opts.cp, - dc: s.opts.dc, maxReceiveMessageSize: s.opts.maxReceiveMessageSize, maxSendMessageSize: s.opts.maxSendMessageSize, trInfo: trInfo, statsHandler: sh, } + + // If dc is set and matches the stream's compression, use it. Otherwise, try + // to find a matching registered compressor for decomp. + if rc := stream.RecvCompress(); s.opts.dc != nil && s.opts.dc.Type() == rc { + ss.dc = s.opts.dc + } else if rc != "" && rc != encoding.Identity { + ss.decomp = encoding.GetCompressor(rc) + if ss.decomp == nil { + st := status.Newf(codes.Unimplemented, "grpc: Decompressor is not installed for grpc-encoding %q", rc) + t.WriteStatus(ss.s, st) + return st.Err() + } + } + + // If cp is set, use it. Otherwise, attempt to compress the response using + // the incoming message compression method. + // + // NOTE: this needs to be ahead of all handling, https://github.com/grpc/grpc-go/issues/686. + if s.opts.cp != nil { + ss.cp = s.opts.cp + stream.SetSendCompress(s.opts.cp.Type()) + } else if rc := stream.RecvCompress(); rc != "" && rc != encoding.Identity { + // Legacy compressor not specified; attempt to respond with same encoding. + ss.comp = encoding.GetCompressor(rc) + if ss.comp != nil { + stream.SetSendCompress(rc) + } + } + if trInfo != nil { trInfo.tr.LazyLog(&trInfo.firstLine, false) defer func() { @@ -1054,6 +1153,16 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str // pending RPCs on the client side will get notified by connection // errors. func (s *Server) Stop() { + s.quitOnce.Do(func() { + close(s.quit) + }) + + defer func() { + s.doneOnce.Do(func() { + close(s.done) + }) + }() + s.mu.Lock() listeners := s.lis s.lis = nil @@ -1083,6 +1192,16 @@ func (s *Server) Stop() { // accepting new connections and RPCs and blocks until all the pending RPCs are // finished. func (s *Server) GracefulStop() { + s.quitOnce.Do(func() { + close(s.quit) + }) + + defer func() { + s.doneOnce.Do(func() { + close(s.done) + }) + }() + s.mu.Lock() defer s.mu.Unlock() if s.conns == nil { @@ -1110,25 +1229,11 @@ func (s *Server) GracefulStop() { } func init() { - internal.TestingCloseConns = func(arg interface{}) { - arg.(*Server).testingCloseConns() - } internal.TestingUseHandlerImpl = func(arg interface{}) { arg.(*Server).opts.useHandlerImpl = true } } -// testingCloseConns closes all existing transports but keeps s.lis -// accepting new connections. -func (s *Server) testingCloseConns() { - s.mu.Lock() - for c := range s.conns { - c.Close() - delete(s.conns, c) - } - s.mu.Unlock() -} - // SetHeader sets the header metadata. // When called multiple times, all the provided metadata will be merged. // All the metadata will be sent out when one of the following happens: |