filecoin 기술 구조 분석의 5: filecoin 소스 코드 분석의 프로 토 콜 층 심장 박동 프로 토 콜
[이전 링크] filecoin 기술 구조 분석의 4: 4 filecoin 소스 코드 맨 위 구조 분석 [다음 링크] filecoin 기술 구조 분석의 6: 6 filecoin 소스 코드 프로 토 콜 층 분석의 hello 악수 프로 토 콜
목차
5.1 소스 코드 정보
5.2 소스 코드 분석
5.2.1 데이터 구조
// HeartbeatProtocol is the libp2p protocol used for the heartbeat service
const (
HeartbeatProtocol = "fil/heartbeat/1.0.0"
// Minutes to wait before logging connection failure at ERROR level
connectionFailureErrorLogPeriodMinutes = 10 * time.Minute
)
// Heartbeat contains the information required to determine the current state of a node.
// Heartbeats are used for aggregating information about nodes in a log aggregator
// to support alerting and devnet visualization.
type Heartbeat struct {
// Head represents the heaviest tipset the nodes is mining on
Head string
// Height represents the current height of the Tipset
Height uint64
// Nickname is the nickname given to the filecoin node by the user
Nickname string
// TODO: add when implemented
// Syncing is `true` iff the node is currently syncing its chain with the network.
// Syncing bool
// Address of this node's active miner. Can be empty - will return the zero address
MinerAddress address.Address
}
// HeartbeatService is responsible for sending heartbeats.
type HeartbeatService struct {
Host host.Host
Config *config.HeartbeatConfig
// A function that returns the heaviest tipset
HeadGetter func() types.TipSet
// A function that returns the miner's address
MinerAddressGetter func() address.Address
streamMu sync.Mutex
stream net.Stream
}
// HeartbeatServiceOption is the type of the heartbeat service's functional options.
type HeartbeatServiceOption func(service *HeartbeatService)
5.2.2 방법
// Stream returns the HeartbeatService stream. Safe for concurrent access.
// Stream is a libp2p connection that heartbeat messages are sent over to an aggregator.
func (hbs *HeartbeatService) Stream() net.Stream {
hbs.streamMu.Lock()
defer hbs.streamMu.Unlock()
return hbs.stream
}
// SetStream sets the stream on the HeartbeatService. Safe for concurrent access.
func (hbs *HeartbeatService) SetStream(s net.Stream) {
hbs.streamMu.Lock()
defer hbs.streamMu.Unlock()
hbs.stream = s
}
// Start starts the heartbeat service by, starting the connection loop. The connection
// loop will attempt to connected to the aggregator service, once a successful
// connection is made with the aggregator service hearbeats will be sent to it.
// If the connection is broken the heartbeat service will attempt to reconnect via
// the connection loop. Start will not return until context `ctx` is 'Done'.
func (hbs *HeartbeatService) Start(ctx context.Context) {
log.Debug("starting heartbeat service")
rd, err := time.ParseDuration(hbs.Config.ReconnectPeriod)
if err != nil {
log.Errorf("invalid heartbeat reconnectPeriod: %s", err)
return
}
//
reconTicker := time.NewTicker(rd)
defer reconTicker.Stop()
// Timestamp of the first connection failure since the last successful connection.
// Zero initially and while connected.
var failedAt time.Time
// Timestamp of the last ERROR log (or of failure, before the first ERROR log).
var erroredAt time.Time
for {
select {
case connectionFailureErrorLogPeriodMinutes {
logfn = log.Errorf
erroredAt = now // Reset the timer
}
failureDuration := now.Sub(failedAt)
logfn("Heartbeat service failed to connect for %s: %s", failureDuration, err)
// failed to connect, continue reconnect loop
continue
}
failedAt = time.Time{}
// we connected, send heartbeats!
// Run will block until it fails to send a heartbeat.
// ,
if err := hbs.Run(ctx); err != nil {
log.Warning("disconnecting from aggregator, failed to send heartbeat")
continue
}
}
}
}
// Run is called once the heartbeat service connects to the aggregator. Run
// send the actual heartbeat. Run will block until `ctx` is 'Done`. An error will
// be returned if Run encounters an error when sending the heartbeat and the connection
// to the aggregator will be closed.
func (hbs *HeartbeatService) Run(ctx context.Context) error {
bd, err := time.ParseDuration(hbs.Config.BeatPeriod)
if err != nil {
log.Errorf("invalid heartbeat beatPeriod: %s", err)
return err
}
//
beatTicker := time.NewTicker(bd)
defer beatTicker.Stop()
// encoder
// TODO use cbor instead of json
encoder := json.NewEncoder(hbs.stream)
for {
select {
case
// Beat will create a heartbeat.
func (hbs *HeartbeatService) Beat() Heartbeat {
nick := hbs.Config.Nickname
ts := hbs.HeadGetter()
tipset := ts.ToSortedCidSet().String()
height, err := ts.Height()
if err != nil {
log.Warningf("heartbeat service failed to get chain height: %s", err)
}
addr := hbs.MinerAddressGetter()
return Heartbeat{
Head: tipset,
Height: height,
Nickname: nick,
MinerAddress: addr,
}
}
// Connect will connects to `hbs.Config.BeatTarget` or returns an error
func (hbs *HeartbeatService) Connect(ctx context.Context) error {
log.Debugf("Heartbeat service attempting to connect, targetAddress: %s", hbs.Config.BeatTarget)
targetMaddr, err := ma.NewMultiaddr(hbs.Config.BeatTarget)
if err != nil {
return err
}
pid, err := targetMaddr.ValueForProtocol(ma.P_P2P)
if err != nil {
return err
}
peerid, err := peer.IDB58Decode(pid)
if err != nil {
return err
}
// Decapsulate the /p2p/ part from the target
// /ip4//p2p/ becomes /ip4/
targetPeerAddr, _ := ma.NewMultiaddr(
fmt.Sprintf("/p2p/%s", peer.IDB58Encode(peerid)))
targetAddr := targetMaddr.Decapsulate(targetPeerAddr)
hbs.Host.Peerstore().AddAddr(peerid, targetAddr, pstore.PermanentAddrTTL)
//
s, err := hbs.Host.NewStream(ctx, peerid, HeartbeatProtocol)
if err != nil {
log.Debugf("failed to open stream, peerID: %s, targetAddr: %s %s", peerid, targetAddr, err)
return err
}
log.Infof("successfully to open stream, peerID: %s, targetAddr: %s", peerid, targetAddr)
//
hbs.SetStream(s)
return nil
}
5.2.3 함수
// WithMinerAddressGetter returns an option that can be used to set the miner address getter.
func WithMinerAddressGetter(ag func() address.Address) HeartbeatServiceOption {
return func(service *HeartbeatService) {
service.MinerAddressGetter = ag
}
}
func defaultMinerAddressGetter() address.Address {
return address.Address{}
}
// NewHeartbeatService returns a HeartbeatService
func NewHeartbeatService(h host.Host, hbc *config.HeartbeatConfig, hg func() types.TipSet, options ...HeartbeatServiceOption) *HeartbeatService {
srv := &HeartbeatService{
Host: h,
Config: hbc,
HeadGetter: hg,
MinerAddressGetter: defaultMinerAddressGetter,
}
// ,
for _, option := range options {
option(srv)
}
return srv
}
5.2.4 실례 화 및 업무 논리
// Start boots up the node.
func (node *Node) Start(ctx context.Context) error {
......
if err := node.setupHeartbeatServices(ctx); err != nil {
return errors.Wrap(err, "failed to start heartbeat services")
}
return nil
}
func (node *Node) setupHeartbeatServices(ctx context.Context) error {
// “ ”
mag := func() address.Address {
addr, err := node.miningAddress()
// the only error miningAddress() returns is ErrNoMinerAddress.
// if there is no configured miner address, simply send a zero
// address across the wire.
if err != nil {
return address.Address{}
}
return addr
}
// ,
// start the primary heartbeat service
if len(node.Repo.Config().Heartbeat.BeatTarget) > 0 {
// metrics 、
hbs := metrics.NewHeartbeatService(node.Host(), node.Repo.Config().Heartbeat, node.ChainReader.Head, metrics.WithMinerAddressGetter(mag))
go hbs.Start(ctx)
}
// ( ), , 。
// check if we want to connect to an alert service. An alerting service is a heartbeat
// service that can trigger alerts based on the contents of heatbeats.
if alertTarget := os.Getenv("FIL_HEARTBEAT_ALERTS"); len(alertTarget) > 0 {
ahbs := metrics.NewHeartbeatService(node.Host(), &config.HeartbeatConfig{
BeatTarget: alertTarget,
BeatPeriod: "10s",
ReconnectPeriod: "10s",
Nickname: node.Repo.Config().Heartbeat.Nickname,
}, node.ChainReader.Head, metrics.WithMinerAddressGetter(mag))
go ahbs.Start(ctx)
}
return nil
}
[이전 링크] filecoin 기술 구조 분석의 4: 4 filecoin 소스 코드 맨 위 구조 분석 [다음 링크] filecoin 기술 구조 분석의 6: 6 filecoin 소스 코드 프로 토 콜 층 분석의 hello 악수 프로 토 콜
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.