킹 버스 의 스타 래 프 트 에 대해 서 얘 기해 볼 게 요.
9194 단어 kingbus
본 고 는 주로 킹 버스 의 스타 래 프 트 를 연구 하고 자 합 니 다.
starRaft
kingbus/server/server.go
func (s *KingbusServer) starRaft(cfg config.RaftNodeConfig) error {
var (
etcdRaftNode etcdraft.Node
id types.ID
cl *membership.RaftCluster
remotes []*membership.Member
appliedIndex uint64
)
prt, err := rafthttp.NewRoundTripper(transport.TLSInfo{}, DialTimeout)
if err != nil {
return err
}
store, err := storage.NewDiskStorage(cfg.DataDir, cfg.ReserveDataSize)
if err != nil {
log.Log.Fatalf("NewKingbusServer:NewDiskStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
}
//store, err := storage.NewMemoryStorage(cfg.DataDir)
//if err != nil {
// log.Log.Fatalf("NewKingbusServer:NewMemoryStorage error,err:%s,dir:%s", err.Error(), cfg.DataDir)
//}
defer func() {
//close storage when occur error
if err != nil {
store.Close()
}
}()
logExist := utils.ExistLog(cfg.DataDir)
switch {
case !logExist && !cfg.NewCluster:
if err = cfg.VerifyJoinExisting(); err != nil {
return err
}
cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
if err != nil {
return err
}
remotePeerURLs := membership.GetRemotePeerURLs(cl, cfg.Name)
existingCluster, gerr := membership.GetClusterFromRemotePeers(remotePeerURLs, prt)
if gerr != nil {
return fmt.Errorf("cannot fetch cluster info from peer urls: %v", gerr)
}
if err = membership.ValidateClusterAndAssignIDs(cl, existingCluster); err != nil {
return fmt.Errorf("error validating peerURLs %s: %v", existingCluster, err)
}
remotes = existingCluster.Members()
cl.SetID(existingCluster.GetID())
cl.SetStore(store)
id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, nil)
case !logExist && cfg.NewCluster:
if err = cfg.VerifyBootstrap(); err != nil {
return err
}
cl, err = membership.NewClusterFromURLsMap(cfg.InitialPeerURLsMap)
if err != nil {
return err
}
m := cl.MemberByName(cfg.Name)
if membership.IsMemberBootstrapped(cl, cfg.Name, prt, DialTimeout) {
return fmt.Errorf("member %s has already been bootstrapped", m.ID)
}
cl.SetStore(store)
id, etcdRaftNode = startEtcdRaftNode(cfg, store, cl, cl.MemberIDs())
case logExist:
if err = utils.IsDirWriteable(cfg.DataDir); err != nil {
return fmt.Errorf("cannot write to member directory: %v", err)
}
//node restart, read states from storage
//get applied index
appliedIndex = raft.MustGetAppliedIndex(store)
cfg.AppliedIndex = appliedIndex
id, etcdRaftNode, cl = restartEtcdNode(cfg, store)
cl.SetStore(store)
default:
return fmt.Errorf("unsupported bootstrap config")
}
s.raftNode = raft.NewNode(
raft.NodeConfig{
IsIDRemoved: func(id uint64) bool { return cl.IsIDRemoved(types.ID(id)) },
Node: etcdRaftNode,
Heartbeat: cfg.HeartbeatMs,
Storage: store,
},
)
//committedIndex,term will update by fsm(UpdateCommittedIndex,SetTerm)
//set appliedIndex when applyEntries will check the entry continuity
s.raftNode.SetAppliedIndex(appliedIndex)
s.id = id
s.wait = wait.New()
s.reqIDGen = idutil.NewGenerator(uint16(id), time.Now())
s.stopping = make(chan struct{})
s.errorc = make(chan error)
s.applyBroadcast = utils.NewBroadcast()
s.stats = stats.NewServerStats(cfg.Name, id.String())
s.lstats = stats.NewLeaderStats(id.String())
s.store = store
tr := &rafthttp.Transport{
TLSInfo: transport.TLSInfo{},
DialTimeout: DialTimeout,
ID: id,
URLs: cfg.PeerURLs,
ClusterID: cl.GetID(),
Raft: s,
ServerStats: s.stats,
LeaderStats: s.lstats,
ErrorC: s.errorc,
}
if err = tr.Start(); err != nil {
return err
}
// add all remotes into transport
//Add remotes to rafthttp, who help newly joined members catch up the
//progress of the cluster. It supports basic message sending to remote, and
//has no stream connection for simplicity. remotes will not be used
//after the latest peers have been added into rafthttp.
for _, m := range remotes {
if m.ID != id {
tr.AddRemote(m.ID, m.PeerURLs)
}
}
for _, m := range cl.Members() {
if m.ID != id {
tr.AddPeer(m.ID, m.PeerURLs)
}
}
s.raftNode.Transport = tr
s.cluster = cl
return nil
}
startEtcdRaftNode
kingbus/server/server.go
func startEtcdRaftNode(cfg config.RaftNodeConfig, store storage.Storage, cl *membership.RaftCluster, ids []types.ID) (
id types.ID, n etcdraft.Node) {
member := cl.MemberByName(cfg.Name)
peers := make([]etcdraft.Peer, len(ids))
for i, id := range ids {
ctx, err := json.Marshal((*cl).Member(id))
if err != nil {
log.Log.Panicf("marshal member should never fail: %v", err)
}
peers[i] = etcdraft.Peer{ID: uint64(id), Context: ctx}
}
id = member.ID
log.Log.Infof("starting member %s in cluster %s", id, cl.GetID())
c := &etcdraft.Config{
ID: uint64(id),
ElectionTick: int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
HeartbeatTick: 1,
Storage: store,
MaxSizePerMsg: cfg.MaxRequestBytes,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,
DisableProposalForwarding: true,
Logger: log.Log,
}
n = etcdraft.StartNode(c, peers)
raft.AdvanceTicks(n, c.ElectionTick)
return id, n
}
restartEtcdNode
kingbus/server/server.go
func restartEtcdNode(cfg config.RaftNodeConfig, store storage.Storage) (
types.ID, etcdraft.Node, *membership.RaftCluster) {
cl, err := membership.GetRaftClusterFromStorage(store)
if err != nil {
if err != nil {
log.Log.Panic("GetRaftClusterFromStorage error:%s", err.Error())
}
}
log.Log.Debugf("restartEtcdNode:get raft cluster from storage,cluster:%v", cl.String())
//get id from raftCluster
member := cl.MemberByName(cfg.Name)
if member == nil {
log.Log.Fatalf("restartEtcdNode:member not in raft cluster,cluster:%v,memberName:%s",
cl.String(), cfg.Name)
}
c := &etcdraft.Config{
ID: uint64(member.ID),
ElectionTick: int(cfg.ElectionTimeoutMs / cfg.HeartbeatMs),
HeartbeatTick: 1,
Applied: cfg.AppliedIndex, //set appliedIndex
Storage: store,
MaxSizePerMsg: cfg.MaxRequestBytes,
MaxInflightMsgs: maxInflightMsgs,
CheckQuorum: true,
PreVote: cfg.PreVote,
DisableProposalForwarding: true,
Logger: log.Log,
}
n := etcdraft.RestartNode(c)
return member.ID, n, cl
}
작은 매듭
starRaft 방법 은 raft http. NewRoundTripper 를 통 해 http. RoundTripper 를 만 든 다음 storage. NewDiskStorage 를 통 해 DiskStorage 를 만 든 다음 logExist 및 cfg. NewCluster 에 따라 다 릅 니 다.둘 다 false 이면 membership. Raft Cluster 의 id 를 존재 하 는 cluster 의 id 로 업데이트 하고 startEtcdRaft Node 를 실행 합 니 다.cfg. NewCluster 가 true 라면 cl. MemberIDs () 를 사용 하여 startEtcdRaft Node 를 실행 합 니 다.logExist 가 true 이면 restartEtcdNode 를 실행 합 니 다.마지막 으로 rafthttp. Transport 를 만 들 고 tr. Start (), tr. AddRemote, tr. AddPeer 를 실행 합 니 다.
doc