이더리움 원본 분석 - 이더리움 P2P 프로토콜

45435 단어 블록체인

P2P 서버 만들기

func (n *Node) Start() error {
    ...

    // Initialize the p2p server. This creates the node key and
    // discovery databases.
    n.serverConfig = n.config.P2P
    n.serverConfig.PrivateKey = n.config.NodeKey()
    n.serverConfig.Name = n.config.NodeName()
    n.serverConfig.Logger = n.log
    if n.serverConfig.StaticNodes == nil {
        n.serverConfig.StaticNodes = n.config.StaticNodes()
    }
    if n.serverConfig.TrustedNodes == nil {
        n.serverConfig.TrustedNodes = n.config.TrustedNodes()
    }
    if n.serverConfig.NodeDatabase == "" {
        n.serverConfig.NodeDatabase = n.config.NodeDB()
    }
    running := &p2p.Server{Config: n.serverConfig}
    n.log.Info("Starting peer-to-peer node", "instance", n.serverConfig.Name)
    ....
}

코드는 먼저 몇 가지 검사 작업을 했습니다. 자물쇠를 채우고 결점이 실행되었는지 판단하고 데이터가 열릴 수 있는지 확인한 다음에 P2P 서버 설정을 초기화하고 마지막으로 이 설정으로 p2p를 만들었습니다.서버 인스턴스입니다.먼저 Node의 서비스 필드를 초기화하고 서비스Funcs, 즉 이전에 등록된 모든 서비스의 구조 함수 목록을 반복합니다.서비스 실례를 만들기 전에 모든 서비스에 서비스 Context를 만듭니다. 전제에서 보듯이 서비스 Context에는 Node에서 물려받은 정보가 저장되어 있습니다.이어서 구조 함수를 통해 서비스 실례를 만들고 서비스 맵에 추가합니다.

서비스 만들기

// Otherwise copy and specialize the P2P configuration
    services := make(map[reflect.Type]Service)
    for _, constructor := range n.serviceFuncs {
        // Create a new context for the particular service
        ctx := &ServiceContext{
            config:         n.config,
            services:       make(map[reflect.Type]Service),
            EventMux:       n.eventmux,
            AccountManager: n.accman,
        }
        for kind, s := range services { // copy needed for threaded access
            ctx.services[kind] = s
        }
        // Construct and save the service
        service, err := constructor(ctx)
        if err != nil {
            return err
        }
        kind := reflect.TypeOf(service)
        if _, exists := services[kind]; exists {
            return &DuplicateServiceError{Kind: kind}
        }
        services[kind] = service
    }

먼저 Node의 서비스 필드를 초기화하고 서비스Funcs, 즉 이전에 등록된 모든 서비스의 구조 함수 목록을 반복합니다.서비스 실례를 만들기 전에 모든 서비스에 서비스 Context를 만듭니다. 전제에서 보듯이 서비스 Context에는 Node에서 물려받은 정보가 저장되어 있습니다.이어서 구조 함수를 통해 서비스 실례를 만들고 서비스 맵에 추가합니다.

P2P 서버 시작

// Gather the protocols and start the freshly assembled P2P server 
    for _, service := range services {  
        running.Protocols = append(running.Protocols, service.Protocols()...)  
    }  
    if err := running.Start(); err != nil {  
        return convertFileLockError(err)  
    }  

