bitswapNetwork에서 bitswap에서libp2를 어떻게 사용하는지 배우기
전언
bitswapNetwork는 bitswap의 네트워크 인터페이스로 bitswap의 네트워크 통신과dht 호출을 담당한다. bitswapNetwork에서 bitswap가libp2를 어떻게 사용하는지 알 수 있다.
본고를 읽고bitswap의 코드를 보며libp2의host,swarm,conn과stream에 대한 기초 지식을 이해하는 것이 좋습니다.코드를 보지 않았다면, 메인 라인을 보면libp2p를 어떻게 사용하는지 배울 수 있습니다.
bitswapNetwork
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork {
s := Settings{}
for _, opt := range opts {
opt(&s)
}
bitswapNetwork := impl{
host: host,
routing: r,
protocolBitswap: s.ProtocolPrefix + ProtocolBitswap,
protocolBitswapOne: s.ProtocolPrefix + ProtocolBitswapOne,
protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers,
}
return &bitswapNetwork
}
// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
type impl struct {
host host.Host
routing routing.ContentRouting
protocolBitswap protocol.ID
protocolBitswapOne protocol.ID
protocolBitswapNoVers protocol.ID
// inbound messages from the network are forwarded to the receiver
receiver Receiver
stats Stats
}
bitswap의 네트워크 모듈은 주로host와routing을 포함하고host는 네트워크 통신에 사용되며 주로 연결의 생성과 흐름의 생성, 수발과 닫기를 포함한다.routing은 dht를 호출하는 데 사용됩니다.
receiver는 비트맵입니다. 비트맵을 호출하는 데 주로 사용되는 Receive Message ()/Peer Connected ()/Peer Disconnected ()
또한 몇 가지 버전의bitswap 통신 메시지 구조체에 변화가 생겼기 때문에 네트워크 모듈에는 3개의bitswap 프로토콜 번호가 포함되어 있어 처리 서열화와 반서열화(protobuf)를 구별하는 데 사용된다.
host
host 이 블록은 주로conn과stream을 포함합니다.
conn
func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
return bsnet.host.Connect(ctx, peer.AddrInfo{ID: p})
}
세션이dht 네트워크에서provider를 찾으면providerQueryManager는ConnectTo theseprovider를 담당합니다.
// providermanager.go
func (pqm *ProviderQueryManager) findProviderWorker() {
// findProviderWorker just cycles through incoming provider queries one
// at a time. We have six of these workers running at once
// to let requests go in parallel but keep them rate limited
for {
select {
case fpr, ok := pqm.providerRequestsProcessing:
if !ok {
return
}
k := fpr.k
log.Debugf("Beginning Find Provider Request for cid: %s", k.String())
pqm.timeoutMutex.RLock()
findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout)
pqm.timeoutMutex.RUnlock()
providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
wg := &sync.WaitGroup{}
for p := range providers {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
err := pqm.network.ConnectTo(findProviderCtx, p)
if err != nil {
log.Debugf("failed to connect to provider %s: %s", p, err)
return
}
select {
case pqm.providerQueryMessages &receivedProviderMessage{
k: k,
p: p,
}:
case pqm.ctx.Done():
return
}
}(p)
}
wg.Wait()
cancel()
select {
case pqm.providerQueryMessages &finishedProviderQueryMessage{
k: k,
}:
case pqm.ctx.Done():
}
case pqm.ctx.Done():
return
}
}
}
wantmanager는peerHandler(peermanager 대상)를 통해 연결된peer마다 메시지 queue를 유지합니다. 메시지 queue가 있으면.sender가 비어 있으면 OpenSender를 호출하여 연결과 흐름을 만듭니다.
// messagequeue.go
func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
defer cancel()
err := network.ConnectTo(conctx, p)
if err != nil {
return nil, err
}
nsender, err := network.NewMessageSender(ctx, p)
if err != nil {
return nil, err
}
return nsender, nil
}
stream
new
앞에서 말한 OpenSender () 는 New Message Sender를 호출하여 흐름을 만들고 새로 만든 흐름은 stream Message Sender가 존재합니다.s중, 뒤에 사용하면 더 이상 흐름을 만들 필요가 없습니다.
다음에 소개할 SendMessage()는 new StreamToPeer를 직접 호출하여 흐름을 만듭니다.
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
s, err := bsnet.newStreamToPeer(ctx, p)
if err != nil {
return nil, err
}
return &streamMessageSender{s: s, bsnet: bsnet}, nil
}
func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) {
return bsnet.host.NewStream(ctx, p, bsnet.protocolBitswap, bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers)
}
Inbound
우선 SetStreamHandler, 다른peer의stream write를 받았을 때handleNewstream () 함수를 호출합니다.세 버전 모두 같은handler 함수로 메시지 프로토 구조체의 해석과 다르다.
func (bsnet *impl) SetDelegate(r Receiver) {
bsnet.receiver = r
bsnet.host.SetStreamHandler(bsnet.protocolBitswap, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswapOne, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswapNoVers, bsnet.handleNewStream)
bsnet.host.Network().Notify((*netNotifiee)(bsnet))
// TODO: StopNotify.
}
// handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s network.Stream) {
defer s.Close()
if bsnet.receiver == nil {
_ = s.Reset()
return
}
reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
for {
received, err := bsmsg.FromMsgReader(reader)
if err != nil {
if err != io.EOF {
_ = s.Reset()
go bsnet.receiver.ReceiveError(err)
log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
}
return
}
p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
bsnet.receiver.ReceiveMessage(ctx, p, received)
atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1)
}
}
Outbound
MessageSender 객체는 SendMsg()를 호출하여 wantlist를 보내고, SendMsg()는 msgToStream()을 호출하며, msgToStream은 메시지를 proto 형식으로 변환한 후 stream write를 보냅니다.
type streamMessageSender struct {
s network.Stream
bsnet *impl
}
func (s *streamMessageSender) Close() error {
return helpers.FullClose(s.s)
}
func (s *streamMessageSender) Reset() error {
return s.s.Reset()
}
func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return s.bsnet.msgToStream(ctx, s.s, msg)
}
var sendMessageTimeout = time.Minute * 10
func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
if err := s.SetWriteDeadline(deadline); err != nil {
log.Warningf("error setting deadline: %s", err)
}
switch s.Protocol() {
case bsnet.protocolBitswap:
if err := msg.ToNetV1(s); err != nil {
log.Debugf("error: %s", err)
return err
}
case bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers:
if err := msg.ToNetV0(s); err != nil {
log.Debugf("error: %s", err)
return err
}
default:
return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
}
if err := s.SetWriteDeadline(time.Time{}); err != nil {
log.Warningf("error resetting deadline: %s", err)
}
return nil
}
SendMessage () 는 new stream, write, 마지막close, 다음 호출은 흐름을 다시 만들어야 합니다.주로 Block을 보내는 데 사용됩니다.
func (bsnet *impl) SendMessage(
ctx context.Context,
p peer.ID,
outgoing bsmsg.BitSwapMessage) error {
s, err := bsnet.newStreamToPeer(ctx, p)
if err != nil {
return err
}
if err = bsnet.msgToStream(ctx, s, outgoing); err != nil {
_ = s.Reset()
return err
}
atomic.AddUint64(&bsnet.stats.MessagesSent, 1)
// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
//nolint
go helpers.AwaitEOF(s)
return s.Close()
}
routing
routing은dht를 통해 네트워크provide와findprovider에 접근합니다.
bitswap에서 원하는 블록을 받았을 때provide Enabled가 있으면Provide () 를 호출하여 네트워크에provider를 발표합니다.
세션이wantmanager를 통해wantlist 요청을 보내고 일정 시간 원하는 Block을 얻지 못할 때,session는providerQuery Manager를 통해 네트워크findprovider,providerQuery Manager가providers를 받을 때providerQuery Manager와 연결을 만듭니다.
// FindProvidersAsync returns a channel of providers for the given key.
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) chan peer.ID {
out := make(chan peer.ID, max)
go func() {
defer close(out)
providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
for info := range providers {
if info.ID == bsnet.host.ID() {
continue // ignore self as provider
}
bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL)
select {
case ctx.Done():
return
case out info.ID:
}
}
}()
return out
}
// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error {
return bsnet.routing.Provide(ctx, k, true)
}
Notifiee
Notifiee에는 모두 6개의 인터페이스가 있는데, 호출 시기의 주석이 명확하게 쓰여 있어, 여기서는 더 이상 군말하지 않는다.
// Notifiee is an interface for an object wishing to receive
// notifications from a Network.
type Notifiee interface {
Listen(Network, ma.Multiaddr) // called when network starts listening on an addr
ListenClose(Network, ma.Multiaddr) // called when network stops listening on an addr
Connected(Network, Conn) // called when a connection opened
Disconnected(Network, Conn) // called when a connection closed
OpenedStream(Network, Stream) // called when a stream opened
ClosedStream(Network, Stream) // called when a stream closed
하나의case: 이 노드에 새로운 연결이 있을 때 (listen이든 dial이든) swarm(libp2swarm addConn)은 notify All을 호출하여 모든 Notify의 대상(Notifiee 인터페이스를 실현)에게 새로운connection이 있음을 알립니다. BitSwapNetwork는 Notifiee 인터페이스 인터페이스를 실현하고 SetDelegate () 함수 Notify를 통해 Connected 메시지를 받을 수 있습니다.
func (bsnet *impl) SetDelegate(r Receiver) {
bsnet.receiver = r
bsnet.host.SetStreamHandler(bsnet.protocolBitswap, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswapOne, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswapNoVers, bsnet.handleNewStream)
bsnet.host.Network().Notify((*netNotifiee)(bsnet))
// TODO: StopNotify.
}
type netNotifiee impl
func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
nn.impl().receiver.PeerConnected(v.RemotePeer())
}
func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
nn.impl().receiver.PeerDisconnected(v.RemotePeer())
}
func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr) {}
연결할 때bitswap은 egine와wantmanager에게 통지하고 egine는 통지를 받은 후 새로운 노드에 장부를 작성합니다.wantmanager는 새 노드를 자신의 peerqueue에 추가해서 새로운 peer에게 brocast에 저장되어 있는 wantlists(bs.wantmanager.bcwl)가 있는지 물어본다.
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
bs.wm.Connected(p)
bs.engine.PeerConnected(p)
}
// PeerDisconnected is called by the network interface when a peer
// closes a connection
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
bs.wm.Disconnected(p)
bs.engine.PeerDisconnected(p)
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Gitpod x 블록체인Gitpod에서 하드 설정을 제거하여 블록체인 개발 시간을 절약하세요. 먼저 이 저장소 를 살펴보겠습니다. 이 프로젝트는 으로 빌드되었으며 하위 그래프에 전원 인덱서도 있습니다. 이 프로젝트는 openzeppelin...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.