ZooKeeper 의 FastLeader Election 알고리즘 상세 설명
1. zookeeper 서비스 가 시 작 될 때 클래스 QuorumPeerMain 의 입구 함수 main 에서 메 인 스 레 드 가 시 작 됩 니 다.
public class QuorumPeerMain {
private static final Logger LOG = LoggerFactory.getLogger(QuorumPeerMain.class);
private static final String USAGE = "Usage: QuorumPeerMain configfile";
protected QuorumPeer quorumPeer;
/**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
2. 그 다음 에 QuorumPeer 가 Thread. start 방법 을 다시 쓰 고 시작 합 니 다.
quorumPeer.start();
quorumPeer.join();
클래스 QuorumPeer 에서
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
loadDataBase();
cnxnFactory.start();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
startLeaderElection();
super.start();
}
3. 위의 소스 코드 에서 볼 수 있 습 니 다. quorumPeer 스 레 드 가 시 작 된 후에 먼저 데이터 복 구 를 합 니 다. 디스크 에 저 장 된 데 이 터 를 읽 습 니 다. private void loadDataBase() {
try {
// db
zkDb.loadDataBase();
// load the epochs
/*
zxid epoch
zxid long , 32 epoch , 32 zxid ,
zxid(ZooKeeper Transaction Id), id,zookeeper ,zxid
*/
long lastProcessedZxid = zkDb.getDataTree().lastProcessedZxid;
long epochOfZxid = ZxidUtils.getEpochFromZxid(lastProcessedZxid);
try {
currentEpoch = readLongFromFile(CURRENT_EPOCH_FILENAME);
} catch(FileNotFoundException e) {
// pick a reasonable epoch number
// this should only happen once when moving to a
// new code version
currentEpoch = epochOfZxid;
//....
4. 그 다음 에 선 거 를 초기 화 합 니 다. 처음에 자신 을 선 거 했 을 때 기본 적 으로 사용 하 는 알고리즘 은 FastLeader Election 입 니 다.
synchronized public void startLeaderElection() {
try {
/*
*/
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch(IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
// if (!getView().containsKey(myid)) {
// throw new RuntimeException("My id " + myid + " not in the peer list");
//}
if (electionType == 0) {
try {
udpSocket = new DatagramSocket(myQuorumAddr.getPort());
responder = new ResponderThread();
responder.start();
} catch (SocketException e) {
throw new RuntimeException(e);
}
}
this.electionAlg = createElectionAlgorithm(electionType);
}
5. 그 다음 에 선거 포트 를 연결 하고 FastLeader Election 초기 화:protected Election createElectionAlgorithm(int electionAlgorithm){
Election le=null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 0:
le = new LeaderElection(this);
break;
case 1:
le = new AuthFastLeaderElection(this);
break;
case 2:
le = new AuthFastLeaderElection(this, true);
break;
case 3:
qcm = new QuorumCnxManager(this);
/*
,
*/
QuorumCnxManager.Listener listener = qcm.listener;
if(listener != null){
listener.start();
// TCP
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
6. QuorumPeer 스 레 드 시작:
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
/*
, ToSend
, Notification
*/
sendqueue = new LinkedBlockingQueue();
recvqueue = new LinkedBlockingQueue();
this.messenger = new Messenger(manager);
}
FastLeader Election. java 파일 에서:Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws,
"WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr,
"WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
7. 선 거 를 실시 하 는 과정 에서 zookeeper server 서버 는 다음 과 같은 네 가지 상태 가 있 습 니 다. LOOKING, FOLLOWING, LEADING, OBSERVING, 그 중에서 OBSERVING 상태 에서 server 는 투표 과정 에 참가 하지 않 고 LOOKING 상태 에서 만 투표 과정 에 참가 합 니 다. 투표 가 끝나 면 server 의 상 태 는 FOLLOWER 또는 LEADER 가 됩 니 다.리더 선거 과정 부터 말씀 드 리 겠 습 니 다.
STEP 1: LOOKING 상태 에 있 는 server 의 경우 논리 시계 값 (logicalclock) 이 라 고 불 리 는 것 을 먼저 판단 합 니 다. 받 은 logicalclock 의 값 이 현재 server 자체 의 logicalclock 값 보다 크 면 업 데 이 트 된 선거 임 을 설명 합 니 다. 이 때 는 자신의 server 의 logicalclock 값 을 업데이트 하고 이전에 받 은 다른 server 에서 온 투표 결 과 를 비 워 야 합 니 다.그 다음 에 자신의 투 표를 업데이트 해 야 하 는 지 판단 하 는 기준 은 epoch 값 의 크기 를 먼저 본 다음 에 zxid 의 크기 를 판단 하고 마지막 으로 server id 의 크기 를 보 는 것 입 니 다. (물론 이런 상황 에 대해 server 는 자신의 투 표를 업데이트 할 것 입 니 다. 현재 server 의 epoch 값 이 받 은 epoch 값 보다 작 기 때 문 입 니 다) 그리고 자신의 투 표를 다른 server 에 방송 합 니 다.
FastLeader Election. java 파일 에서:
protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
if(self.getQuorumVerifier().getWeight(newId) == 0){
return false;
}
/*
* We return true if one of the following three cases hold:
* 1- New epoch is higher
* 2- New epoch is the same as current epoch, but new zxid is higher
* 3- New epoch is the same as current epoch, new zxid is the same
* as current zxid, but server id is higher.
*/
return ((newEpoch > curEpoch) ||
((newEpoch == curEpoch) &&
((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
}
단계 2: 자신의 logicalclock 값 이 받 은 logicalclock 값 보다 크 면 바로 break;똑 같 으 면 epoch, zxid, server id 에 따라 업데이트 가 필요 한 지 판단 한 다음 에 자신의 투 표를 다른 server 에 방송 하고 마지막 으로 현재 server 가 받 은 투표 팀 에 투 표를 넣 어야 합 니 다. HashMap recvset = new HashMap();
HashMap outofelection = new HashMap();
FastLeader Election. java 파일 의 lookForLeader 함수 에서:
case LOOKING:
// If notification > current, replace and send messages out
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
//
recvset.clear();
//
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
updateProposal(getInitId(),
getInitLastLoggedZxid(),
getPeerEpoch());
}
sendNotifications();
} else if (n.electionEpoch < logicalclock.get()) {
if(LOG.isDebugEnabled()){
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x"
+ Long.toHexString(n.electionEpoch)
+ ", logicalclock=0x" + Long.toHexString(logicalclock.get()));
}
break;
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
//
sendNotifications();
}
if(LOG.isDebugEnabled()){
LOG.debug("Adding vote: from=" + n.sid +
", proposed leader=" + n.leader +
", proposed zxid=0x" + Long.toHexString(n.zxid) +
", proposed election epoch=0x" + Long.toHexString(n.electionEpoch));
}
//
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
STEP 3: 서버 는 투표 가 끝 났 는 지 여 부 를 판단 합 니 다. 끝 난 조건 은 어떤 leader 가 절반 이상 의 server 의 지 지 를 받 았 는 지, 그렇다면 조금 만 더 기 다 려 (200 ms) 업데이트 데 이 터 를 받 았 는 지, 받 지 못 했다 면 자신의 역할 (follower Or leader) 을 설정 하고 선거 절 차 를 종료 하지 않 으 면 계속 합 니 다.
FastLeaderElection. java 파일 중;
//
private boolean termPredicate(HashMap votes, Vote vote) {
SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
voteSet.addQuorumVerifier(self.getQuorumVerifier());
if (self.getLastSeenQuorumVerifier() != null
&& self.getLastSeenQuorumVerifier().getVersion() > self
.getQuorumVerifier().getVersion()) {
voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
}
/*
* First make the views consistent. Sometimes peers will have different
* zxids for a server depending on timing.
*/
for (Map.Entry entry : votes.entrySet()) {
if (vote.equals(entry.getValue())) {
voteSet.addAck(entry.getKey());
}
}
return voteSet.hasAllQuorums();
}
lookForLeader 함수 에서:
//
if (termPredicate(recvset,
new Vote(proposedLeader, proposedZxid,
logicalclock.get(), proposedEpoch))) {
// Verify if there is any change in the proposed leader
// ,
while((n = recvqueue.poll(finalizeWait,
TimeUnit.MILLISECONDS)) != null){
if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
proposedLeader, proposedZxid, proposedEpoch)){
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
// ,
//
if (n == null) {
self.setPeerState((proposedLeader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(proposedLeader,
proposedZxid, proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
STEP 4: 위 에서 논의 한 것 은 데이터 전송 server 의 상 태 는 LOOKING 상태 입 니 다. 만약 에 데이터 전송 자의 상태 가 FOLLOWING 또는 LEADING 상태 라면 logicalclock 이 같 으 면 데 이 터 를 recvset 에 저장 합 니 다. 만약 에 상대방 server 가 leader 라 고 주장 하면 절반 이상 의 server 가 지원 하 는 지 판단 합 니 다. 만약 에...자신의 선거 상 태 를 설정 하고 선거 에서 물러난다.
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
// server server logicalclock
if(n.electionEpoch == logicalclock.get()){
// recvset
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
if(termPredicate(recvset, new Vote(n.leader,
n.zxid, n.electionEpoch, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, n.electionEpoch)) {
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
STEP 5: 받 은 데이터 의 logicalclock 값 이 현재 server 의 logicalclock 과 같 지 않 으 면 다른 선거 에서 선거 결과 가 나 왔 다 는 뜻 으로 outofelection 집합 에 가입 하고 outofelection 집합 에서 판단 할 때 과반 을 지원 하 며, 그렇다면 자신의 투 표를 업데이트 하고 자신의 상 태 를 설정 합 니 다.
outofelection.put(n.sid, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state));
if (termPredicate(outofelection, new Vote(n.leader,
IGNOREVALUE, IGNOREVALUE, n.peerEpoch, n.state))
&& checkLeader(outofelection, n.leader, IGNOREVALUE)) {
synchronized(this){
logicalclock.set(n.electionEpoch);
self.setPeerState((n.leader == self.getId()) ?
ServerState.LEADING: learningState());
}
Vote endVote = new Vote(n.leader, n.zxid, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
결론: 이것 이 바로 zookeeper 의 FastLeader Election 선거의 대체적인 과정 이다.
참고 블 로그:
http://blog.csdn.net/xhh198781/article/details/6619203
http://iwinit.iteye.com/blog/1773531
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.