우선 모든 서비스가 지원하는 프로토콜을 한데 모은 다음 p2p를 호출합니다.서버의 Start() 방법으로 P2P 서버를 시작합니다(코드는 p2p/server.go에 있습니다).P2P 서버는 UDP 포트와 TCP 포트를 연결합니다. 포트 번호는 동일합니다(기본 30303).UDP 포트는 주로 결점 발견, TCP 포트는 업무 데이터 전송, RLPx 암호화 전송 프로토콜 기반입니다.따라서 Start () 메서드는 다음과 같은 작업을 수행합니다.
  • UDP 포트 탐지: 결점 검색
  • UDP 요청 결과 테이블 가져오기: 내부적으로 goroutine가 시작됩니다
  • 탐지 TCP 포트: 비즈니스 데이터 전송용 RLPx 프로토콜 기반
  • 다른 결점에 TCP 요청 연결: goroutine 시작 완료
  • // p2p/server.go
    // Servers can not be re-used after stopping.
    func (srv *Server) Start() (err error) {
        srv.lock.Lock()
        defer srv.lock.Unlock()
        if srv.running {
            return errors.New("server already running")
        }
        srv.running = true
        srv.log = srv.Config.Logger
        if srv.log == nil {
            srv.log = log.New()
        }
        srv.log.Info("Starting P2P networking")
    
        // static fields
        if srv.PrivateKey == nil {
            return fmt.Errorf("Server.PrivateKey must be set to a non-nil key")
        }
        if srv.newTransport == nil {
            srv.newTransport = newRLPX
        }
        if srv.Dialer == nil {
            srv.Dialer = TCPDialer{&net.Dialer{Timeout: defaultDialTimeout}}
        }
        srv.quit = make(chan struct{})
        srv.addpeer = make(chan *conn)
        srv.delpeer = make(chan peerDrop)
        srv.posthandshake = make(chan *conn)
        srv.addstatic = make(chan *discover.Node)
        srv.removestatic = make(chan *discover.Node)
        srv.peerOp = make(chan peerOpFunc)
        srv.peerOpDone = make(chan struct{})
    
        var (
            conn      *net.UDPConn
            sconn     *sharedUDPConn
            realaddr  *net.UDPAddr
            unhandled chan discover.ReadPacket
        )
    
        if !srv.NoDiscovery || srv.DiscoveryV5 {
            addr, err := net.ResolveUDPAddr("udp", srv.ListenAddr)
            if err != nil {
                return err
            }
            conn, err = net.ListenUDP("udp", addr)
            if err != nil {
                return err
            }
            realaddr = conn.LocalAddr().(*net.UDPAddr)
            if srv.NAT != nil {
                if !realaddr.IP.IsLoopback() {
                    go nat.Map(srv.NAT, srv.quit, "udp", realaddr.Port, realaddr.Port, "ethereum discovery")
                }
                // TODO: react to external IP changes over time.
                if ext, err := srv.NAT.ExternalIP(); err == nil {
                    realaddr = &net.UDPAddr{IP: ext, Port: realaddr.Port}
                }
            }
        }
    
        if !srv.NoDiscovery && srv.DiscoveryV5 {
            unhandled = make(chan discover.ReadPacket, 100)
            sconn = &sharedUDPConn{conn, unhandled}
        }
    
        // node table
        if !srv.NoDiscovery {
            cfg := discover.Config{
                PrivateKey:   srv.PrivateKey,
                AnnounceAddr: realaddr,
                NodeDBPath:   srv.NodeDatabase,
                NetRestrict:  srv.NetRestrict,
                Bootnodes:    srv.BootstrapNodes,
                Unhandled:    unhandled,
            }
            ntab, err := discover.ListenUDP(conn, cfg)
            if err != nil {
                return err
            }
            srv.ntab = ntab
        }
    
        if srv.DiscoveryV5 {
            var (
                ntab *discv5.Network
                err  error
            )
            if sconn != nil {
                ntab, err = discv5.ListenUDP(srv.PrivateKey, sconn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
            } else {
                ntab, err = discv5.ListenUDP(srv.PrivateKey, conn, realaddr, "", srv.NetRestrict) //srv.NodeDatabase)
            }
            if err != nil {
                return err
            }
            if err := ntab.SetFallbackNodes(srv.BootstrapNodesV5); err != nil {
                return err
            }
            srv.DiscV5 = ntab
        }
    
        dynPeers := srv.maxDialedConns()
        dialer := newDialState(srv.StaticNodes, srv.BootstrapNodes, srv.ntab, dynPeers, srv.NetRestrict)
    
        // handshake
        srv.ourHandshake = &protoHandshake{Version: baseProtocolVersion, Name: srv.Name, ID: discover.PubkeyID(&srv.PrivateKey.PublicKey)}
        for _, p := range srv.Protocols {
            srv.ourHandshake.Caps = append(srv.ourHandshake.Caps, p.cap())
        }
        // listen/dial
        if srv.ListenAddr != "" {
            if err := srv.startListening(); err != nil {
                return err
            }
        }
        if srv.NoDial && srv.ListenAddr == "" {
            srv.log.Warn("P2P server will be useless, neither dialing nor listening")
        }
    
        srv.loopWG.Add(1)
        go srv.run(dialer)
        srv.running = true
        return nil
    }

    서비스 시작

    // Start each of the services
        started := []reflect.Type{}
        for kind, service := range services {
            // Start the next service, stopping all previous upon failure
            if err := service.Start(running); err != nil {
                for _, kind := range started {
                    services[kind].Stop()
                }
                running.Stop()
    
                return err
            }
            // Mark the service started for potential cleanup
            started = append(started, kind)
        }

    주로 각 서비스의 Start () 방법을 순서대로 호출한 다음에 시작된 서비스의 유형을 started 테이블에 저장합니다.앞서 Ethereum은 하나의 서비스로 Node에 등록되었다고 언급했다.Node start는 등록된 모든 서비스를 시작합니다. Ethereum 서비스도 마찬가지입니다.

    ethereum service


    ethereum 서비스의 초기화

    eth/backend.go
    func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
        if config.SyncMode == downloader.LightSync {
            return nil, errors.New("can't run eth.Ethereum in light sync mode, use les.LightEthereum")
        }
        if !config.SyncMode.IsValid() {
            return nil, fmt.Errorf("invalid sync mode %d", config.SyncMode)
        }
        chainDb, err := CreateDB(ctx, config, "chaindata")
        if err != nil {
            return nil, err
        }
        chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)
        if _, ok := genesisErr.(*params.ConfigCompatError); genesisErr != nil && !ok {
            return nil, genesisErr
        }
        log.Info("Initialised chain configuration", "config", chainConfig)
    
        eth := &Ethereum{
            config:         config,
            chainDb:        chainDb,
            chainConfig:    chainConfig,
            eventMux:       ctx.EventMux,
            accountManager: ctx.AccountManager,
            engine:         CreateConsensusEngine(ctx, &config.Ethash, chainConfig, chainDb),
            shutdownChan:   make(chan bool),
            networkId:      config.NetworkId,
            gasPrice:       config.GasPrice,
            etherbase:      config.Etherbase,
            bloomRequests:  make(chan chan *bloombits.Retrieval),
            bloomIndexer:   NewBloomIndexer(chainDb, params.BloomBitsBlocks),
        }
    
        log.Info("Initialising Ethereum protocol", "versions", ProtocolVersions, "network", config.NetworkId)
    
        if !config.SkipBcVersionCheck {
            bcVersion := rawdb.ReadDatabaseVersion(chainDb)
            if bcVersion != core.BlockChainVersion && bcVersion != 0 {
                return nil, fmt.Errorf("Blockchain DB version mismatch (%d / %d). Run geth upgradedb.
    "
    , bcVersion, core.BlockChainVersion) } rawdb.WriteDatabaseVersion(chainDb, core.BlockChainVersion) } var ( vmConfig = vm.Config{EnablePreimageRecording: config.EnablePreimageRecording} cacheConfig = &core.CacheConfig{Disabled: config.NoPruning, TrieNodeLimit: config.TrieCache, TrieTimeLimit: config.TrieTimeout} ) eth.blockchain, err = core.NewBlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) if err != nil { return nil, err } // Rewind the chain in case of an incompatible config upgrade. if compat, ok := genesisErr.(*params.ConfigCompatError); ok { log.Warn("Rewinding chain to upgrade configuration", "err", compat) eth.blockchain.SetHead(compat.RewindTo) rawdb.WriteChainConfig(chainDb, genesisHash, chainConfig) } eth.bloomIndexer.Start(eth.blockchain) if config.TxPool.Journal != "" { config.TxPool.Journal = ctx.ResolvePath(config.TxPool.Journal) } eth.txPool = core.NewTxPool(config.TxPool, eth.chainConfig, eth.blockchain) if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil { return nil, err } eth.miner = miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) eth.miner.SetExtra(makeExtraData(config.ExtraData)) eth.APIBackend = &EthAPIBackend{eth, nil} gpoParams := config.GPO if gpoParams.Default == nil { gpoParams.Default = config.GasPrice } eth.APIBackend.gpo = gasprice.NewOracle(eth.APIBackend, gpoParams) return eth, nil }
  • 만약config.SyncMode는 다운로더입니다.LightSync, les/backend로 갑니다.go의 초기화 방법.
  • chainDb,err: = CreateDB(ctx,config,'chaindata')에서 leveldb를 열고, leveldb는 eth 저장 데이터베이스입니다.
  • stopDbUpgrade: = upgradeDeduplicateData(chainDb)는chainDb버전을 검사하고 필요하면 백그라운드 프로세스를 시작하여 업그레이드합니다.
  • chainConfig, genesisHash, genesisErr := core.SetupGenesisBlock(chainDb, config.Genesis)이 창세 블록을 마운트합니다.노드 조건에 따라 데이터베이스에서 읽을 것인지, 기본 프로필에서 읽을 것인지, 사용자 정의 프로필에서 읽을 것인지, 코드에서 기본값을 얻을 것인지를 판단한다.그리고 블록체인의 config와 창세 블록의hash를 되돌려줍니다.
  • Etherum struct를 마운트하는 각 구성원.eventMux와 accountManager는 Node가 eth 서비스를 시작할 때 전송된 것입니다.이벤트 Mux는 전역적인 이벤트 다중 복용기라고 할 수 있으며, 계정 관리자는 전역적인 계정 관리자라고 할 수 있습니다.engine에서 공통 인식 엔진을 만듭니다.etherbase에서 이 Etherum의 주 계정 주소를 구성합니다.bloomRequests 채널과 bloom 필터를 초기화합니다.
  • 클라이언트 버전 번호와 데이터베이스 버전 번호가 일치하는지 판단
  • eth.blockchain, err = core.New BlockChain(chainDb, cacheConfig, eth.chainConfig, eth.engine, vmConfig) eth의 blockchain, 즉 eth의 블록체인을 초기화
  • eth.blockchain.SetHead(compat.RewindTo)는 시작 블록에 따라 블록 헤더를 설정합니다
  • .
  • eth.bloomIndexer.Start(eth.blockchain) BloomIndexer 시작
  • eth.txPool = core.New TxPool(config.TxPool,eth.chainConfig,eth.blockchain)은 eth 블록체인의 거래 탱크를 초기화하고 로컬에서 생산된 P2P 네트워크와 동기화된 거래를 저장합니다.
  • eth.protocolManager,err = New ProtocolManager(eth.chainConfig,config.SyncMode,config.NetworkId,eth.eventMux,eth.txPool,eth.engine,eth.blockchain,chainDb) 이더리움 프로토콜 관리자를 초기화하여 블록체인 P2P 통신에 사용
  • miner.New(eth, eth.chainConfig, eth.EventMux(), eth.engine) 광부 초기화
  • eth.ApiBackend.gpo = gasprice.NewOracle(eth.ApiBackend, gpoParams) 최신 가스프릭을 예언하는 예언기
  • 만들기

    ethereum 서비스 시작

    func (s *Ethereum) Start(srvr *p2p.Server) error {
        // Start the bloom bits servicing goroutines
        s.startBloomHandlers()
    
        // Start the RPC service
        s.netRPCService = ethapi.NewPublicNetAPI(srvr, s.NetVersion())
    
        // Figure out a max peers count based on the server limits
        maxPeers := srvr.MaxPeers
        if s.config.LightServ > 0 {
            if s.config.LightPeers >= srvr.MaxPeers {
                return fmt.Errorf("invalid peer config: light peer count (%d) >= total peer count (%d)", s.config.LightPeers, srvr.MaxPeers)
            }
            maxPeers -= s.config.LightPeers
        }
        // Start the networking layer and the light server if requested
        s.protocolManager.Start(maxPeers)
        if s.lesServer != nil {
            s.lesServer.Start(srvr)
        }
        return nil
    }

    우선 bloom 필터 eth의 net 관련 Api를 시작하여 RPC 서비스에 가입합니다.s.protocolManager.Start(maxPeers)는 최대 동기화 노드 수를 설정하고 eth P2P 통신을 시작합니다.ethereum 서비스에 문제가 생기면 les Server를 시작합니다.

    ProtocolManager 이더리움 P2P 통신 프로토콜 관리


    ethereum 서비스의 초기화도 호출됩니다NewProtocolManager.
    func New(ctx *node.ServiceContext, config *Config) (*Ethereum, error) {
    ...
        if eth.protocolManager, err = NewProtocolManager(eth.chainConfig, config.SyncMode, config.NetworkId, eth.eventMux, eth.txPool, eth.engine, eth.blockchain, chainDb); err != nil {
                return nil, err
            }
    
            ....
    }

    ProtocolManager 초기화 방법
    func NewProtocolManager(config *params.ChainConfig, mode downloader.SyncMode, networkId uint64, mux *event.TypeMux, txpool txPool, engine consensus.Engine, blockchain *core.BlockChain, chaindb ethdb.Database) (*ProtocolManager, error) {
        // Create the protocol manager with the base fields
        manager := &ProtocolManager{
            networkId:   networkId,
            eventMux:    mux,
            txpool:      txpool,
            blockchain:  blockchain,
            chainconfig: config,
            peers:       newPeerSet(),
            newPeerCh:   make(chan *peer),
            noMorePeers: make(chan struct{}),
            txsyncCh:    make(chan *txsync),
            quitSync:    make(chan struct{}),
        }
        // Figure out whether to allow fast sync or not
        if mode == downloader.FastSync && blockchain.CurrentBlock().NumberU64() > 0 {
            log.Warn("Blockchain not empty, fast sync disabled")
            mode = downloader.FullSync
        }
        if mode == downloader.FastSync {
            manager.fastSync = uint32(1)
        }
        // Initiate a sub-protocol for every implemented version we can handle
        manager.SubProtocols = make([]p2p.Protocol, 0, len(ProtocolVersions))
        for i, version := range ProtocolVersions {
            // Skip protocol version if incompatible with the mode of operation
            if mode == downloader.FastSync && version < eth63 {
                continue
            }
            // Compatible; initialise the sub-protocol
            version := version // Closure for the run
            manager.SubProtocols = append(manager.SubProtocols, p2p.Protocol{
                Name:    ProtocolName,
                Version: version,
                Length:  ProtocolLengths[i],
                Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
                    peer := manager.newPeer(int(version), p, rw)
                    select {
                    case manager.newPeerCh (1)
                        defer manager.wg.Done()
                        return manager.handle(peer)
                    case return p2p.DiscQuitting
                    }
                },
                NodeInfo: func() interface{} {
                    return manager.NodeInfo()
                },
                PeerInfo: func(id discover.NodeID) interface{} {
                    if p := manager.peers.Peer(fmt.Sprintf("%x", id[:8])); p != nil {
                        return p.Info()
                    }
                    return nil
                },
            })
        }
        if len(manager.SubProtocols) == 0 {
            return nil, errIncompatibleConfig
        }
        // Construct the different synchronisation mechanisms
        manager.downloader = downloader.New(mode, chaindb, manager.eventMux, blockchain, nil, manager.removePeer)
    
        validator := func(header *types.Header) error {
            return engine.VerifyHeader(blockchain, header, true)
        }
        heighter := func() uint64 {
            return blockchain.CurrentBlock().NumberU64()
        }
        inserter := func(blocks types.Blocks) (int, error) {
            // If fast sync is running, deny importing weird blocks
            if atomic.LoadUint32(&manager.fastSync) == 1 {
                log.Warn("Discarded bad propagated block", "number", blocks[0].Number(), "hash", blocks[0].Hash())
                return 0, nil
            }
            atomic.StoreUint32(&manager.acceptTxs, 1) // Mark initial sync done on any fetcher import
            return manager.blockchain.InsertChain(blocks)
        }
        manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
    
        return manager, nil
    }
  • peers는 이더리움에 가까운 동기화 네트워크 노드로 newPeerCh, noMorePeers, txsyncCh,quitSync의 동기화 알림
  • manager.SubProtocols가 이더리움 P2P 서버를 만드는 통신 프로토콜은 보통 하나의 값만 있습니다.manager.SubProtocols, Node start 때 이더리움 P2P 서버에 전송하고 동시에 start P2P 서버.프로토콜에는 세 개의 함수 포인터(Run, NodeInfo, PeerInfo)가 매우 중요하며 그 뒤에 사용됩니다.
  • manager.downloader = downloader.New(mode,chaindb,manager.eventMux,blockchain,nil,manager.removePeer)는 원격 네트워크 노드에서hashes와blocks를 가져오는 다운로드기를 만들었습니다.
  • manager.fetcher = fetcher.New(blockchain.GetBlockByHash,validator,manager.BroadcastBlock,heighter,inserter,manager.removePeer)는 네트워크의 다른 이더리움 노드가 보낸 동기화 알림을 수집하여 검증하고 해당하는 처리를 한다.초기화 전송된 몇 개의 매개 변수는 동기화 블록체인 데이터를 처리하는 함수 바늘
  • 이다.
    Ethereum 서비스가 시작되면 ProtocolManager가 동시에 시작됩니다.
    ProtocolManager의 start() 메서드:
    func (pm *ProtocolManager) Start(maxPeers int) { pm.maxPeers = maxPeers
    // broadcast transactions
    pm.txsCh = make(chan core.NewTxsEvent, txChanSize)
    pm.txsSub = pm.txpool.SubscribeNewTxsEvent(pm.txsCh)
    go pm.txBroadcastLoop()
    
    // broadcast mined blocks
    pm.minedBlockSub = pm.eventMux.Subscribe(core.NewMinedBlockEvent{})
    go pm.minedBroadcastLoop()
    
    // start sync handlers
    go pm.syncer()
    go pm.txsyncLoop()
    

    }
  • 새로운 거래 구독 채널을 만들고 거래 방송을 시작하는goroutine
  • 구덩이를 파는 구독 채널을 만들고 시작
  • pm.syncer() 동기화goroutine를 시작하고 네트워크의 다른 노드와 동기화하며 네트워크 노드에 대한 알림을 처리합니다
  • pm.txsyncLoop() 거래 동기화 goroutine를 시작하여 새로운 거래를 네트워크 노드에 고르게 동기화
  • ProtocolManager가 네트워크 노드에 사전 예방적으로 브로드캐스트
    Protocol Manager Start () 방법 중의 4개의goroutine는 모두 Protocol Manager가 이더리움 네트워크 노드에 방송하는 것을 처리한다.
  • pm.txBroadcastLoop() 메서드
  • func (pm *ProtocolManager) txBroadcastLoop() {
        for {
            select {
            case event := event.Txs)
    
            // Err() channel will be closed when unsubscribing.
            case return
            }
        }
    }

    core/tx_pool.go 새로운 거래가 생길 때sendself.txCh, 이때 self가 활성화됩니다.BroadcastTx(event.Tx.Hash(), event.Tx)
    func (pm *ProtocolManager) BroadcastTx(hash common.Hash, tx *types.Transaction) {
        // Broadcast transaction to a batch of peers not knowing about it
        peers := pm.peers.PeersWithoutTx(hash)
        //FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
        for _, peer := range peers {
            peer.SendTransactions(types.Transactions{tx})
        }
        log.Trace("Broadcast transaction", "hash", hash, "recipients", len(peers))
    }

    캐시에 이 거래가 없는hash의 네트워크 노드에 이번 거래를 방송합니다.
  • pm.minedBroadcastLoop() 메서드
  • // Mined broadcast loop
    func (self *ProtocolManager) minedBroadcastLoop() {
        // automatically stops if unsubscribe
        for obj := range self.minedBlockSub.Chan() {
            switch ev := obj.Data.(type) {
            case core.NewMinedBlockEvent:
                self.BroadcastBlock(ev.Block, true)  // First propagate block to peers
                self.BroadcastBlock(ev.Block, false) // Only then announce to the rest
            }
        }
    }
    miner.goNewMinedBlockEvent에서 새로운 블록을 파는 이벤트 알림을 받고self를 활성화합니다.BroadcastBlock(ev.Block, true)
    func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
        hash := block.Hash()
        peers := pm.peers.PeersWithoutBlock(hash)
    
        // If propagation is requested, send to a subset of the peer
        if propagate {
            // Calculate the TD of the block (it's not imported yet, so block.Td is not valid)
            var td *big.Int
            if parent := pm.blockchain.GetBlock(block.ParentHash(), block.NumberU64()-1); parent != nil {
                td = new(big.Int).Add(block.Difficulty(), pm.blockchain.GetTd(block.ParentHash(), block.NumberU64()-1))
            } else {
                log.Error("Propagating dangling block", "number", block.Number(), "hash", hash)
                return
            }
            // Send the block to a subset of our peers
            transfer := peers[:int(math.Sqrt(float64(len(peers))))]
            for _, peer := range transfer {
                peer.SendNewBlock(block, td)
            }
            log.Trace("Propagated block", "hash", hash, "recipients", len(transfer), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
            return
        }
        // Otherwise if the block is indeed in out own chain, announce it
        if pm.blockchain.HasBlock(hash, block.NumberU64()) {
            for _, peer := range peers {
                peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
            }
            log.Trace("Announced block", "hash", hash, "recipients", len(peers), "duration", common.PrettyDuration(time.Since(block.ReceivedAt)))
        }
    }

    만약propagate가true로 네트워크 노드에 전체 파낸 블록을 방송하면false는 파낸 블록의hash값과number값만 방송합니다.방송의 블록은 이 블록이 포장한 모든 거래도 포함한다.
  • pm.syncer() 메서드
  • func (pm *ProtocolManager) syncer() {
        // Start and ensure cleanup of sync mechanisms
        pm.fetcher.Start()
        defer pm.fetcher.Stop()
        defer pm.downloader.Terminate()
    
        // Wait for different events to fire synchronisation operations
        forceSync := time.NewTicker(forceSyncCycle)
        defer forceSync.Stop()
    
        for {
            select {
            case // Make sure we have peers to select from, then sync
                if pm.peers.Len() < minDesiredPeerCount {
                    break
                }
                go pm.synchronise(pm.peers.BestPeer())
    
            case // Force a sync even if not enough peers are present
                go pm.synchronise(pm.peers.BestPeer())
    
            case return
            }
        }
    }

    pm.fetcher.Start () fetcher 시작, 보조 동기화 블록 데이터
    P2P 서버에서 ProtocolManager의 p2p를 실행합니다.Protocol의 Run 포인터가 있을 때 pm을 보냅니다.newPeerCh, 이때 가장 좋은 네트워크 노드 (TD 총 난이도가 가장 높은) 를 선택하여pm를 시작합니다.synchronise(pm.peers.BestPeer()) goroutine.
  • pm.txsyncLoop() 메서드
  • func (pm *ProtocolManager) txsyncLoop() {
        var (
            pending = make(map[discover.NodeID]*txsync)
            sending = false               // whether a send is active
            pack    = new(txsync)         // the pack that is being sent
            done    = make(chan error, 1) // result of the send
        )
    
        // send starts a sending a pack of transactions from the sync.
        send := func(s *txsync) {
            // Fill pack with transactions up to the target size.
            size := common.StorageSize(0)
            pack.p = s.p
            pack.txs = pack.txs[:0]
            for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
                pack.txs = append(pack.txs, s.txs[i])
                size += s.txs[i].Size()
            }
            // Remove the transactions that will be sent.
            s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
            if len(s.txs) == 0 {
                delete(pending, s.p.ID())
            }
            // Send the pack in the background.
            s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
            sending = true
            go func() { done // pick chooses the next pending sync.
        pick := func() *txsync {
            if len(pending) == 0 {
                return nil
            }
            n := rand.Intn(len(pending)) + 1
            for _, s := range pending {
                if n--; n == 0 {
                    return s
                }
            }
            return nil
        }
    
        for {
            select {
            case s := if !sending {
                    send(s)
                }
            case err := false
                // Stop tracking peers that cause send failures.
                if err != nil {
                    pack.p.Log().Debug("Transaction send failed", "err", err)
                    delete(pending, pack.p.ID())
                }
                // Schedule the next send.
                if s := pick(); s != nil {
                    send(s)
                }
            case return
            }
        }
    }

    네트워크 노드에서 최신 거래 데이터를 동기화한 후에 로컬에서도 새로 동기화된 거래 데이터를 네트워크의 다른 노드에 방송할 것이다.이 네 개의goroutine은 기본적으로 끊임없이 방송 블록, 방송 거래를 하고 블록, 거래에 동기화하고 방송 블록, 방송 거래를 한다.

    좋은 웹페이지 즐겨찾기