bitswapNetwork에서 bitswap에서libp2를 어떻게 사용하는지 배우기

56555 단어 ipfslibp2p
코드 버전: [email protected]

전언


bitswapNetwork는 bitswap의 네트워크 인터페이스로 bitswap의 네트워크 통신과dht 호출을 담당한다. bitswapNetwork에서 bitswap가libp2를 어떻게 사용하는지 알 수 있다.
본고를 읽고bitswap의 코드를 보며libp2의host,swarm,conn과stream에 대한 기초 지식을 이해하는 것이 좋습니다.코드를 보지 않았다면, 메인 라인을 보면libp2p를 어떻게 사용하는지 배울 수 있습니다.

bitswapNetwork

// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork {
	s := Settings{}
	for _, opt := range opts {
		opt(&s)
	}

	bitswapNetwork := impl{
		host:    host,
		routing: r,

		protocolBitswap:       s.ProtocolPrefix + ProtocolBitswap,
		protocolBitswapOne:    s.ProtocolPrefix + ProtocolBitswapOne,
		protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers,
	}
	return &bitswapNetwork
}

// impl transforms the ipfs network interface, which sends and receives
// NetMessage objects, into the bitswap network interface.
type impl struct {
	host    host.Host
	routing routing.ContentRouting

	protocolBitswap       protocol.ID
	protocolBitswapOne    protocol.ID
	protocolBitswapNoVers protocol.ID

	// inbound messages from the network are forwarded to the receiver
	receiver Receiver

	stats Stats
}

bitswap의 네트워크 모듈은 주로host와routing을 포함하고host는 네트워크 통신에 사용되며 주로 연결의 생성과 흐름의 생성, 수발과 닫기를 포함한다.routing은 dht를 호출하는 데 사용됩니다.
receiver는 비트맵입니다. 비트맵을 호출하는 데 주로 사용되는 Receive Message ()/Peer Connected ()/Peer Disconnected ()
또한 몇 가지 버전의bitswap 통신 메시지 구조체에 변화가 생겼기 때문에 네트워크 모듈에는 3개의bitswap 프로토콜 번호가 포함되어 있어 처리 서열화와 반서열화(protobuf)를 구별하는 데 사용된다.

host


host 이 블록은 주로conn과stream을 포함합니다.

conn

func (bsnet *impl) ConnectTo(ctx context.Context, p peer.ID) error {
	return bsnet.host.Connect(ctx, peer.AddrInfo{ID: p})
}

세션이dht 네트워크에서provider를 찾으면providerQueryManager는ConnectTo theseprovider를 담당합니다.
// providermanager.go
func (pqm *ProviderQueryManager) findProviderWorker() {
	// findProviderWorker just cycles through incoming provider queries one
	// at a time. We have six of these workers running at once
	// to let requests go in parallel but keep them rate limited
	for {
		select {
		case fpr, ok := pqm.providerRequestsProcessing:
			if !ok {
				return
			}
			k := fpr.k
			log.Debugf("Beginning Find Provider Request for cid: %s", k.String())
			pqm.timeoutMutex.RLock()
			findProviderCtx, cancel := context.WithTimeout(fpr.ctx, pqm.findProviderTimeout)
			pqm.timeoutMutex.RUnlock()
			providers := pqm.network.FindProvidersAsync(findProviderCtx, k, maxProviders)
			wg := &sync.WaitGroup{}
			for p := range providers {
				wg.Add(1)
				go func(p peer.ID) {
					defer wg.Done()
					err := pqm.network.ConnectTo(findProviderCtx, p)
					if err != nil {
						log.Debugf("failed to connect to provider %s: %s", p, err)
						return
					}
					select {
					case pqm.providerQueryMessages  &receivedProviderMessage{
						k: k,
						p: p,
					}:
					case pqm.ctx.Done():
						return
					}
				}(p)
			}
			wg.Wait()
			cancel()
			select {
			case pqm.providerQueryMessages  &finishedProviderQueryMessage{
				k: k,
			}:
			case pqm.ctx.Done():
			}
		case pqm.ctx.Done():
			return
		}
	}
}

wantmanager는peerHandler(peermanager 대상)를 통해 연결된peer마다 메시지 queue를 유지합니다. 메시지 queue가 있으면.sender가 비어 있으면 OpenSender를 호출하여 연결과 흐름을 만듭니다.
// messagequeue.go
func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (bsnet.MessageSender, error) {
	// allow ten minutes for connections this includes looking them up in the
	// dht dialing them, and handshaking
	conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
	defer cancel()

	err := network.ConnectTo(conctx, p)
	if err != nil {
		return nil, err
	}

	nsender, err := network.NewMessageSender(ctx, p)
	if err != nil {
		return nil, err
	}

	return nsender, nil
}

