ZooKeeper 의 FastLeader Election 알고리즘 상세 설명

우리 가 zookeeper 서 비 스 를 시작 할 때 가장 먼저 해 야 할 일 은 leader 선거 입 니 다. zookeeper 에서 leader 선거 알고리즘 은 3 가지 가 있 습 니 다. Leader Election 알고리즘, AuthFastLeader Election 알고리즘 과 FastLeader Election 알고리즘 을 포함 합 니 다. 그 중에서 FastLeadElection 알고리즘 은 묵인 합 니 다. 물론 설정 파일 에서 설정 항목 을 수정 할 수 있 습 니 다. election Alg.
1. zookeeper 서비스 가 시 작 될 때 클래스 QuorumPeerMain 의 입구 함수 main 에서 메 인 스 레 드 가 시 작 됩 니 다.
public class QuorumPeerMain {
    private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);

    private static final String USAGE = "Usage: QuorumPeerMain configfile";

    protected QuorumPeer quorumPeer;

    /**
     * To start the replicated server specify the configuration file name on
     * the command line.
     * @param args path to the configfile
     */
    public static void main(String[] args) {
        QuorumPeerMain main = new QuorumPeerMain();

2. 그 다음 에 QuorumPeer 가 Thread. start 방법 을 다시 쓰 고 시작 합 니 다.
          quorumPeer.start();
          quorumPeer.join();

클래스 QuorumPeer 에서
   @Override
    public synchronized void start() {
        if (!getView().containsKey(myid)) {
            throw new RuntimeException("My id " + myid + " not in the peer list");
         }
        loadDataBase();
        cnxnFactory.start();
        try {
            adminServer.start();
        } catch (AdminServerException e) {
            LOG.warn("Problem starting AdminServer", e);
            System.out.println(e);
        }
        startLeaderElection();
        super.start();
    }
3. 위의 소스 코드 에서 볼 수 있 습 니 다. quorumPeer 스 레 드 가 시 작 된 후에 먼저 데이터 복 구 를 합 니 다. 디스크 에 저 장 된 데 이 터 를 읽 습 니 다.
 private void loadDataBase() {
        try {
            //        db
            zkDb.loadDataBase();

            // load the epochs
            /*
                zxid  epoch  
              zxid long , 32   epoch , 32   zxid ,
              zxid(ZooKeeper Transaction Id),   id,zookeeper   ,zxid    
                      
            */
            long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
            long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
            try {
                currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
            } catch(FileNotFoundException e) {
            	// pick a reasonable epoch number
            	// this should only happen once when moving to a
            	// new code version
            	currentEpoch = epochOfZxid;
                //....

4. 그 다음 에 선 거 를 초기 화 합 니 다. 처음에 자신 을 선 거 했 을 때 기본 적 으로 사용 하 는 알고리즘 은 FastLeader Election 입 니 다.
synchronized public void startLeaderElection() {
       try {
            /*
                
            */
           if (getPeerState() == ServerState.LOOKING) {
               currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
           }
       } catch(IOException e) {
           RuntimeException re = new RuntimeException(e.getMessage());
           re.setStackTrace(e.getStackTrace());
           throw re;
       }

       // if (!getView().containsKey(myid)) {
      //      throw new RuntimeException("My id " + myid + " not in the peer list");
        //}
        if (electionType == 0) {
            try {
                udpSocket = new DatagramSocket(myQuorumAddr.getPort());
                responder = new ResponderThread();
                responder.start();
            } catch (SocketException e) {
                throw new RuntimeException(e);
            }
        }
        this.electionAlg = createElectionAlgorithm(electionType);
    }
5. 그 다음 에 선거 포트 를 연결 하고 FastLeader Election 초기 화:
protected Election createElectionAlgorithm(int electionAlgorithm){
        Election le=null;

        //TODO: use a factory rather than a switch
        switch (electionAlgorithm) {
        case 0:
            le = new LeaderElection(this);
            break;
        case 1:
            le = new AuthFastLeaderElection(this);
            break;
        case 2:
            le = new AuthFastLeaderElection(this, true);
            break;
        case 3:
            qcm = new QuorumCnxManager(this);
            /*
                  ,          
            */
            QuorumCnxManager.Listener listener = qcm.listener;
            if(listener != null){
                listener.start();
                //  TCP     
                FastLeaderElection fle = new FastLeaderElection(this, qcm);
                fle.start();
                le = fle;
            } else {
                LOG.error("Null listener when initializing cnx manager");
            }
            break;
        default:
            assert false;
        }
        return le;
    }

6. QuorumPeer 스 레 드 시작:
private void starter(QuorumPeer self, QuorumCnxManager manager) {
        this.self = self;
        proposedLeader = -1;
        proposedZxid = -1;

        /*
               ,    ToSend
               ,    Notification
        */
        sendqueue = new LinkedBlockingQueue();
        recvqueue = new LinkedBlockingQueue();
        this.messenger = new Messenger(manager);

    }
FastLeader Election. java 파일 에서:
Messenger(QuorumCnxManager manager) {

            this.ws = new WorkerSender(manager);

            this.wsThread = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
            this.wsThread.setDaemon(true);

            this.wr = new WorkerReceiver(manager);

            this.wrThread = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
            this.wrThread.setDaemon(true);
        }
7. 선 거 를 실시 하 는 과정 에서 zookeeper server 서버 는 다음 과 같은 네 가지 상태 가 있 습 니 다. LOOKING, FOLLOWING, LEADING, OBSERVING, 그 중에서 OBSERVING 상태 에서 server 는 투표 과정 에 참가 하지 않 고 LOOKING 상태 에서 만 투표 과정 에 참가 합 니 다. 투표 가 끝나 면 server 의 상 태 는 FOLLOWER 또는 LEADER 가 됩 니 다.
리더 선거 과정 부터 말씀 드 리 겠 습 니 다.
STEP 1: LOOKING 상태 에 있 는 server 의 경우 논리 시계 값 (logicalclock) 이 라 고 불 리 는 것 을 먼저 판단 합 니 다. 받 은 logicalclock 의 값 이 현재 server 자체 의 logicalclock 값 보다 크 면 업 데 이 트 된 선거 임 을 설명 합 니 다. 이 때 는 자신의 server 의 logicalclock 값 을 업데이트 하고 이전에 받 은 다른 server 에서 온 투표 결 과 를 비 워 야 합 니 다.그 다음 에 자신의 투 표를 업데이트 해 야 하 는 지 판단 하 는 기준 은 epoch 값 의 크기 를 먼저 본 다음 에 zxid 의 크기 를 판단 하고 마지막 으로 server id 의 크기 를 보 는 것 입 니 다. (물론 이런 상황 에 대해 server 는 자신의 투 표를 업데이트 할 것 입 니 다. 현재 server 의 epoch 값 이 받 은 epoch 값 보다 작 기 때 문 입 니 다) 그리고 자신의 투 표를 다른 server 에 방송 합 니 다.
FastLeader Election. java 파일 에서:
 protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */

        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }
단계 2: 자신의 logicalclock 값 이 받 은 logicalclock 값 보다 크 면 바로 break;똑 같 으 면 epoch, zxid, server id 에 따라 업데이트 가 필요 한 지 판단 한 다음 에 자신의 투 표를 다른 server 에 방송 하고 마지막 으로 현재 server 가 받 은 투표 팀 에 투 표를 넣 어야 합 니 다.
 HashMap recvset = new HashMap();

            HashMap outofelection = new HashMap();

FastLeader Election. java 파일 의 lookForLeader 함수 에서:
case LOOKING:
                        // If notification > current, replace and send messages out
                        if (n.electionEpoch > logicalclock.get()) {
                            logicalclock.set(n.electionEpoch);
                            //           
                            recvset.clear();
                            //            
                            if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            if(LOG.isDebugEnabled()){
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
                                        + Long.toHexString(n.electionEpoch)
                                        + ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
                            }
                            break; 
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            //  
                            sendNotifications();
                        }

                        if(LOG.isDebugEnabled()){
                            LOG.debug("Adding vote: from=" + n.sid +
                                    ", proposed leader=" + n.leader +
                                    ", proposed zxid=0x" + Long.toHexString(n.zxid) +
                                    ", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
                        }

                        //      
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

STEP 3: 서버 는 투표 가 끝 났 는 지 여 부 를 판단 합 니 다. 끝 난 조건 은 어떤 leader 가 절반 이상 의 server 의 지 지 를 받 았 는 지, 그렇다면 조금 만 더 기 다 려 (200 ms) 업데이트 데 이 터 를 받 았 는 지, 받 지 못 했다 면 자신의 역할 (follower Or leader) 을 설정 하고 선거 절 차 를 종료 하지 않 으 면 계속 합 니 다.
FastLeaderElection. java 파일 중;
//        
    private boolean termPredicate(HashMap votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        /*
         * First make the views consistent. Sometimes peers will have different
         * zxids for a server depending on timing.
         */
        for (Map.Entry entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }

        return voteSet.hasAllQuorums();
    }

lookForLeader 함수 에서:
 //        
                        if (termPredicate(recvset,
                                new Vote(proposedLeader, proposedZxid,
                                        logicalclock.get(), proposedEpoch))) {

                            // Verify if there is any change in the proposed leader
                            //     ,        
                            while((n = recvqueue.poll(finalizeWait,
                                    TimeUnit.MILLISECONDS)) != null){
                                if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                        proposedLeader, proposedZxid, proposedEpoch)){
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            //          ,       
                            //      
                            if (n == null) {
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

STEP 4: 위 에서 논의 한 것 은 데이터 전송 server 의 상 태 는 LOOKING 상태 입 니 다. 만약 에 데이터 전송 자의 상태 가 FOLLOWING 또는 LEADING 상태 라면 logicalclock 이 같 으 면 데 이 터 를 recvset 에 저장 합 니 다. 만약 에 상대방 server 가 leader 라 고 주장 하면 절반 이상 의 server 가 지원 하 는 지 판단 합 니 다. 만약 에...자신의 선거 상 태 를 설정 하고 선거 에서 물러난다.
 case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        //  server    server logicalclock  
                        if(n.electionEpoch == logicalclock.get()){
                            //   recvset 
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
                            if(termPredicate(recvset, new Vote(n.leader,
                                            n.zxid, n.electionEpoch, n.peerEpoch, n.state))
                                            && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());

                                Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

STEP 5: 받 은 데이터 의 logicalclock 값 이 현재 server 의 logicalclock 과 같 지 않 으 면 다른 선거 에서 선거 결과 가 나 왔 다 는 뜻 으로 outofelection 집합 에 가입 하고 outofelection 집합 에서 판단 할 때 과반 을 지원 하 며, 그렇다면 자신의 투 표를 업데이트 하고 자신의 상 태 를 설정 합 니 다.
 outofelection.put(n.sid, new Vote(n.leader, 
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
                        if (termPredicate(outofelection, new Vote(n.leader,
                                IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
                                && checkLeader(outofelection, n.leader, IGNOREVALUE)) {
                            synchronized(this){
                                logicalclock.set(n.electionEpoch);
                                self.setPeerState((n.leader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }

결론: 이것 이 바로 zookeeper 의 FastLeader Election 선거의 대체적인 과정 이다.
참고 블 로그:
http://blog.csdn.net/xhh198781/article/details/6619203
http://iwinit.iteye.com/blog/1773531

좋은 웹페이지 즐겨찾기