filecoin 기술 구조 분석의 5: filecoin 소스 코드 분석의 프로 토 콜 층 심장 박동 프로 토 콜

11452 단어
본문 저자: 선 하 시스템 양 위;오리지널 작품, 전재 출처 를 밝 혀 주 십시오.
[이전 링크] filecoin 기술 구조 분석의 4: 4 filecoin 소스 코드 맨 위 구조 분석 [다음 링크] filecoin 기술 구조 분석의 6: 6 filecoin 소스 코드 프로 토 콜 층 분석의 hello 악수 프로 토 콜
목차
  • 5 filecoin 소스 코드 프로 토 콜 층 분석의 심장 박동 프로 토 콜
  • 5.1 소스 정보
  • 5.2 소스 코드 분석
  • 5.2.1 데이터 구조
  • 5.2.2 방법
  • 5.2.3 함수
  • 5.2.4 실례 화 및 업무 논리


  • 5.1 소스 코드 정보
  • version
  • master 분기 619 b0eb 1 (2019 년 3 월 2 일)
  • package
  • metrics

  • location
  • metrics/heartbeat.go
  • node/node.go


  • 5.2 소스 코드 분석
    5.2.1 데이터 구조
  • 심장 박동 프로 토 콜 이름과 연결 시간 초과 정의
  • // HeartbeatProtocol is the libp2p protocol used for the heartbeat service
    const (
        HeartbeatProtocol = "fil/heartbeat/1.0.0"
        // Minutes to wait before logging connection failure at ERROR level
        connectionFailureErrorLogPeriodMinutes = 10 * time.Minute
    )
    
  • 심장 박동 정보 구조 정의
  • 노드 의 블록 헤드
  • 노드 의 블록 높이
  • 노드 의 닉네임
  • 블록 동기 화 여부 (TODO)
  • 광부 주소 (채굴 이 없 으 면 여 기 는 0 주소)
  • // Heartbeat contains the information required to determine the current state of a node.
    // Heartbeats are used for aggregating information about nodes in a log aggregator
    // to support alerting and devnet visualization.
    type Heartbeat struct {
        // Head represents the heaviest tipset the nodes is mining on
        Head string
        // Height represents the current height of the Tipset
        Height uint64
        // Nickname is the nickname given to the filecoin node by the user
        Nickname string
        // TODO: add when implemented
        // Syncing is `true` iff the node is currently syncing its chain with the network.
        // Syncing bool
    
        // Address of this node's active miner. Can be empty - will return the zero address
        MinerAddress address.Address
    }
    
  • 심장 박동 서비스 구조 체
  • 호스트 구조 체: 대응 libp2p 호스트
  • 심장 박동 설정
  • 블록 버스터 획득
  • 채굴 지 주소 획득
  • stream 자물쇠
  • stream

  • // HeartbeatService is responsible for sending heartbeats.
    type HeartbeatService struct {
        Host   host.Host
        Config *config.HeartbeatConfig
    
        // A function that returns the heaviest tipset
        HeadGetter func() types.TipSet
    
        // A function that returns the miner's address
        MinerAddressGetter func() address.Address
    
        streamMu sync.Mutex
        stream   net.Stream
    }
    
  • 심장 박동 서비스 옵션 함수 정의
  • 함수 입 참 은 심장 박동 서비스 구조 체 로 주로 심장 박동 서비스 구조 체 에 대한 전 삼 또는 분석
  • 에 사용 된다.
    // HeartbeatServiceOption is the type of the heartbeat service's functional options.
    type HeartbeatServiceOption func(service *HeartbeatService)
    

    5.2.2 방법
  • 심장 박동 서 비 스 를 받 는 stream 인 스 턴 스
  • // Stream returns the HeartbeatService stream. Safe for concurrent access.
    // Stream is a libp2p connection that heartbeat messages are sent over to an aggregator.
    func (hbs *HeartbeatService) Stream() net.Stream {
        hbs.streamMu.Lock()
        defer hbs.streamMu.Unlock()
        return hbs.stream
    }
    
  • 심장 박동 서 비 스 를 설정 하 는 stream 인 스 턴 스
  • // SetStream sets the stream on the HeartbeatService. Safe for concurrent access.
    func (hbs *HeartbeatService) SetStream(s net.Stream) {
        hbs.streamMu.Lock()
        defer hbs.streamMu.Unlock()
        hbs.stream = s
    }
    
  • 정기 적 으로 연결 성 을 확인 하고 심장 박동 서 비 스 를 호출 합 니 다
  • // Start starts the heartbeat service by, starting the connection loop. The connection
    // loop will attempt to connected to the aggregator service, once a successful
    // connection is made with the aggregator service hearbeats will be sent to it.
    // If the connection is broken the heartbeat service will attempt to reconnect via
    // the connection loop. Start will not return until context `ctx` is 'Done'.
    func (hbs *HeartbeatService) Start(ctx context.Context) {
        log.Debug("starting heartbeat service")
    
        rd, err := time.ParseDuration(hbs.Config.ReconnectPeriod)
        if err != nil {
            log.Errorf("invalid heartbeat reconnectPeriod: %s", err)
            return
        }
    
        //       
        reconTicker := time.NewTicker(rd)
        defer reconTicker.Stop()
        // Timestamp of the first connection failure since the last successful connection.
        // Zero initially and while connected.
        var failedAt time.Time
        // Timestamp of the last ERROR log (or of failure, before the first ERROR log).
        var erroredAt time.Time
        for {
            select {
            case  connectionFailureErrorLogPeriodMinutes {
                        logfn = log.Errorf
                        erroredAt = now // Reset the timer
                    }
                    failureDuration := now.Sub(failedAt)
                    logfn("Heartbeat service failed to connect for %s: %s", failureDuration, err)
                    // failed to connect, continue reconnect loop
                    continue
                }
                failedAt = time.Time{}
    
                // we connected, send heartbeats!
                // Run will block until it fails to send a heartbeat.
                //      ,      
                if err := hbs.Run(ctx); err != nil {
                    log.Warning("disconnecting from aggregator, failed to send heartbeat")
                    continue
                }
            }
        }
    }
    
  • 심장 박동 서비스 운영
  • // Run is called once the heartbeat service connects to the aggregator. Run
    // send the actual heartbeat. Run will block until `ctx` is 'Done`. An error will
    // be returned if Run encounters an error when sending the heartbeat and the connection
    // to the aggregator will be closed.
    func (hbs *HeartbeatService) Run(ctx context.Context) error {
        bd, err := time.ParseDuration(hbs.Config.BeatPeriod)
        if err != nil {
            log.Errorf("invalid heartbeat beatPeriod: %s", err)
            return err
        }
    
        //       
        beatTicker := time.NewTicker(bd)
        defer beatTicker.Stop()
    
        //  encoder     
        // TODO use cbor instead of json
        encoder := json.NewEncoder(hbs.stream)
        for {
            select {
            case 
  • 심장 박동 파라미터 획득
  • // Beat will create a heartbeat.
    func (hbs *HeartbeatService) Beat() Heartbeat {
        nick := hbs.Config.Nickname
        ts := hbs.HeadGetter()
        tipset := ts.ToSortedCidSet().String()
        height, err := ts.Height()
        if err != nil {
            log.Warningf("heartbeat service failed to get chain height: %s", err)
        }
        addr := hbs.MinerAddressGetter()
        return Heartbeat{
            Head:         tipset,
            Height:       height,
            Nickname:     nick,
            MinerAddress: addr,
        }
    }
    
  • 심장 박동 연결
  • // Connect will connects to `hbs.Config.BeatTarget` or returns an error
    func (hbs *HeartbeatService) Connect(ctx context.Context) error {
        log.Debugf("Heartbeat service attempting to connect, targetAddress: %s", hbs.Config.BeatTarget)
        targetMaddr, err := ma.NewMultiaddr(hbs.Config.BeatTarget)
        if err != nil {
            return err
        }
    
        pid, err := targetMaddr.ValueForProtocol(ma.P_P2P)
        if err != nil {
            return err
        }
    
        peerid, err := peer.IDB58Decode(pid)
        if err != nil {
            return err
        }
    
        // Decapsulate the /p2p/ part from the target
        // /ip4//p2p/ becomes /ip4/
        targetPeerAddr, _ := ma.NewMultiaddr(
            fmt.Sprintf("/p2p/%s", peer.IDB58Encode(peerid)))
        targetAddr := targetMaddr.Decapsulate(targetPeerAddr)
    
        hbs.Host.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
    
        //        
        s, err := hbs.Host.NewStream(ctx, peerid, HeartbeatProtocol)
        if err != nil {
            log.Debugf("failed to open stream, peerID: %s, targetAddr: %s %s", peerid, targetAddr, err)
            return err
        }
        log.Infof("successfully to open stream, peerID: %s, targetAddr: %s", peerid, targetAddr)
    
        //     
        hbs.SetStream(s)
        return nil
    }
    

    5.2.3 함수
  • 심장 박동 서비스 구조 체 에 인삼 을 전달 하여 광부 주소 함수
  • 를 설정 하 는 데 사용 합 니 다.
    // WithMinerAddressGetter returns an option that can be used to set the miner address getter.
    func WithMinerAddressGetter(ag func() address.Address) HeartbeatServiceOption {
        return func(service *HeartbeatService) {
            service.MinerAddressGetter = ag
        }
    }
    
  • 기본 광부 주소 가 져 오기
  • func defaultMinerAddressGetter() address.Address {
        return address.Address{}
    }
    
  • 심장 박동 서 비 스 를 예화 하고 구체 적 인 예화 가 node 가방 에서 이 루어 집 니 다.
  • // NewHeartbeatService returns a HeartbeatService
    func NewHeartbeatService(h host.Host, hbc *config.HeartbeatConfig, hg func() types.TipSet, options ...HeartbeatServiceOption) *HeartbeatService {
        srv := &HeartbeatService{
            Host:               h,
            Config:             hbc,
            HeadGetter:         hg,
            MinerAddressGetter: defaultMinerAddressGetter,
        }
    
        //              ,                
        for _, option := range options {
            option(srv)
        }
    
        return srv
    }
    

    5.2.4 실례 화 및 업무 논리
  • 주로 node 에서 호출 됩 니 다. location: node / node. go, 주요 논 리 는 다음 과 같 습 니 다
  • node 의 시작 방법 에서 node. setupHeartbeat Services 방법 을 호출 하여 심장 박동 서 비 스 를 구축 합 니 다
  • // Start boots up the node.
    func (node *Node) Start(ctx context.Context) error {
        ......
    
        if err := node.setupHeartbeatServices(ctx); err != nil {
            return errors.Wrap(err, "failed to start heartbeat services")
        }
    
        return nil
    }
    
  • 심장 박동 서 비 스 를 구축 하 는데 구체 적 으로 다음 과 같은 주석
  • 을 볼 수 있다.
    func (node *Node) setupHeartbeatServices(ctx context.Context) error {
        //   “        ”
        mag := func() address.Address {
            addr, err := node.miningAddress()
            // the only error miningAddress() returns is ErrNoMinerAddress.
            // if there is no configured miner address, simply send a zero
            // address across the wire.
            if err != nil {
                return address.Address{}
            }
            return addr
        }
    
        //          ,         
        // start the primary heartbeat service
        if len(node.Repo.Config().Heartbeat.BeatTarget) > 0 {
            //  metrics           、            
            hbs := metrics.NewHeartbeatService(node.Host(), node.Repo.Config().Heartbeat, node.ChainReader.Head, metrics.WithMinerAddressGetter(mag))
            go hbs.Start(ctx)
        }
    
        //                         (         ),         ,            。
        // check if we want to connect to an alert service. An alerting service is a heartbeat
        // service that can trigger alerts based on the contents of heatbeats.
        if alertTarget := os.Getenv("FIL_HEARTBEAT_ALERTS"); len(alertTarget) > 0 {
            ahbs := metrics.NewHeartbeatService(node.Host(), &config.HeartbeatConfig{
                BeatTarget:      alertTarget,
                BeatPeriod:      "10s",
                ReconnectPeriod: "10s",
                Nickname:        node.Repo.Config().Heartbeat.Nickname,
            }, node.ChainReader.Head, metrics.WithMinerAddressGetter(mag))
            go ahbs.Start(ctx)
        }
        return nil
    }
    

    [이전 링크] filecoin 기술 구조 분석의 4: 4 filecoin 소스 코드 맨 위 구조 분석 [다음 링크] filecoin 기술 구조 분석의 6: 6 filecoin 소스 코드 프로 토 콜 층 분석의 hello 악수 프로 토 콜

    좋은 웹페이지 즐겨찾기