stream


new
앞에서 말한 OpenSender () 는 New Message Sender를 호출하여 흐름을 만들고 새로 만든 흐름은 stream Message Sender가 존재합니다.s중, 뒤에 사용하면 더 이상 흐름을 만들 필요가 없습니다.
다음에 소개할 SendMessage()는 new StreamToPeer를 직접 호출하여 흐름을 만듭니다.
func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
	s, err := bsnet.newStreamToPeer(ctx, p)
	if err != nil {
		return nil, err
	}

	return &streamMessageSender{s: s, bsnet: bsnet}, nil
}

func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) {
	return bsnet.host.NewStream(ctx, p, bsnet.protocolBitswap, bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers)
}

Inbound
우선 SetStreamHandler, 다른peer의stream write를 받았을 때handleNewstream () 함수를 호출합니다.세 버전 모두 같은handler 함수로 메시지 프로토 구조체의 해석과 다르다.
func (bsnet *impl) SetDelegate(r Receiver) {
	bsnet.receiver = r
	bsnet.host.SetStreamHandler(bsnet.protocolBitswap, bsnet.handleNewStream)
	bsnet.host.SetStreamHandler(bsnet.protocolBitswapOne, bsnet.handleNewStream)
	bsnet.host.SetStreamHandler(bsnet.protocolBitswapNoVers, bsnet.handleNewStream)
	bsnet.host.Network().Notify((*netNotifiee)(bsnet))
	// TODO: StopNotify.

}

// handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s network.Stream) {
	defer s.Close()

	if bsnet.receiver == nil {
		_ = s.Reset()
		return
	}

	reader := msgio.NewVarintReaderSize(s, network.MessageSizeMax)
	for {
		received, err := bsmsg.FromMsgReader(reader)
		if err != nil {
			if err != io.EOF {
				_ = s.Reset()
				go bsnet.receiver.ReceiveError(err)
				log.Debugf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
			}
			return
		}

		p := s.Conn().RemotePeer()
		ctx := context.Background()
		log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
		bsnet.receiver.ReceiveMessage(ctx, p, received)
		atomic.AddUint64(&bsnet.stats.MessagesRecvd, 1)
	}
}

Outbound
MessageSender 객체는 SendMsg()를 호출하여 wantlist를 보내고, SendMsg()는 msgToStream()을 호출하며, msgToStream은 메시지를 proto 형식으로 변환한 후 stream write를 보냅니다.
type streamMessageSender struct {
	s     network.Stream
	bsnet *impl
}

func (s *streamMessageSender) Close() error {
	return helpers.FullClose(s.s)
}

func (s *streamMessageSender) Reset() error {
	return s.s.Reset()
}

func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
	return s.bsnet.msgToStream(ctx, s.s, msg)
}

var sendMessageTimeout = time.Minute * 10

func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
	deadline := time.Now().Add(sendMessageTimeout)
	if dl, ok := ctx.Deadline(); ok {
		deadline = dl
	}

	if err := s.SetWriteDeadline(deadline); err != nil {
		log.Warningf("error setting deadline: %s", err)
	}

	switch s.Protocol() {
	case bsnet.protocolBitswap:
		if err := msg.ToNetV1(s); err != nil {
			log.Debugf("error: %s", err)
			return err
		}
	case bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers:
		if err := msg.ToNetV0(s); err != nil {
			log.Debugf("error: %s", err)
			return err
		}
	default:
		return fmt.Errorf("unrecognized protocol on remote: %s", s.Protocol())
	}

	if err := s.SetWriteDeadline(time.Time{}); err != nil {
		log.Warningf("error resetting deadline: %s", err)
	}
	return nil
}

SendMessage () 는 new stream, write, 마지막close, 다음 호출은 흐름을 다시 만들어야 합니다.주로 Block을 보내는 데 사용됩니다.
func (bsnet *impl) SendMessage(
	ctx context.Context,
	p peer.ID,
	outgoing bsmsg.BitSwapMessage) error {

	s, err := bsnet.newStreamToPeer(ctx, p)
	if err != nil {
		return err
	}

	if err = bsnet.msgToStream(ctx, s, outgoing); err != nil {
		_ = s.Reset()
		return err
	}
	atomic.AddUint64(&bsnet.stats.MessagesSent, 1)

	// TODO(https://github.com/libp2p/go-libp2p-net/issues/28): Avoid this goroutine.
	//nolint
	go helpers.AwaitEOF(s)
	return s.Close()

}

