btcd 소스 코드 분석 시리즈: 4 - p2p 네트워크의 peer
1. peer 객체 만들기
peer는 구체적으로 두 가지로 나뉜다.
// NewInboundPeer builds a peer flagged as inbound from the supplied
// configuration. All of the setup is delegated to newPeerBase. Per the
// upstream documentation, Start must be called afterwards to begin processing
// incoming and outgoing messages.
func NewInboundPeer(cfg *Config) *Peer {
	const inbound = true
	return newPeerBase(cfg, inbound)
}
// NewOutboundPeer builds a peer flagged as outbound for the remote address
// addr, which must be in "host:port" form with the port being a base-10
// number that fits in 16 bits.
//
// The peer's network address is produced by cfg.HostToNetAddress when that
// callback is set; otherwise it is built from net.ParseIP(host) and the parsed
// port. A nil peer and the underlying error are returned when addr cannot be
// split, the port cannot be parsed, or the callback fails.
func NewOutboundPeer(cfg *Config, addr string) (*Peer, error) {
	peer := newPeerBase(cfg, false)
	peer.addr = addr

	host, portStr, err := net.SplitHostPort(addr)
	if err != nil {
		return nil, err
	}

	// bitSize 16 makes ParseUint reject anything above 65535, so the
	// conversion to uint16 below cannot truncate.
	parsedPort, err := strconv.ParseUint(portStr, 10, 16)
	if err != nil {
		return nil, err
	}
	port := uint16(parsedPort)

	// Without a caller supplied resolver the host is treated as an IP literal.
	if cfg.HostToNetAddress == nil {
		peer.na = wire.NewNetAddressIPPort(net.ParseIP(host), port, 0)
		return peer, nil
	}

	netAddr, err := cfg.HostToNetAddress(host, port, 0)
	if err != nil {
		return nil, err
	}
	peer.na = netAddr
	return peer, nil
}
// newPeerBase performs the setup shared by NewInboundPeer and NewOutboundPeer
// and returns the resulting peer with its inbound flag set as requested.
//
// The supplied configuration is copied before any defaults are applied, so the
// caller's Config is never modified. Defaults filled in on the copy:
//   - ProtocolVersion: MaxProtocolVersion when zero
//   - ChainParams: testnet3 parameters when nil
//   - TrickleInterval: DefaultTrickleInterval when not positive
func newPeerBase(origCfg *Config, inbound bool) *Peer {
	// Private copy; everything below operates on it only.
	cfg := *origCfg

	if cfg.TrickleInterval <= 0 {
		cfg.TrickleInterval = DefaultTrickleInterval
	}
	if cfg.ChainParams == nil {
		cfg.ChainParams = &chaincfg.TestNet3Params
	}
	if cfg.ProtocolVersion == 0 {
		cfg.ProtocolVersion = MaxProtocolVersion
	}

	return &Peer{
		// Identity and negotiated settings.
		inbound:         inbound,
		cfg:             cfg,
		services:        cfg.Services,
		protocolVersion: cfg.ProtocolVersion,
		wireEncoding:    wire.BaseEncoding,
		knownInventory:  newMruInventoryMap(maxKnownInventory),

		// Message plumbing. The channels with capacity 1 are sized so a
		// single send does not block (non-blocking sync).
		stallControl:  make(chan stallControlMsg, 1),
		sendQueue:     make(chan outMsg, 1),
		sendDoneQueue: make(chan struct{}, 1),
		outputQueue:   make(chan outMsg, outputBufferSize),
		outputInvChan: make(chan *wire.InvVect, outputBufferSize),

		// Shutdown signalling.
		inQuit:    make(chan struct{}),
		queueQuit: make(chan struct{}),
		outQuit:   make(chan struct{}),
		quit:      make(chan struct{}),
	}
}
2. peer 시작하기
시작 순서
// start begins processing input and output messages.
func (p *Peer) start() error {
log.Tracef("Starting peer %s", p)
negotiateErr := make(chan error, 1)
go func() {
if p.inbound {
negotiateErr
3. 프로토콜 버전 교환 (inbound 측)
주요 프로세스:
// negotiateInboundProtocol runs the inbound side of the version handshake:
// the remote peer's version message is read first and only then is our own
// version message written. The first error from either step is returned and
// the second step is skipped when the first one fails.
func (p *Peer) negotiateInboundProtocol() error {
	err := p.readRemoteVersionMsg()
	if err != nil {
		return err
	}

	return p.writeLocalVersionMsg()
}
// readRemoteVersionMsg reads the next message from the remote peer and
// processes it as that peer's version message.
//
// On success the peer records: the advertised protocol version, the
// negotiated protocol version (the lower of ours and the advertised one), the
// advertised services, the last block / starting height, the clock offset
// relative to local time, a freshly assigned peer ID and the user agent. When
// the advertised services include wire.SFNodeWitness the peer is marked as
// witness enabled and its wire encoding is switched to wire.WitnessEncoding.
//
// An error is returned when:
//   - the read itself fails;
//   - the first message is not a version message;
//   - the message nonce is present in sentNonces and allowSelfConns is false;
//   - the OnVersion listener returns a reject message;
//   - the advertised protocol version is below MinAcceptableProtocolVersion.
//
// In the non-version, listener-reject and obsolete-version cases a reject
// message is written to the peer before returning. That write is best effort:
// its error is deliberately ignored because an error is returned regardless.
func (p *Peer) readRemoteVersionMsg() error {
	// Read their version message.
	remoteMsg, _, err := p.readMessage(wire.LatestEncoding)
	if err != nil {
		return err
	}

	// Notify and disconnect clients if the first message is not a version
	// message.
	msg, ok := remoteMsg.(*wire.MsgVersion)
	if !ok {
		// msg is a nil *wire.MsgVersion on this path, so do not call methods
		// on it. Report the command of the message that was actually
		// received, falling back to the version command if nothing usable
		// was returned.
		rejectedCmd := wire.CmdVersion
		if remoteMsg != nil {
			rejectedCmd = remoteMsg.Command()
		}

		reason := "a version message must precede all others"
		rejectMsg := wire.NewMsgReject(rejectedCmd, wire.RejectMalformed,
			reason)
		_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
		return errors.New(reason)
	}

	// Detect self connections.
	if !allowSelfConns && sentNonces.Exists(msg.Nonce) {
		return errors.New("disconnecting peer connected to self")
	}

	// Negotiate the protocol version and set the services to what the remote
	// peer advertised.
	p.flagsMtx.Lock()
	p.advertisedProtoVer = uint32(msg.ProtocolVersion)
	p.protocolVersion = minUint32(p.protocolVersion, p.advertisedProtoVer)
	p.versionKnown = true
	p.services = msg.Services
	p.flagsMtx.Unlock()
	log.Debugf("Negotiated protocol version %d for peer %s",
		p.protocolVersion, p)

	// Updating a bunch of stats including block based stats, and the
	// peer's time offset.
	p.statsMtx.Lock()
	p.lastBlock = msg.LastBlock
	p.startingHeight = msg.LastBlock
	p.timeOffset = msg.Timestamp.Unix() - time.Now().Unix()
	p.statsMtx.Unlock()

	// Set the peer's ID, user agent, and potentially the flag which
	// specifies the witness support is enabled.
	p.flagsMtx.Lock()
	p.id = atomic.AddInt32(&nodeCount, 1)
	p.userAgent = msg.UserAgent

	// Determine if the peer would like to receive witness data with
	// transactions, or not.
	if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
		p.witnessEnabled = true
	}
	p.flagsMtx.Unlock()

	// Once the version message has been exchanged, we're able to determine
	// if this peer knows how to encode witness data over the wire
	// protocol. If so, then we'll switch to a decoding mode which is
	// prepared for the new transaction format introduced as part of
	// BIP0144.
	if p.services&wire.SFNodeWitness == wire.SFNodeWitness {
		p.wireEncoding = wire.WitnessEncoding
	}

	// Invoke the callback if specified. A non-nil result is forwarded to the
	// peer as a reject and aborts the handshake with its reason.
	if p.cfg.Listeners.OnVersion != nil {
		rejectMsg := p.cfg.Listeners.OnVersion(p, msg)
		if rejectMsg != nil {
			_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
			return errors.New(rejectMsg.Reason)
		}
	}

	// Notify and disconnect clients that have a protocol version that is
	// too old.
	//
	// NOTE: If minAcceptableProtocolVersion is raised to be higher than
	// wire.RejectVersion, this should send a reject packet before
	// disconnecting.
	if uint32(msg.ProtocolVersion) < MinAcceptableProtocolVersion {
		// Send a reject message indicating the protocol version is
		// obsolete and wait for the message to be sent before
		// disconnecting.
		reason := fmt.Sprintf("protocol version must be %d or greater",
			MinAcceptableProtocolVersion)
		rejectMsg := wire.NewMsgReject(msg.Command(), wire.RejectObsolete,
			reason)
		_ = p.writeMessage(rejectMsg, wire.LatestEncoding)
		return errors.New(reason)
	}

	return nil
}
메시지 보내기
// QueueMessage adds the passed bitcoin message to the peer send queue.
//
// This function is safe for concurrent access.
func (p *Peer) QueueMessage(msg wire.Message, doneChan chan
p := Peer{
inbound: inbound,
wireEncoding: wire.BaseEncoding,
knownInventory: newMruInventoryMap(maxKnownInventory),
stallControl: make(chan stallControlMsg, 1), // nonblocking sync
outputQueue: make(chan outMsg, outputBufferSize),
sendQueue: make(chan outMsg, 1), // nonblocking sync
sendDoneQueue: make(chan struct{}, 1), // nonblocking sync
outputInvChan: make(chan *wire.InvVect, outputBufferSize),
inQuit: make(chan struct{}),
queueQuit: make(chan struct{}),
outQuit: make(chan struct{}),
quit: make(chan struct{}),
cfg: cfg, // Copy so caller can't mutate.
services: cfg.Services,
protocolVersion: cfg.ProtocolVersion,
}
메시지 수신
// inHandler handles all incoming messages for the peer. It must be run as a
// goroutine.
func (p *Peer) inHandler() {
// The timer is stopped when a new message is received and reset after it
// is processed.
idleTimer := time.AfterFunc(idleTimeout, func() {
log.Warnf("Peer %s no answer for %s -- disconnecting", p, idleTimeout)
p.Disconnect()
})
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
// Read a message and stop the idle timer as soon as the read
// is done. The timer is reset below for the next iteration if
// needed.
rmsg, buf, err := p.readMessage(p.wireEncoding)
idleTimer.Stop()
if err != nil {
// In order to allow regression tests with malformed messages, don't
// disconnect the peer when we're in regression test mode and the
// error is one of the allowed errors.
if p.isAllowedReadError(err) {
log.Errorf("Allowed test error from %s: %v", p, err)
idleTimer.Reset(idleTimeout)
continue
}
// Only log the error and send reject message if the
// local peer is not forcibly disconnecting and the
// remote peer has not disconnected.
if p.shouldHandleReadError(err) {
errMsg := fmt.Sprintf("Can't read message from %s: %v", p, err)
if err != io.ErrUnexpectedEOF {
log.Errorf(errMsg)
}
// Push a reject message for the malformed message and wait for
// the message to be sent before disconnecting.
//
// NOTE: Ideally this would include the command in the header if
// at least that much of the message was valid, but that is not
// currently exposed by wire, so just used malformed for the
// command.
p.PushRejectMsg("malformed", wire.RejectMalformed, errMsg, nil,
true)
}
break out
}
atomic.StoreInt64(&p.lastRecv, time.Now().Unix())
p.stallControl
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSON — JSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다. 하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.