golang etcd raft 프로 토 콜 11

9632 단어
golang etcd raft 프로 토 콜 11
발단
최근 읽 기 [클 라 우 드 원생 분포 식 저장 초석: etcd 깊이 분석] (두 군, 2019.1) 본 시 리 즈 는 golang 으로 연습 할 예정 이다.
raft 분산 식 일치 성 알고리즘
                       ,
         。
                 ——            ?

Raft              :
1.     (leader election)、
2.     (log replication)、
3.    (safety)
4.       (membership changes)
      。

  gitee  :
https://gitee.com/ioly/learning.gooop

목표.
  • raft 프로 토 콜 에 따라 높 은 분포 식 강 일치 kv 저장
  • 을 실현 합 니 다.
    하위 목표 (Day 11)
  • 리더 스테이 트 가 아직 세부 사항 을 다 처리 하지 못 했 지만 기본 서 비 스 를 시작 하고 제공 할 수 있 을 것 같다
  • 외곽 기능 을 추가 하여 첫 번 째 '점화' 를 준비 합 니 다.
  • config / tRaft Config: 로 컬 json 파일 에서 클 러 스 터 노드 설정 을 읽 고 IRaftConfig / IRaftNodeConfig 의 실현 을 제공 합 니 다
  • lsm / tRaftLSMImplement: 최상 위 인터페이스 IRaftLSM 의 실현 을 제공 하고 '설정 / kv 저장 / 노드 통신' 세 블록 을 붙 입 니 다
  • server / IRaftKVServer: server 시동기 인터페이스
  • server / tRaftKVServer: server 시동기 의 실현, raft rpc 와 kv rpc 감청

  • config/tRaftConfig.go
    로 컬 json 파일 에서 클 러 스 터 노드 설정 을 읽 고 IRaftConfig / IRaftNodeConfig 의 실현 을 제공 합 니 다.
    package config
    
    import (
        "encoding/json"
        "os"
    )
    
    type tRaftConfig struct {
        ID string
        Nodes []*tRaftNodeConfig
    }
    
    type tRaftNodeConfig struct {
        ID string
        Endpoint string
    }
    
    func (me *tRaftConfig) GetID() string {
        return me.ID
    }
    
    func (me *tRaftConfig) GetNodes() []IRaftNodeConfig {
        a := make([]IRaftNodeConfig, len(me.Nodes))
        for i,it := range me.Nodes {
            a[i] = it
        }
        return a
    }
    
    
    func (me *tRaftNodeConfig) GetID() string {
        return me.ID
    }
    
    func (me *tRaftNodeConfig) GetEndpoint() string {
        return me.Endpoint
    }
    
    func LoadJSONFile(file string) IRaftConfig {
        data, err := os.ReadFile(file)
        if err != nil {
            panic(err)
        }
    
        c := new(tRaftConfig)
        err = json.Unmarshal(data, c)
        if err != nil {
            panic(err)
        }
    
        return c
    }

    lsm/tRaftLSMImplement.go
    최상 위 인터페이스 IRaftLSM 의 실현 을 제공 합 니 다. '설정 / kv 저장 / 노드 통신' 세 블록 을 붙 이 고 진단 로 그 를 추가 합 니 다.
    package lsm
    
    import (
        "learning/gooop/etcd/raft/common"
        "learning/gooop/etcd/raft/config"
        "learning/gooop/etcd/raft/logger"
        "learning/gooop/etcd/raft/rpc"
        "learning/gooop/etcd/raft/rpc/client"
        "learning/gooop/etcd/raft/store"
        "sync"
    )
    
    type tRaftLSMImplement struct {
        tEventDrivenModel
        mInitOnce sync.Once
    
        mConfig config.IRaftConfig
        mStore store.ILogStore
        mClientService client.IRaftClientService
        mState IRaftState
    }
    
    
    // trigger: init()
    // args: empty
    const meInit = "lsm.Init"
    
    // trigger: HandleStateChanged()
    // args: IRaftState
    const meStateChanged = "lsm.StateChnaged"
    
    func (me *tRaftLSMImplement) init() {
        me.mInitOnce.Do(func() {
            me.initEventHandlers()
            me.raise(meInit)
        })
    }
    
    func (me *tRaftLSMImplement) initEventHandlers() {
        // write only
        me.hookEventsForConfig()
        me.hookEventsForStore()
        me.hookEventsForPeerService()
        me.hookEventsForState()
    }
    
    func (me *tRaftLSMImplement) hookEventsForConfig() {
        me.hook(meInit, func(e string, args ...interface{}) {
            logger.Logf("tRaftLSMImplement.init, ConfigFile = %v", common.ConfigFile)
            me.mConfig = config.LoadJSONFile(common.ConfigFile)
        })
    }
    
    func (me *tRaftLSMImplement) hookEventsForStore() {
        me.hook(meInit, func(e string, args ...interface{}) {
            logger.Logf("tRaftLSMImplement.init, DataFile = %v", common.DataFile)
            err, db := store.NewBoltStore(common.DataFile)
            if err != nil {
                panic(err)
            }
            me.mStore = db
        })
    }
    
    
    func (me *tRaftLSMImplement) hookEventsForPeerService() {
        me.hook(meInit, func(e string, args ...interface{}) {
            me.mClientService = client.NewRaftClientService(me.mConfig)
        })
    }
    
    func (me *tRaftLSMImplement) hookEventsForState() {
        me.hook(meInit, func(e string, args ...interface{}) {
            me.mState = newFollowerState(me, me.mStore.LastCommittedTerm())
            me.mState.Start()
        })
    
        me.hook(meStateChanged, func(e string, args ...interface{}) {
            state := args[0].(IRaftState)
            logger.Logf("tRaftLSMImplement.StateChanged, %v", state.Role())
    
            me.mState = state
            state.Start()
        })
    }
    
    
    func (me *tRaftLSMImplement) Config() config.IRaftConfig {
        return me.mConfig
    }
    
    func (me *tRaftLSMImplement) Store() store.ILogStore {
        return me.mStore
    }
    
    func (me *tRaftLSMImplement) HandleStateChanged(state IRaftState) {
        me.raise(meStateChanged, state)
    }
    
    func (me *tRaftLSMImplement) RaftClientService() client.IRaftClientService {
        return me.mClientService
    }
    
    func (me *tRaftLSMImplement) Heartbeat(cmd *rpc.HeartbeatCmd, ret *rpc.HeartbeatRet) error {
        state := me.mState
        e := state.Heartbeat(cmd, ret)
        logger.Logf("tRaftLSMImplement.Heartbeat, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
        return e
    }
    
    func (me *tRaftLSMImplement) AppendLog(cmd *rpc.AppendLogCmd, ret *rpc.AppendLogRet) error {
        state := me.mState
        e := state.AppendLog(cmd, ret)
        logger.Logf("tRaftLSMImplement.AppendLog, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
        return e
    }
    
    func (me *tRaftLSMImplement) CommitLog(cmd *rpc.CommitLogCmd, ret *rpc.CommitLogRet) error {
        state := me.mState
        e := state.CommitLog(cmd, ret)
        logger.Logf("tRaftLSMImplement.CommitLog, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
        return e
    }
    
    func (me *tRaftLSMImplement) RequestVote(cmd *rpc.RequestVoteCmd, ret *rpc.RequestVoteRet) error {
        state := me.mState
        e := state.RequestVote(cmd, ret)
        logger.Logf("tRaftLSMImplement.RequestVote, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
        return e
    }
    
    func (me *tRaftLSMImplement) ExecuteKVCmd(cmd *rpc.KVCmd, ret *rpc.KVRet) error {
        state := me.mState
        e := state.ExecuteKVCmd(cmd, ret)
        logger.Logf("tRaftLSMImplement.ExecuteKVCmd, state=%v, cmd=%v, ret=%v, err=%v", cmd, ret, e)
        return e
    }
    
    func (me *tRaftLSMImplement) State() IRaftState {
        return me.mState
    }
    
    func NewRaftLSM() IRaftLSM {
        it := new(tRaftLSMImplement)
        it.init()
        return it
    }

    server/IRaftKVServer.go
    server 시동기 인터페이스
    package server
    
    type IRaftKVServer interface {
        BeginServeTCP(port int) error
    }

    server/tRaftKVServer.go
    server 시동기 의 실현, raft rpc 와 kv rpc 감청
    package server
    
    import (
        "fmt"
        "learning/gooop/etcd/raft/lsm"
        rrpc "learning/gooop/etcd/raft/rpc"
        "learning/gooop/saga/mqs/logger"
        "net"
        "net/rpc"
        "time"
    )
    
    type tRaftKVServer int
    
    func (me *tRaftKVServer) BeginServeTCP(port int) error {
        logger.Logf("tRaftKVServer.BeginServeTCP, starting, port=%v", port)
    
        // resolve address
        addy, err := net.ResolveTCPAddr("tcp", fmt.Sprintf("0.0.0.0:%d", port))
        if err != nil {
            return err
        }
    
        // create raft lsm singleton
        raftLSM := lsm.NewRaftLSM()
    
        // register raft rpc server
        rserver := &RaftRPCServer {
            mRaftLSM : raftLSM,
        }
        err = rpc.Register(rserver)
        if err != nil {
            return err
        }
    
        // register kv rpc server
        kserver := &KVStoreRPCServer{
            mRaftLSM: raftLSM,
        }
        err = rpc.Register(kserver)
        if err != nil {
            return err
        }
    
        inbound, err := net.ListenTCP("tcp", addy)
        if err != nil {
            return err
        }
        go rpc.Accept(inbound)
    
        logger.Logf("tRaftKVServer.BeginServeTCP, service ready at port=%v", port)
        return nil
    }
    
    // RaftRPCServer exposes a raft rpc service
    type RaftRPCServer struct {
        mRaftLSM lsm.IRaftLSM
    }
    
    // Heartbeat leader to follower
    func (me *RaftRPCServer) Heartbeat(cmd *rrpc.HeartbeatCmd, ret *rrpc.HeartbeatRet) error {
        e := me.mRaftLSM.Heartbeat(cmd, ret)
        logger.Logf("RaftRPCServer.Heartbeat, cmd=%v, ret=%v, e=%v", cmd, ret, e)
        return e
    }
    
    // AppendLog leader to follower
    func (me *RaftRPCServer) AppendLog(cmd *rrpc.AppendLogCmd, ret *rrpc.AppendLogRet) error {
        e := me.mRaftLSM.AppendLog(cmd, ret)
        logger.Logf("RaftRPCServer.AppendLog, cmd=%v, ret=%v, e=%v", cmd, ret, e)
        return e
    }
    
    // CommitLog leader to follower
    func (me *RaftRPCServer) CommitLog(cmd *rrpc.CommitLogCmd, ret *rrpc.CommitLogRet) error {
        e := me.mRaftLSM.CommitLog(cmd, ret)
        logger.Logf("RaftRPCServer.CommitLog, cmd=%v, ret=%v, e=%v", cmd, ret, e)
        return e
    }
    
    // RequestVote candidate to others
    func (me *RaftRPCServer) RequestVote(cmd *rrpc.RequestVoteCmd, ret *rrpc.RequestVoteRet) error {
        e := me.mRaftLSM.RequestVote(cmd, ret)
        logger.Logf("RaftRPCServer.RequestVote, cmd=%v, ret=%v, e=%v", cmd, ret, e)
        return e
    }
    
    // Ping to keep alive
    func (me *RaftRPCServer) Ping(cmd *rrpc.PingCmd, ret *rrpc.PingRet) error {
        ret.SenderID = me.mRaftLSM.Config().GetID()
        ret.Timestamp = time.Now().UnixNano()
    
        logger.Logf("RaftRPCServer.Ping, cmd=%v, ret=%v", cmd, ret)
        return nil
    }
    
    // KVStoreRPCServer expose a kv storage service
    type KVStoreRPCServer struct {
        mRaftLSM lsm.IRaftLSM
    }
    
    // ExecuteKVCmd leader to follower
    func (me *KVStoreRPCServer) ExecuteKVCmd(cmd *rrpc.KVCmd, ret *rrpc.KVRet) error {
        e := me.mRaftLSM.ExecuteKVCmd(cmd, ret)
        logger.Logf("KVStoreRPCServer.ExecuteKVCmd, cmd=%v, ret=%v, e=%v", cmd, ret, e)
        return e
    }

    (미 완성 계속)

    좋은 웹페이지 즐겨찾기