킹 버스 의 스타 래 프 트 에 대해 서 얘 기해 볼 게 요.

9194 단어 kingbus
순서.
본 고 는 주로 킹 버스 의 스타 래 프 트 를 연구 하고 자 합 니 다.
starRaft
kingbus/server/server.go
func (s *KingbusServer) starRaft(cfg config.RaftNodeConfig) error {
    var (
        etcdRaftNode etcdraft.Node
        id           types.ID
        cl           *membership.RaftCluster
        remotes      []*membership.Member
        appliedIndex uint64
    )

    prt, err := rafthttp.NewRoundTripper(transport.TLSInfo{}, DialTimeout)
    if err != nil {
        return err
    }

    store, err := storage.NewDiskStorage(cfg.DataDir, cfg.ReserveDataSize)
    if err != nil {
        log.Log.Fatalf("NewKingbusServer:NewDiskStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
    }

    //store, err := storage.NewMemoryStorage(cfg.DataDir)
    //if err != nil {
    //    log.Log.Fatalf("NewKingbusServer:NewMemoryStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
    //}

    defer func() {
        //close storage when occur error
        if err != nil {
            store.Close()
        }
    }()

    logExist := utils.ExistLog(cfg.DataDir)
    switch {
    case !logExist && !cfg.NewCluster:
        if err = cfg.VerifyJoinExisting(); err != nil {
            return err
        }
        cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
        if err != nil {
            return err
        }
        remotePeerURLs := membership.GetRemotePeerURLs(cl, cfg.Name)
        existingCluster, gerr := membership.GetClusterFromRemotePeers(remotePeerURLs, prt)
        if gerr != nil {
            return fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
        }
        if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
            return fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
        }

        remotes = existingCluster.Members()
        cl.SetID(existingCluster.GetID())
        cl.SetStore(store)
        id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, nil)
    case !logExist && cfg.NewCluster:
        if err = cfg.VerifyBootstrap(); err != nil {
            return err
        }
        cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
        if err != nil {
            return err
        }
        m := cl.MemberByName(cfg.Name)
        if membership.IsMemberBootstrapped(cl, cfg.Name, prt, DialTimeout) {
            return fmt.Errorf("member %s has already been bootstrapped", m.ID)
        }

        cl.SetStore(store)
        id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, cl.MemberIDs())
    case logExist:
        if err = utils.IsDirWriteable(cfg.DataDir); err != nil {
            return fmt.Errorf("cannot write to member directory: %v", err)
        }
        //node restart, read states from storage
        //get applied index
        appliedIndex = raft.MustGetAppliedIndex(store)
        cfg.AppliedIndex = appliedIndex
        id, etcdRaftNode, cl = restartEtcdNode(cfg, store)
        cl.SetStore(store)
    default:
        return fmt.Errorf("unsupported bootstrap config")
    }

    s.raftNode = raft.NewNode(
        raft.NodeConfig{
            IsIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
            Node:        etcdRaftNode,
            Heartbeat:   cfg.HeartbeatMs,
            Storage:     store,
        },
    )
    //committedIndex,term will update by fsm(UpdateCommittedIndex,SetTerm)
    //set appliedIndex when applyEntries will check the entry continuity
    s.raftNode.SetAppliedIndex(appliedIndex)

    s.id = id
    s.wait = wait.New()
    s.reqIDGen = idutil.NewGenerator(uint16(id), time.Now())
    s.stopping = make(chan struct{})
    s.errorc = make(chan error)
    s.applyBroadcast = utils.NewBroadcast()
    s.stats = stats.NewServerStats(cfg.Name, id.String())
    s.lstats = stats.NewLeaderStats(id.String())
    s.store = store

    tr := &rafthttp.Transport{
        TLSInfo:     transport.TLSInfo{},
        DialTimeout: DialTimeout,
        ID:          id,
        URLs:        cfg.PeerURLs,
        ClusterID:   cl.GetID(),
        Raft:        s,
        ServerStats: s.stats,
        LeaderStats: s.lstats,
        ErrorC:      s.errorc,
    }
    if err = tr.Start(); err != nil {
        return err
    }
    // add all remotes into transport
    //Add remotes to rafthttp, who help newly joined members catch up the
    //progress of the cluster. It supports basic message sending to remote, and
    //has no stream connection for simplicity. remotes will not be used
    //after the latest peers have been added into rafthttp.
    for _, m := range remotes {
        if m.ID != id {
            tr.AddRemote(m.ID, m.PeerURLs)
        }
    }
    for _, m := range cl.Members() {
        if m.ID != id {
            tr.AddPeer(m.ID, m.PeerURLs)
        }
    }
    s.raftNode.Transport = tr
    s.cluster = cl

    return nil
}
  • starRaft 방법 은 raft http. NewRoundTripper 를 통 해 http. RoundTripper 를 만 든 다음 storage. NewDiskStorage 를 통 해 DiskStorage 를 만 든 다음 logExist 및 cfg. NewCluster 에 따라 다 릅 니 다.둘 다 false 이면 membership. Raft Cluster 의 id 를 존재 하 는 cluster 의 id 로 업데이트 하고 startEtcdRaft Node 를 실행 합 니 다.cfg. NewCluster 가 true 라면 cl. MemberIDs () 를 사용 하여 startEtcdRaft Node 를 실행 합 니 다.logExist 가 true 이면 restartEtcdNode 를 실행 합 니 다.마지막 으로 rafthttp. Transport 를 만 들 고 tr. Start (), tr. AddRemote, tr. AddPeer
  • 를 실행 합 니 다.
    startEtcdRaftNode
    kingbus/server/server.go
    func startEtcdRaftNode(cfg config.RaftNodeConfig, store storage.Storage, cl *membership.RaftCluster, ids []types.ID) (
        id types.ID, n etcdraft.Node) {
        member := cl.MemberByName(cfg.Name)
        peers := make([]etcdraft.Peer, len(ids))
    
        for i, id := range ids {
            ctx, err := json.Marshal((*cl).Member(id))
            if err != nil {
                log.Log.Panicf("marshal member should never fail: %v", err)
            }
            peers[i] = etcdraft.Peer{ID: uint64(id), Context: ctx}
        }
        id = member.ID
        log.Log.Infof("starting member %s in cluster %s", id, cl.GetID())
    
        c := &etcdraft.Config{
            ID:                        uint64(id),
            ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
            HeartbeatTick:             1,
            Storage:                   store,
            MaxSizePerMsg:             cfg.MaxRequestBytes,
            MaxInflightMsgs:           maxInflightMsgs,
            CheckQuorum:               true,
            PreVote:                   cfg.PreVote,
            DisableProposalForwarding: true,
            Logger:                    log.Log,
        }
    
        n = etcdraft.StartNode(c, peers)
        raft.AdvanceTicks(n, c.ElectionTick)
        return id, n
    }
  • startEtcdRaft Node 방법 은 지정 한 ids 를 통 해 peers 를 만 든 후 etdraft. StartNode 및 raft. AdvanceTicks
  • 를 실행 합 니 다.
    restartEtcdNode
    kingbus/server/server.go
    func restartEtcdNode(cfg config.RaftNodeConfig, store storage.Storage) (
        types.ID, etcdraft.Node, *membership.RaftCluster) {
        cl, err := membership.GetRaftClusterFromStorage(store)
        if err != nil {
            if err != nil {
                log.Log.Panic("GetRaftClusterFromStorage error:%s", err.Error())
            }
        }
    
        log.Log.Debugf("restartEtcdNode:get raft cluster from storage,cluster:%v", cl.String())
    
        //get id from raftCluster
        member := cl.MemberByName(cfg.Name)
        if member == nil {
            log.Log.Fatalf("restartEtcdNode:member not in raft cluster,cluster:%v,memberName:%s",
                cl.String(), cfg.Name)
        }
        c := &etcdraft.Config{
            ID:                        uint64(member.ID),
            ElectionTick:              int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
            HeartbeatTick:             1,
            Applied:                   cfg.AppliedIndex, //set appliedIndex
            Storage:                   store,
            MaxSizePerMsg:             cfg.MaxRequestBytes,
            MaxInflightMsgs:           maxInflightMsgs,
            CheckQuorum:               true,
            PreVote:                   cfg.PreVote,
            DisableProposalForwarding: true,
            Logger:                    log.Log,
        }
    
        n := etcdraft.RestartNode(c)
        return member.ID, n, cl
    }
  • restartEtcdNode 방법 은 membership. GetRaft ClusterFromStorage (store) 를 통 해 Raft Cluster 를 얻 은 후 cl. MemberByName (cfg. Name) 을 통 해 Member 를 얻 은 다음 member. ID 구조 etdraft. Config 를 사용 하여 마지막 으로 etdraft. Config 에 따라 etdraft. RestartNode
  • 를 실행 합 니 다.
    작은 매듭
    starRaft 방법 은 raft http. NewRoundTripper 를 통 해 http. RoundTripper 를 만 든 다음 storage. NewDiskStorage 를 통 해 DiskStorage 를 만 든 다음 logExist 및 cfg. NewCluster 에 따라 다 릅 니 다.둘 다 false 이면 membership. Raft Cluster 의 id 를 존재 하 는 cluster 의 id 로 업데이트 하고 startEtcdRaft Node 를 실행 합 니 다.cfg. NewCluster 가 true 라면 cl. MemberIDs () 를 사용 하여 startEtcdRaft Node 를 실행 합 니 다.logExist 가 true 이면 restartEtcdNode 를 실행 합 니 다.마지막 으로 rafthttp. Transport 를 만 들 고 tr. Start (), tr. AddRemote, tr. AddPeer 를 실행 합 니 다.
    doc
  • server.go
  • 좋은 웹페이지 즐겨찾기