path: root/vendor/google.golang.org/grpc/server.go
author     Yong Tang <yong.tang.github@outlook.com>  2018-01-15 09:59:29 -0800
committer  GitHub <noreply@github.com>  2018-01-15 09:59:29 -0800
commit     584dd87c70e29abc373f88be52bd2eee287ecace (patch)
tree       6b4ac5286a5345c796071e4e9f7a9e6fce47a5ca /vendor/google.golang.org/grpc/server.go
parent     d699b89063843d81cee35f128aaef9881439151f (diff)
Add route53 plugin (#1390)
* Update vendor
  Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
* Add route53 plugin
  This fix adds the route53 plugin so that it is possible to query route53 records through CoreDNS.
  Signed-off-by: Yong Tang <yong.tang.github@outlook.com>
Diffstat (limited to 'vendor/google.golang.org/grpc/server.go')
-rw-r--r--  vendor/google.golang.org/grpc/server.go  108
1 file changed, 68 insertions(+), 40 deletions(-)
diff --git a/vendor/google.golang.org/grpc/server.go b/vendor/google.golang.org/grpc/server.go
index e9737fc49..f65162168 100644
--- a/vendor/google.golang.org/grpc/server.go
+++ b/vendor/google.golang.org/grpc/server.go
@@ -92,11 +92,7 @@ type Server struct {
conns map[io.Closer]bool
serve bool
drain bool
- ctx context.Context
- cancel context.CancelFunc
- // A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
- // and all the transport goes away.
- cv *sync.Cond
+ cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info
events trace.EventLog
@@ -104,6 +100,7 @@ type Server struct {
done chan struct{}
quitOnce sync.Once
doneOnce sync.Once
+ serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop
}
type options struct {
@@ -343,7 +340,6 @@ func NewServer(opt ...ServerOption) *Server {
done: make(chan struct{}),
}
s.cv = sync.NewCond(&s.mu)
- s.ctx, s.cancel = context.WithCancel(context.Background())
if EnableTracing {
_, file, line, _ := runtime.Caller(1)
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
@@ -474,10 +470,23 @@ func (s *Server) Serve(lis net.Listener) error {
s.printf("serving")
s.serve = true
if s.lis == nil {
+ // Serve called after Stop or GracefulStop.
s.mu.Unlock()
lis.Close()
return ErrServerStopped
}
+
+ s.serveWG.Add(1)
+ defer func() {
+ s.serveWG.Done()
+ select {
+ // Stop or GracefulStop called; block until done and return nil.
+ case <-s.quit:
+ <-s.done
+ default:
+ }
+ }()
+
s.lis[lis] = true
s.mu.Unlock()
defer func() {
@@ -511,33 +520,39 @@ func (s *Server) Serve(lis net.Listener) error {
timer := time.NewTimer(tempDelay)
select {
case <-timer.C:
- case <-s.ctx.Done():
+ case <-s.quit:
+ timer.Stop()
+ return nil
}
- timer.Stop()
continue
}
s.mu.Lock()
s.printf("done serving; Accept = %v", err)
s.mu.Unlock()
- // If Stop or GracefulStop is called, block until they are done and return nil
select {
case <-s.quit:
- <-s.done
return nil
default:
}
return err
}
tempDelay = 0
- // Start a new goroutine to deal with rawConn
- // so we don't stall this Accept loop goroutine.
- go s.handleRawConn(rawConn)
+ // Start a new goroutine to deal with rawConn so we don't stall this Accept
+ // loop goroutine.
+ //
+ // Make sure we account for the goroutine so GracefulStop doesn't nil out
+ // s.conns before this conn can be added.
+ s.serveWG.Add(1)
+ go func() {
+ s.handleRawConn(rawConn)
+ s.serveWG.Done()
+ }()
}
}
-// handleRawConn is run in its own goroutine and handles a just-accepted
-// connection that has not had any I/O performed on it yet.
+// handleRawConn forks a goroutine to handle a just-accepted connection that
+// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
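
The two hunks above replace the server's ctx/cancel pair with quit/done channels and introduce serveWG, a WaitGroup that counts the Serve loop itself plus every per-connection goroutine it spawns, so a stop can wait for all of them. Below is a minimal, self-contained sketch of that pattern; the srv type, its fields, and the handling code are illustrative stand-ins, not the real grpc.Server internals.

package main

import (
	"fmt"
	"net"
	"sync"
)

// srv mirrors only the state the hunks above touch; the names are
// illustrative, not the real grpc.Server fields.
type srv struct {
	quit    chan struct{}
	done    chan struct{}
	serveWG sync.WaitGroup
}

func (s *srv) serve(lis net.Listener) error {
	s.serveWG.Add(1)
	defer func() {
		s.serveWG.Done()
		// If shutdown has started, block until it completes so callers of
		// serve do not return before the stop path has finished.
		select {
		case <-s.quit:
			<-s.done
		default:
		}
	}()
	for {
		conn, err := lis.Accept()
		if err != nil {
			select {
			case <-s.quit:
				// The listener was closed by shutdown; not an error.
				return nil
			default:
			}
			return err
		}
		// Count the per-connection goroutine so a graceful stop can wait for
		// it before assuming no new conns will be registered.
		s.serveWG.Add(1)
		go func(c net.Conn) {
			defer s.serveWG.Done()
			c.Close() // stand-in for handleRawConn.
		}(conn)
	}
}

func main() {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	s := &srv{quit: make(chan struct{}), done: make(chan struct{})}
	go func() {
		// Simulate Stop: signal quit, close the listener, then mark done.
		close(s.quit)
		lis.Close()
		close(s.done)
	}()
	fmt.Println("serve returned:", s.serve(lis))
	s.serveWG.Wait()
}
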
@@ -562,17 +577,28 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
}
s.mu.Unlock()
+ var serve func()
+ c := conn.(io.Closer)
if s.opts.useHandlerImpl {
- rawConn.SetDeadline(time.Time{})
- s.serveUsingHandler(conn)
+ serve = func() { s.serveUsingHandler(conn) }
} else {
+ // Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
if st == nil {
return
}
- rawConn.SetDeadline(time.Time{})
- s.serveStreams(st)
+ c = st
+ serve = func() { s.serveStreams(st) }
+ }
+
+ rawConn.SetDeadline(time.Time{})
+ if !s.addConn(c) {
+ return
}
+ go func() {
+ serve()
+ s.removeConn(c)
+ }()
}
// newHTTP2Transport sets up a http/2 transport (using the
@@ -599,15 +625,10 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr
grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err)
return nil
}
- if !s.addConn(st) {
- st.Close()
- return nil
- }
return st
}
func (s *Server) serveStreams(st transport.ServerTransport) {
- defer s.removeConn(st)
defer st.Close()
var wg sync.WaitGroup
st.HandleStreams(func(stream *transport.Stream) {
@@ -641,11 +662,6 @@ var _ http.Handler = (*Server)(nil)
//
// conn is the *tls.Conn that's already been authenticated.
func (s *Server) serveUsingHandler(conn net.Conn) {
- if !s.addConn(conn) {
- conn.Close()
- return
- }
- defer s.removeConn(conn)
h2s := &http2.Server{
MaxConcurrentStreams: s.opts.maxConcurrentStreams,
}
@@ -685,7 +701,6 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
return
}
if !s.addConn(st) {
- st.Close()
return
}
defer s.removeConn(st)
@@ -715,9 +730,15 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
func (s *Server) addConn(c io.Closer) bool {
s.mu.Lock()
defer s.mu.Unlock()
- if s.conns == nil || s.drain {
+ if s.conns == nil {
+ c.Close()
return false
}
+ if s.drain {
+ // Transport added after we drained our existing conns: drain it
+ // immediately.
+ c.(transport.ServerTransport).Drain()
+ }
s.conns[c] = true
return true
}
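
With this hunk, addConn closes and rejects a conn only when the server has fully stopped (conns == nil); a transport that arrives while the server is draining is drained immediately but still tracked. A small sketch of that behaviour under the same assumptions as before (illustrative types, not the grpc internals):

package main

import (
	"fmt"
	"io"
	"sync"
)

// drainer stands in for transport.ServerTransport: something that can be
// closed outright or asked to drain gracefully.
type drainer interface {
	io.Closer
	Drain()
}

type connTracker struct {
	mu    sync.Mutex
	drain bool
	conns map[io.Closer]bool
}

func (t *connTracker) addConn(c drainer) bool {
	t.mu.Lock()
	defer t.mu.Unlock()
	if t.conns == nil {
		// Server already stopped: close and refuse the conn.
		c.Close()
		return false
	}
	if t.drain {
		// Transport added after draining began: drain it immediately, but
		// still track it so a graceful stop waits for it to finish.
		c.Drain()
	}
	t.conns[c] = true
	return true
}

type fakeConn struct{ drained bool }

func (f *fakeConn) Close() error { return nil }
func (f *fakeConn) Drain()       { f.drained = true }

func main() {
	t := &connTracker{conns: make(map[io.Closer]bool), drain: true}
	c := &fakeConn{}
	fmt.Println("added:", t.addConn(c), "drained:", c.drained)
}
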
@@ -826,7 +847,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return err
}
if err == io.ErrUnexpectedEOF {
- err = Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
+ err = status.Errorf(codes.Internal, io.ErrUnexpectedEOF.Error())
}
if err != nil {
if st, ok := status.FromError(err); ok {
@@ -868,13 +889,13 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
if dc != nil {
req, err = dc.Do(bytes.NewReader(req))
if err != nil {
- return Errorf(codes.Internal, err.Error())
+ return status.Errorf(codes.Internal, err.Error())
}
} else {
tmp, _ := decomp.Decompress(bytes.NewReader(req))
req, err = ioutil.ReadAll(tmp)
if err != nil {
- return Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
+ return status.Errorf(codes.Internal, "grpc: failed to decompress the received message %v", err)
}
}
}
@@ -1158,6 +1179,7 @@ func (s *Server) Stop() {
})
defer func() {
+ s.serveWG.Wait()
s.doneOnce.Do(func() {
close(s.done)
})
@@ -1180,7 +1202,6 @@ func (s *Server) Stop() {
}
s.mu.Lock()
- s.cancel()
if s.events != nil {
s.events.Finish()
s.events = nil
@@ -1203,21 +1224,27 @@ func (s *Server) GracefulStop() {
}()
s.mu.Lock()
- defer s.mu.Unlock()
if s.conns == nil {
+ s.mu.Unlock()
return
}
for lis := range s.lis {
lis.Close()
}
s.lis = nil
- s.cancel()
if !s.drain {
for c := range s.conns {
c.(transport.ServerTransport).Drain()
}
s.drain = true
}
+
+ // Wait for serving threads to be ready to exit. Only then can we be sure no
+ // new conns will be created.
+ s.mu.Unlock()
+ s.serveWG.Wait()
+ s.mu.Lock()
+
for len(s.conns) != 0 {
s.cv.Wait()
}
@@ -1226,6 +1253,7 @@ func (s *Server) GracefulStop() {
s.events.Finish()
s.events = nil
}
+ s.mu.Unlock()
}
func init() {
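
GracefulStop now drops the mutex while waiting on serveWG so the Serve goroutines can finish registering in-flight conns, and only afterwards takes the lock again and waits on the condition variable until every tracked conn is removed. A condensed sketch of that ordering, again with illustrative names rather than the real grpc.Server fields:

package main

import (
	"io"
	"sync"
)

// gserver holds only the state the GracefulStop hunk relies on.
type gserver struct {
	mu      sync.Mutex
	cv      *sync.Cond
	conns   map[io.Closer]bool
	drain   bool
	serveWG sync.WaitGroup
	quit    chan struct{}
	done    chan struct{}
}

func (s *gserver) gracefulStop() {
	close(s.quit)
	defer close(s.done)

	s.mu.Lock()
	if s.conns == nil {
		// Already stopped.
		s.mu.Unlock()
		return
	}
	if !s.drain {
		// The real code calls Drain() on every tracked ServerTransport here;
		// this sketch only records the state transition.
		s.drain = true
	}

	// Wait for serving goroutines to be ready to exit; only then can we be
	// sure no new conns will be added to s.conns.
	s.mu.Unlock()
	s.serveWG.Wait()
	s.mu.Lock()

	for len(s.conns) != 0 {
		// removeConn deletes the entry and broadcasts on s.cv.
		s.cv.Wait()
	}
	s.conns = nil
	s.mu.Unlock()
}

func main() {
	s := &gserver{
		conns: make(map[io.Closer]bool),
		quit:  make(chan struct{}),
		done:  make(chan struct{}),
	}
	s.cv = sync.NewCond(&s.mu)
	s.gracefulStop() // with no conns or Serve goroutines this returns at once.
}
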
@@ -1246,7 +1274,7 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
}
stream, ok := transport.StreamFromContext(ctx)
if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetHeader(md)
}
@@ -1256,7 +1284,7 @@ func SetHeader(ctx context.Context, md metadata.MD) error {
func SendHeader(ctx context.Context, md metadata.MD) error {
stream, ok := transport.StreamFromContext(ctx)
if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
t := stream.ServerTransport()
if t == nil {
@@ -1276,7 +1304,7 @@ func SetTrailer(ctx context.Context, md metadata.MD) error {
}
stream, ok := transport.StreamFromContext(ctx)
if !ok {
- return Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
+ return status.Errorf(codes.Internal, "grpc: failed to fetch the stream from the context %v", ctx)
}
return stream.SetTrailer(md)
}