grpc(go) 요청 해석handleRawConn 원본 읽기

78103 단어

grpc-go 요청handleRawConn 원본 읽기

  • grpc(go)handleRawConn 원본 읽기 요청
  • 방법
  • newHTTP2Transport
  • serveStreams

  • grpc(go) 요청 해석handleRawConn 원본 읽기


    메서드


    코드를 먼저 붙여주세요.
    // handleRawConn forks a goroutine to handle a just-accepted connection that
    // has not had any I/O performed on it yet.
    func (s *Server) handleRawConn(rawConn net.Conn) {
    	if s.quit.HasFired() {
    		rawConn.Close()
    		return
    	}
    	rawConn.SetDeadline(time.Now().Add(s.opts.connectionTimeout))
    	conn, authInfo, err := s.useTransportAuthenticator(rawConn)
    	if err != nil {
    		// ErrConnDispatched means that the connection was dispatched away from
    		// gRPC; those connections should be left open.
    		if err != credentials.ErrConnDispatched {
    			s.mu.Lock()
    			s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
    			s.mu.Unlock()
    			channelz.Warningf(s.channelzID, "grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err)
    			rawConn.Close()
    		}
    		rawConn.SetDeadline(time.Time{})
    		return
    	}
    
    	// Finish handshaking (HTTP2)
    	st := s.newHTTP2Transport(conn, authInfo)
    	if st == nil {
    		return
    	}
    
    	rawConn.SetDeadline(time.Time{})
    	if !s.addConn(st) {
    		return
    	}
    	go func() {
    		s.serveStreams(st)
    		s.removeConn(st)
    	}()
    }
    

    여기는 주로 세 가지 일을 했다. (1) 검사 요청(앞에 인증서가 설치되어 있다면)과 만료 시간 설정 등 일부 매개 변수(2)http2 요청을 분석하여 하나의transport를 얻는다.ServerTransport(3) ServerTransport를 통해 요청 처리
    우리 는 상관하지 않고, 안 에서 무슨 처리 를 했는지 보았다

    newHTTP2Transport

    // newHTTP2Transport sets up a http/2 transport (using the
    // gRPC http2 server transport in transport/http2_server.go).
    func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
    	config := &transport.ServerConfig{
    		MaxStreams:            s.opts.maxConcurrentStreams,
    		AuthInfo:              authInfo,
    		InTapHandle:           s.opts.inTapHandle,
    		StatsHandler:          s.opts.statsHandler,
    		KeepaliveParams:       s.opts.keepaliveParams,
    		KeepalivePolicy:       s.opts.keepalivePolicy,
    		InitialWindowSize:     s.opts.initialWindowSize,
    		InitialConnWindowSize: s.opts.initialConnWindowSize,
    		WriteBufferSize:       s.opts.writeBufferSize,
    		ReadBufferSize:        s.opts.readBufferSize,
    		ChannelzParentID:      s.channelzID,
    		MaxHeaderListSize:     s.opts.maxHeaderListSize,
    		HeaderTableSize:       s.opts.headerTableSize,
    	}
    	st, err := transport.NewServerTransport("http2", c, config)
    	if err != nil {
    		s.mu.Lock()
    		s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
    		s.mu.Unlock()
    		c.Close()
    		channelz.Warning(s.channelzID, "grpc: Server.Serve failed to create ServerTransport: ", err)
    		return nil
    	}
    
    	return st
    }
    

    grpc가 시작되기 전에, 우리는 서버를 생성하여 일련의 옵션을 설정합니다. http2의 config, 예를 들어readBufferSize (기본 4m) 를 설정하고, New ServerTransport를 통해 서버Transport를 가져오는 데 사용할 부분을 꺼냅니다.
    // NewServerTransport creates a ServerTransport with conn or non-nil error
    // if it fails.
    func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (ServerTransport, error) {
    	return newHTTP2Server(conn, config)
    }
    
    // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
    // returned if something goes wrong.
    func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
    	writeBufSize := config.WriteBufferSize
    	readBufSize := config.ReadBufferSize
    	maxHeaderListSize := defaultServerMaxHeaderListSize
    	if config.MaxHeaderListSize != nil {
    		maxHeaderListSize = *config.MaxHeaderListSize
    	}
    	framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
    	// Send initial settings as connection preface to client.
    	//                 
    	isettings := []http2.Setting{{
    		ID:  http2.SettingMaxFrameSize,
    		Val: http2MaxFrameLen,
    	}}
    	// TODO(zhaoq): Have a better way to signal "no limit" because 0 is
    	// permitted in the HTTP2 spec.
    	maxStreams := config.MaxStreams
    	if maxStreams == 0 {
    		maxStreams = math.MaxUint32
    	} else {
    		isettings = append(isettings, http2.Setting{
    			ID:  http2.SettingMaxConcurrentStreams,
    			Val: maxStreams,
    		})
    	}
    	dynamicWindow := true
    	iwz := int32(initialWindowSize)
    	if config.InitialWindowSize >= defaultWindowSize {
    		iwz = config.InitialWindowSize
    		dynamicWindow = false
    	}
    	icwz := int32(initialWindowSize)
    	if config.InitialConnWindowSize >= defaultWindowSize {
    		icwz = config.InitialConnWindowSize
    		dynamicWindow = false
    	}
    	if iwz != defaultWindowSize {
    		isettings = append(isettings, http2.Setting{
    			ID:  http2.SettingInitialWindowSize,
    			Val: uint32(iwz)})
    	}
    	if config.MaxHeaderListSize != nil {
    		isettings = append(isettings, http2.Setting{
    			ID:  http2.SettingMaxHeaderListSize,
    			Val: *config.MaxHeaderListSize,
    		})
    	}
    	if config.HeaderTableSize != nil {
    		isettings = append(isettings, http2.Setting{
    			ID:  http2.SettingHeaderTableSize,
    			Val: *config.HeaderTableSize,
    		})
    	}
    	if err := framer.fr.WriteSettings(isettings...); err != nil {
    		return nil, connectionErrorf(false, err, "transport: %v", err)
    	}
    	// Adjust the connection flow control window if needed.
    	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
    		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
    			return nil, connectionErrorf(false, err, "transport: %v", err)
    		}
    	}
    	kp := config.KeepaliveParams
    	if kp.MaxConnectionIdle == 0 {
    		kp.MaxConnectionIdle = defaultMaxConnectionIdle
    	}
    	if kp.MaxConnectionAge == 0 {
    		kp.MaxConnectionAge = defaultMaxConnectionAge
    	}
    	// Add a jitter to MaxConnectionAge.
    	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
    	if kp.MaxConnectionAgeGrace == 0 {
    		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
    	}
    	if kp.Time == 0 {
    		kp.Time = defaultServerKeepaliveTime
    	}
    	if kp.Timeout == 0 {
    		kp.Timeout = defaultServerKeepaliveTimeout
    	}
    	kep := config.KeepalivePolicy
    	if kep.MinTime == 0 {
    		kep.MinTime = defaultKeepalivePolicyMinTime
    	}
    	done := make(chan struct{})
    	t := &http2Server{
    		ctx:               context.Background(),
    		done:              done,
    		conn:              conn,
    		remoteAddr:        conn.RemoteAddr(),
    		localAddr:         conn.LocalAddr(),
    		authInfo:          config.AuthInfo,
    		framer:            framer,
    		readerDone:        make(chan struct{}),
    		writerDone:        make(chan struct{}),
    		maxStreams:        maxStreams,
    		inTapHandle:       config.InTapHandle,
    		fc:                &trInFlow{limit: uint32(icwz)},
    		state:             reachable,
    		activeStreams:     make(map[uint32]*Stream),
    		stats:             config.StatsHandler,
    		kp:                kp,
    		idle:              time.Now(),
    		kep:               kep,
    		initialWindowSize: iwz,
    		czData:            new(channelzData),
    		bufferPool:        newBufferPool(),
    	}
    	t.controlBuf = newControlBuffer(t.done)
    	if dynamicWindow {
    		t.bdpEst = &bdpEstimator{
    			bdp:               initialWindowSize,
    			updateFlowControl: t.updateFlowControl,
    		}
    	}
    	if t.stats != nil {
    		t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
    			RemoteAddr: t.remoteAddr,
    			LocalAddr:  t.localAddr,
    		})
    		connBegin := &stats.ConnBegin{}
    		t.stats.HandleConn(t.ctx, connBegin)
    	}
    	if channelz.IsOn() {
    		t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, fmt.Sprintf("%s -> %s", t.remoteAddr, t.localAddr))
    	}
    
    	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
    
    	t.framer.writer.Flush()
    
    	defer func() {
    		if err != nil {
    			t.Close()
    		}
    	}()
    
    	// Check the validity of client preface.
    	preface := make([]byte, len(clientPreface))
    	if _, err := io.ReadFull(t.conn, preface); err != nil {
    		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
    	}
    	if !bytes.Equal(preface, clientPreface) {
    		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
    	}
    
    	frame, err := t.framer.fr.ReadFrame()
    	if err == io.EOF || err == io.ErrUnexpectedEOF {
    		return nil, err
    	}
    	if err != nil {
    		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
    	}
    	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
    	sf, ok := frame.(*http2.SettingsFrame)
    	if !ok {
    		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
    	}
    	t.handleSettings(sf)
    
    	go func() {
    		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst)
    		t.loopy.ssGoAwayHandler = t.outgoingGoAwayHandler
    		if err := t.loopy.run(); err != nil {
    			errorf("transport: loopyWriter.run returning. Err: %v", err)
    		}
    		t.conn.Close()
    		close(t.writerDone)
    	}()
    	go t.keepalive()
    	return t, nil
    }
    

    사실 이 코드는 기본적으로http2 프로토콜의 실행 과정을 따른다.아래의 http2 프로토콜 규범을 참고할 수 있습니다 http2 프로토콜 규범

    serveStreams


    serveStreams 코드는 다음과 같습니다.
    func (s *Server) serveStreams(st transport.ServerTransport) {
    	defer st.Close()
    	var wg sync.WaitGroup
    	st.HandleStreams(func(stream *transport.Stream) {
    		wg.Add(1)
    		go func() {
    			defer wg.Done()
    			s.handleStream(st, stream, s.traceInfo(st, stream))
    		}()
    	}, func(ctx context.Context, method string) context.Context {
    		if !EnableTracing {
    			return ctx
    		}
    		tr := trace.New("grpc.Recv."+methodFamily(method), method)
    		return trace.NewContext(ctx, tr)
    	})
    	wg.Wait()
    }
    
    
    

    HandleStreams는 주로 서로 다른 프레임을 가져와 처리합니다
    // HandleStreams receives incoming streams using the given handler. This is
    // typically run in a separate goroutine.
    // traceCtx attaches trace to ctx and returns the new context.
    func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
    	defer close(t.readerDone)
    	for {
    		t.controlBuf.throttle()
    		frame, err := t.framer.fr.ReadFrame()
    		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
    		if err != nil {
    			if se, ok := err.(http2.StreamError); ok {
    				warningf("transport: http2Server.HandleStreams encountered http2.StreamError: %v", se)
    				t.mu.Lock()
    				s := t.activeStreams[se.StreamID]
    				t.mu.Unlock()
    				if s != nil {
    					t.closeStream(s, true, se.Code, false)
    				} else {
    					t.controlBuf.put(&cleanupStream{
    						streamID: se.StreamID,
    						rst:      true,
    						rstCode:  se.Code,
    						onWrite:  func() {},
    					})
    				}
    				continue
    			}
    			if err == io.EOF || err == io.ErrUnexpectedEOF {
    				t.Close()
    				return
    			}
    			warningf("transport: http2Server.HandleStreams failed to read frame: %v", err)
    			t.Close()
    			return
    		}
    		switch frame := frame.(type) {
    		case *http2.MetaHeadersFrame:
    			if t.operateHeaders(frame, handle, traceCtx) {
    				t.Close()
    				break
    			}
    		case *http2.DataFrame:
    			t.handleData(frame)
    		case *http2.RSTStreamFrame:
    			t.handleRSTStream(frame)
    		case *http2.SettingsFrame:
    			t.handleSettings(frame)
    		case *http2.PingFrame:
    			t.handlePing(frame)
    		case *http2.WindowUpdateFrame:
    			t.handleWindowUpdate(frame)
    		case *http2.GoAwayFrame:
    			// TODO: Handle GoAway from the client appropriately.
    		default:
    			errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
    		}
    	}
    }
    
    

    다시 한 번 s.handle Stream(st,stream,s.trace Info(st,stream))를 살펴보자. 이것은 비교적 중요한 것이다.
    
    func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    	sm := stream.Method()
    	if sm != "" && sm[0] == '/' {
    		sm = sm[1:]
    	}
    	pos := strings.LastIndex(sm, "/")
    	if pos == -1 {
    		if trInfo != nil {
    			trInfo.tr.LazyLog(&fmtStringer{"Malformed method name %q", []interface{}{sm}}, true)
    			trInfo.tr.SetError()
    		}
    		errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
    		if err := t.WriteStatus(stream, status.New(codes.ResourceExhausted, errDesc)); err != nil {
    			if trInfo != nil {
    				trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
    				trInfo.tr.SetError()
    			}
    			channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
    		}
    		if trInfo != nil {
    			trInfo.tr.Finish()
    		}
    		return
    	}
    	service := sm[:pos]
    	method := sm[pos+1:]
    
    	srv, knownService := s.m[service]
    	if knownService {
    		if md, ok := srv.md[method]; ok {
    			s.processUnaryRPC(t, stream, srv, md, trInfo)
    			return
    		}
    		if sd, ok := srv.sd[method]; ok {
    			s.processStreamingRPC(t, stream, srv, sd, trInfo)
    			return
    		}
    	}
    	// Unknown service, or known server unknown method.
    	if unknownDesc := s.opts.unknownStreamDesc; unknownDesc != nil {
    		s.processStreamingRPC(t, stream, nil, unknownDesc, trInfo)
    		return
    	}
    	var errDesc string
    	if !knownService {
    		errDesc = fmt.Sprintf("unknown service %v", service)
    	} else {
    		errDesc = fmt.Sprintf("unknown method %v for service %v", method, service)
    	}
    	if trInfo != nil {
    		trInfo.tr.LazyPrintf("%s", errDesc)
    		trInfo.tr.SetError()
    	}
    	if err := t.WriteStatus(stream, status.New(codes.Unimplemented, errDesc)); err != nil {
    		if trInfo != nil {
    			trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
    			trInfo.tr.SetError()
    		}
    		channelz.Warningf(s.channelzID, "grpc: Server.handleStream failed to write status: %v", err)
    	}
    	if trInfo != nil {
    		trInfo.tr.Finish()
    	}
    }
    
    
    

    그 중
    if knownService {
    		if md, ok := srv.md[method]; ok {
    			s.processUnaryRPC(t, stream, srv, md, trInfo)
    			return
    		}
    		if sd, ok := srv.sd[method]; ok {
    			s.processStreamingRPC(t, stream, srv, sd, trInfo)
    			return
    		}
    	}
    

    서비스 이름과 방법명을 통해 서비스의 호출을 완성하였다

    좋은 웹페이지 즐겨찾기