routing


routing은dht를 통해 네트워크provide와findprovider에 접근합니다.
bitswap에서 원하는 블록을 받았을 때provide Enabled가 있으면Provide () 를 호출하여 네트워크에provider를 발표합니다.
세션이wantmanager를 통해wantlist 요청을 보내고 일정 시간 원하는 Block을 얻지 못할 때,session는providerQuery Manager를 통해 네트워크findprovider,providerQuery Manager가providers를 받을 때providerQuery Manager와 연결을 만듭니다.
// FindProvidersAsync returns a channel of providers for the given key.
func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) chan peer.ID {
	out := make(chan peer.ID, max)
	go func() {
		defer close(out)
		providers := bsnet.routing.FindProvidersAsync(ctx, k, max)
		for info := range providers {
			if info.ID == bsnet.host.ID() {
				continue // ignore self as provider
			}
			bsnet.host.Peerstore().AddAddrs(info.ID, info.Addrs, peerstore.TempAddrTTL)
			select {
			case ctx.Done():
				return
			case out  info.ID:
			}
		}
	}()
	return out
}

// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error {
	return bsnet.routing.Provide(ctx, k, true)
}

Notifiee


Notifiee에는 모두 6개의 인터페이스가 있는데, 호출 시기의 주석이 명확하게 쓰여 있어, 여기서는 더 이상 군말하지 않는다.
// Notifiee is an interface for an object wishing to receive
// notifications from a Network.
type Notifiee interface {
	Listen(Network, ma.Multiaddr)      // called when network starts listening on an addr
	ListenClose(Network, ma.Multiaddr) // called when network stops listening on an addr
	Connected(Network, Conn)           // called when a connection opened
	Disconnected(Network, Conn)        // called when a connection closed
	OpenedStream(Network, Stream)      // called when a stream opened
	ClosedStream(Network, Stream)      // called when a stream closed

하나의case: 이 노드에 새로운 연결이 있을 때 (listen이든 dial이든) swarm(libp2swarm addConn)은 notify All을 호출하여 모든 Notify의 대상(Notifiee 인터페이스를 실현)에게 새로운connection이 있음을 알립니다. BitSwapNetwork는 Notifiee 인터페이스 인터페이스를 실현하고 SetDelegate () 함수 Notify를 통해 Connected 메시지를 받을 수 있습니다.
func (bsnet *impl) SetDelegate(r Receiver) {
	bsnet.receiver = r
	bsnet.host.SetStreamHandler(bsnet.protocolBitswap, bsnet.handleNewStream)
	bsnet.host.SetStreamHandler(bsnet.protocolBitswapOne, bsnet.handleNewStream)
	bsnet.host.SetStreamHandler(bsnet.protocolBitswapNoVers, bsnet.handleNewStream)
	bsnet.host.Network().Notify((*netNotifiee)(bsnet))
	// TODO: StopNotify.

}

type netNotifiee impl

func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
	nn.impl().receiver.PeerConnected(v.RemotePeer())
}

func (nn *netNotifiee) Disconnected(n network.Network, v network.Conn) {
	nn.impl().receiver.PeerDisconnected(v.RemotePeer())
}

func (nn *netNotifiee) OpenedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *netNotifiee) Listen(n network.Network, a ma.Multiaddr)         {}
func (nn *netNotifiee) ListenClose(n network.Network, a ma.Multiaddr)    {}

연결할 때bitswap은 egine와wantmanager에게 통지하고 egine는 통지를 받은 후 새로운 노드에 장부를 작성합니다.wantmanager는 새 노드를 자신의 peerqueue에 추가해서 새로운 peer에게 brocast에 저장되어 있는 wantlists(bs.wantmanager.bcwl)가 있는지 물어본다.
// PeerConnected is called by the network interface
// when a peer initiates a new connection to bitswap.
func (bs *Bitswap) PeerConnected(p peer.ID) {
	bs.wm.Connected(p)
	bs.engine.PeerConnected(p)
}

// PeerDisconnected is called by the network interface when a peer
// closes a connection
func (bs *Bitswap) PeerDisconnected(p peer.ID) {
	bs.wm.Disconnected(p)
	bs.engine.PeerDisconnected(p)
}

좋은 웹페이지 즐겨찾기