btcd 소스 코드 분석 시리즈: 4 - p2p 네트워크의 peer

11864 단어
참조: btcd
  • btcd의 p2p 네트워크에서는 각 노드와의 연결을 하나의 peer 객체로 간주하며, 해당 노드와의 메시지 교환은 모두 이 peer를 통해 이루어진다. 본 글에서는 주로 peer 객체의 생성과 wire 프로토콜 메시지의 송수신 메커니즘을 분석한다. btcd에서는 peer 패키지가 이에 해당하며, 이 패키지는 다른 노드와의 연결이 수립된 뒤 peer의 생성, 협상, 메시지 처리, close 기능을 제공한다.

  • 1. peer 객체 만들기


    peer는 구체적으로 두 가지로 나뉜다.
  • inbound: connmgr(연결 관리자)가 listen 중에 다른 노드의 연결 요청을 받았을 때 생성
  • outbound: connmgr(연결 관리자)가 다른 노드에 능동적으로 연결할 때 생성
  • 둘 다 newPeerBase() 함수를 사용하지만, outbound는 addr와 na 필드의 초기화가 추가된다
  • // NewInboundPeer returns a new inbound bitcoin peer. Use Start to begin
    // processing incoming and outgoing messages.
    func NewInboundPeer(cfg *Config) *Peer {
        // An inbound peer needs nothing beyond the shared base state, so the
        // base constructor is called with the inbound flag set.
        const inbound = true
        return newPeerBase(cfg, inbound)
    }
    
    // NewOutboundPeer creates an outbound bitcoin peer for the remote endpoint
    // addr, which must split into a host and a numeric 16-bit port.
    //
    // The peer's network address is produced by cfg.HostToNetAddress when that
    // callback is set; otherwise it is built directly from the parsed host and
    // port. An error is returned if addr cannot be split, the port does not
    // parse, or the callback fails.
    func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
        peer := newPeerBase(cfg, false)
        peer.addr = addr
    
        hostPart, portPart, err := net.SplitHostPort(addr)
        if err != nil {
            return nil, err
        }
    
        // Base 10, 16 bits: anything outside the uint16 range is rejected here,
        // which makes the conversion below lossless.
        portNum, err := strconv.ParseUint(portPart, 10, 16)
        if err != nil {
            return nil, err
        }
        remotePort := uint16(portNum)
    
        // Without a caller-supplied resolver the host is parsed as an IP
        // literal and wrapped as-is.
        resolve := cfg.HostToNetAddress
        if resolve == nil {
            peer.na = wire.NewNetAddressIPPort(net.ParseIP(hostPart), remotePort, 0)
            return peer, nil
        }
    
        netAddr, err := resolve(hostPart, remotePort, 0)
        if err != nil {
            return nil, err
        }
        peer.na = netAddr
        return peer, nil
    }
    
    
    // newPeerBase builds the Peer state common to inbound and outbound peers; the
    // inbound flag is stored on the returned peer. NewInboundPeer and
    // NewOutboundPeer both start from this function.
    //
    // origCfg is copied before any default is applied, so the caller's Config is
    // never modified. Defaults: a zero ProtocolVersion becomes
    // MaxProtocolVersion, a nil ChainParams becomes the testnet3 parameters, and
    // a non-positive TrickleInterval becomes DefaultTrickleInterval.
    func newPeerBase(origCfg *Config, inbound bool) *Peer {
        // Private copy: every adjustment below stays local to this peer.
        settings := *origCfg
    
        if settings.TrickleInterval <= 0 {
            settings.TrickleInterval = DefaultTrickleInterval
        }
        if settings.ChainParams == nil {
            settings.ChainParams = &chaincfg.TestNet3Params
        }
        if settings.ProtocolVersion == 0 {
            settings.ProtocolVersion = MaxProtocolVersion
        }
    
        // The channels with capacity 1 are used for nonblocking sync; the two
        // output channels are sized by outputBufferSize.
        return &Peer{
            inbound:         inbound,
            wireEncoding:    wire.BaseEncoding,
            knownInventory:  newMruInventoryMap(maxKnownInventory),
            stallControl:    make(chan stallControlMsg, 1),
            outputQueue:     make(chan outMsg, outputBufferSize),
            sendQueue:       make(chan outMsg, 1),
            sendDoneQueue:   make(chan struct{}, 1),
            outputInvChan:   make(chan *wire.InvVect, outputBufferSize),
            inQuit:          make(chan struct{}),
            queueQuit:       make(chan struct{}),
            outQuit:         make(chan struct{}),
            quit:            make(chan struct{}),
            cfg:             settings, // The peer keeps its own copy.
            services:        settings.Services,
            protocolVersion: settings.ProtocolVersion,
        }
    }
    
    

    2. peer 시작


    시작 순서
  • version 메시지를 교환하여 프로토콜 버전을 협상한다 (outbound는 먼저 자신의 version을 보낸 뒤 상대방의 version을 읽고, inbound는 그 반대 순서로 진행한다)
  • 버전 핸드셰이크에 성공하면 나머지 handler goroutine을 시작
  • p.stallHandler(): 메시지 응답 시간 초과 처리
  • p.inHandler(): 수신된 메시지 처리
  • p.queueHandler(): 메시지 발송 대기열 처리
  • p.outHandler(): 발송할 메시지 처리
  • p.pingHandler(): 주기적으로 ping(하트비트) 발송
  • 마지막으로 verAck 확인을 보내 핸드셰이크를 완료
  • // start begins processing input and output messages.
    func (p *Peer) start() error {
        log.Tracef("Starting peer %s", p)
    
        negotiateErr := make(chan error, 1)
        go func() {
            if p.inbound {
                negotiateErr 

    3. 프로토콜 버전 교환 (inbound 측)


    주요 프로세스:
  • 상대방의 version 메시지 읽기
  • 자신의 version 메시지 발송
  • version 메시지 검증 과정:
  • nonce 검사 (자기 자신과의 연결 방지)
  • peer의 기본 정보 채우기 (블록 높이, id, user agent, 서비스, 주소 등)
  • OnVersion() 콜백 호출
  • 프로토콜 버전이 최소 요구 버전을 만족하는지 판단
  • // negotiateInboundProtocol waits to receive a version message from the peer
    // then sends our version message. If the events do not occur in that order then
    // it returns an error.
    func (p *Peer) negotiateInboundProtocol() error {
        // The remote version is read first; our own version is written only
        // when that read succeeded. The first failure is returned as-is.
        err := p.readRemoteVersionMsg()
        if err == nil {
            err = p.writeLocalVersionMsg()
        }
        return err
    }
    
    
    // readRemoteVersionMsg reads the next message from the remote peer and
    // processes it as that peer's version message.
    //
    // An error is returned when the read fails, when the message is not a
    // version message, when the message nonce is one this node sent itself
    // (unless allowSelfConns is set), when the OnVersion listener answers with a
    // reject message, or when the advertised protocol version is below
    // MinAcceptableProtocolVersion. In the non-version, listener-reject and
    // obsolete-version cases a reject message is written to the peer first.
    //
    // On success the peer's negotiated protocol version, services, block stats,
    // time offset, ID, user agent, witness flag and wire encoding have been
    // updated from the message.
    func (p *Peer) readRemoteVersionMsg() error {
        // Read their version message.
        remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
        if err != nil {
            return err
        }
    
        // Notify and disconnect clients if the first message is not a version
        // message.
        msg, ok := remoteMsg.(*wire.MsgVersion)
        if !ok {
            // msg is a nil *wire.MsgVersion on this path, so the expected
            // command is named through the wire constant instead of calling a
            // method on the nil pointer.
            reason := "a version message must precede all others"
            rejectMsg := wire.NewMsgReject(wire.CmdVersion, wire.RejectMalformed,
                reason)
            // The write error is deliberately ignored: an error is returned
            // to the caller either way.
            _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
            return errors.New(reason)
        }
    
        // Detect self connections.
        if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
            return errors.New("disconnecting peer connected to self")
        }
    
        // Negotiate the protocol version and set the services to what the remote
        // peer advertised. The negotiated version is the lower of ours and
        // theirs.
        p.flagsMtx.Lock()
        p.advertisedProtoVer = uint32(msg.ProtocolVersion)
        p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
        p.versionKnown = true
        p.services = msg.Services
        p.flagsMtx.Unlock()
        log.Debugf("Negotiated protocol version %d for peer %s",
            p.protocolVersion, p)
    
        // Updating a bunch of stats including block based stats, and the
        // peer's time offset.
        p.statsMtx.Lock()
        p.lastBlock = msg.LastBlock
        p.startingHeight = msg.LastBlock
        p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
        p.statsMtx.Unlock()
    
        // Set the peer's ID, user agent, and potentially the flag which
        // specifies the witness support is enabled.
        p.flagsMtx.Lock()
        p.id = atomic.AddInt32(&nodeCount, 1)
        p.userAgent = msg.UserAgent
    
        // Determine if the peer would like to receive witness data with
        // transactions, or not.
        if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
            p.witnessEnabled = true
        }
        p.flagsMtx.Unlock()
    
        // Once the version message has been exchanged, we're able to determine
        // if this peer knows how to encode witness data over the wire
        // protocol. If so, then we'll switch to a decoding mode which is
        // prepared for the new transaction format introduced as part of
        // BIP0144.
        if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
            p.wireEncoding = wire.WitnessEncoding
        }
    
        // Invoke the callback if specified. A non-nil result is a reject
        // message that is forwarded to the peer before failing.
        if p.cfg.Listeners.OnVersion != nil {
            rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
            if rejectMsg != nil {
                // Write error ignored: an error is returned regardless.
                _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
                return errors.New(rejectMsg.Reason)
            }
        }
    
        // Notify and disconnect clients that have a protocol version that is
        // too old.
        //
        // NOTE: If minAcceptableProtocolVersion is raised to be higher than
        // wire.RejectVersion, this should send a reject packet before
        // disconnecting.
        if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
            // Send a reject message indicating the protocol version is
            // obsolete and wait for the message to be sent before
            // disconnecting.
            reason := fmt.Sprintf("protocol version must be %d or greater",
                MinAcceptableProtocolVersion)
            rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
                reason)
            // Write error ignored: an error is returned regardless.
            _ = p.writeMessage(rejectMsg, wire.LatestEncoding)
            return errors.New(reason)
        }
    
        return nil
    }
    
    

    메시지 보내기

  • 메시지 발송의 진입점은 QueueMessage() 메서드
  • outMsg가 outputQueue(버퍼가 있는 채널)에 전달된다
  • 이어서 queueHandler가 이를 sendQueue로 넘긴다
  • outHandler가 sendQueue를 처리한다
  • stallControl에 통지
  • peer로 메시지 발송
  • stallHandler가 stallControl을 처리
  • // QueueMessage adds the passed bitcoin message to the peer send queue.
    //
    // This function is safe for concurrent access.
    func (p *Peer) QueueMessage(msg wire.Message, doneChan chan
    p := Peer{
            inbound:         inbound,
            wireEncoding:    wire.BaseEncoding,
            knownInventory:  newMruInventoryMap(maxKnownInventory),
            stallControl:    make(chan stallControlMsg, 1), // nonblocking sync
            outputQueue:     make(chan outMsg, outputBufferSize),
            sendQueue:       make(chan outMsg, 1),   // nonblocking sync
            sendDoneQueue:   make(chan struct{}, 1), // nonblocking sync
            outputInvChan:   make(chan *wire.InvVect, outputBufferSize),
            inQuit:          make(chan struct{}),
            queueQuit:       make(chan struct{}),
            outQuit:         make(chan struct{}),
            quit:            make(chan struct{}),
            cfg:             cfg, // Copy so caller can't mutate.
            services:        cfg.Services,
            protocolVersion: cfg.ProtocolVersion,
        }
    

    메시지 수신

  • inHandler가 peer로부터 오는 메시지를 수신 대기
  • stallControl에 sccReceiveMessage 보내기
  • stallControl에 sccHandlerStart 보내기
  • 메시지에 해당하는 OnXXX 콜백을 호출하고 QueueMessage를 통해 결과를 발송
  • stallControl에 sccHandlerDone 보내기
    // inHandler handles all incoming messages for the peer.  It must be run as a
    // goroutine.
    func (p *Peer) inHandler() {
        // The timer is stopped when a new message is received and reset after it
        // is processed.
        idleTimer := time.AfterFunc(idleTimeout, func() {
            log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout)
            p.Disconnect()
        })
    
    out:
        for atomic.LoadInt32(&p.disconnect) == 0 {
            // Read a message and stop the idle timer as soon as the read
            // is done.  The timer is reset below for the next iteration if
            // needed.
            rmsg, buf, err := p.readMessage(p.wireEncoding)
            idleTimer.Stop()
            if err != nil {
                // In order to allow regression tests with malformed messages, don't
                // disconnect the peer when we're in regression test mode and the
                // error is one of the allowed errors.
                if p.isAllowedReadError(err) {
                    log.Errorf("Allowed test error from %s: %v", p, err)
                    idleTimer.Reset(idleTimeout)
                    continue
                }
    
                // Only log the error and send reject message if the
                // local peer is not forcibly disconnecting and the
                // remote peer has not disconnected.
                if p.shouldHandleReadError(err) {
                    errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err)
                    if err != io.ErrUnexpectedEOF {
                        log.Errorf(errMsg)
                    }
    
                    // Push a reject message for the malformed message and wait for
                    // the message to be sent before disconnecting.
                    //
                    // NOTE: Ideally this would include the command in the header if
                    // at least that much of the message was valid, but that is not
                    // currently exposed by wire, so just used malformed for the
                    // command.
                    p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil,
                        true)
                }
                break out
            }
            atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
            p.stallControl 

    좋은 웹페이지 즐겨찾기