ElasticSearch 클러스터 선거
24500 단어 원본 학습ElasticSearch
Node 노드의 start () 메서드에서 discovery를 사용합니다.startInitialJoin () 방법으로 클러스터에 가입하고 선거에 참여하기 시작합니다.
@Override
public void startInitialJoin() {
// start the join thread from a cluster state update. See {@link JoinThreadControl} for details.
synchronized (stateMutex) {
// do the join on a different thread, the caller of this method waits for 30s anyhow till it is discovered
joinThreadControl.startNewThreadIfNotRunning();
}
}
자물쇠를 채우고 startNewThreadIfNotRunning () 을 호출합니다. 그룹에 가입한 제어 라인의 유일성을 확보하기 위해서입니다.
public void startNewThreadIfNotRunning() {
assert Thread.holdsLock(stateMutex);
if (joinThreadActive()) {
return;
}
threadPool.generic().execute(new Runnable() {
@Override
public void run() {
Thread currentThread = Thread.currentThread();
if (!currentJoinThread.compareAndSet(null, currentThread)) {
return;
}
while (running.get() && joinThreadActive(currentThread)) {
try {
innerJoinCluster();
return;
} catch (Exception e) {
logger.error("unexpected error while joining cluster, trying again", e);
// Because we catch any exception here, we want to know in
// tests if an uncaught exception got to this point and the test infra uncaught exception
// leak detection can catch this. In practise no uncaught exception should leak
assert ExceptionsHelper.reThrowIfNotNull(e);
}
}
// cleaning the current thread from currentJoinThread is done by explicit calls.
}
});
}
이 방법을 실행하려면 이 라인에 자물쇠가 있는지 확인하십시오.
그 다음에 이 루틴의 실행이 시작되면 유일성을 확보하기 위해 이번에는 생성할 필요가 없고 종료합니다.
public boolean joinThreadActive() {
Thread currentThread = currentJoinThread.get();
return running.get() && currentThread != null && currentThread.isAlive();
}
그렇지 않으면 새로운 스레드를 생성하여 실행하고, 카스를 통해 이 스레드를currentJoinThread 구성원에 저장합니다. 이 구성원은 원자 변수로 스레드의 안전을 보장합니다.
private final AtomicBoolean running = new AtomicBoolean(false);
private final AtomicReference currentJoinThread = new AtomicReference<>();
그리고 이 유일한 루틴은 루틴이 작업할 때 innerJoinCluster () 함수를 계속 순환해서 실행하고 그룹에 가입할 준비를 합니다.
innerJoinCluster () 에서 현재 노드가 인정한 마스터를 찾을 때까지 findMaster () 를 순환해서 호출합니다.
while (masterNode == null && joinThreadControl.joinThreadActive(currentThread)) {
masterNode = findMaster();
}
findMaster에서 처음에 ping AndWait를 호출하여 같은 집단의 다른 노드에 ping을 보내고 다른 노드의 ping의 회답을 기다리며 fullPingResponses를 받습니다.
List fullPingResponses = pingAndWait(pingTimeout).toList();
ping And Wait 방법에서 지난 분석과 연결됩니다.
private ZenPing.PingCollection pingAndWait(TimeValue timeout) {
final CompletableFuture response = new CompletableFuture<>();
try {
zenPing.ping(response::complete, timeout);
} catch (Exception ex) {
// logged later
response.completeExceptionally(ex);
}
try {
return response.get();
} catch (InterruptedException e) {
logger.trace("pingAndWait interrupted");
return new ZenPing.PingCollection();
} catch (ExecutionException e) {
logger.warn("Ping execution failed", e);
return new ZenPing.PingCollection();
}
}
우리는 먼저 Completable Future를 구성한 다음에 핑을 호출하여 다른 노드에 핑 요청을 보내고 response::complete의 리셋 함수를 기다릴 수 있습니다. (핑이 다른 요청을 받은 후에 마지막으로 accept 호출을 기다립니다.)그리고response의 get 방법이 결과를 되돌려 주기를 기다립니다.
이때 우리는 집단 내의 다른 노드에서 선거에 대한ping 요청에 대한 회답을 받았다.
현재 노드는master를 제기하지 않았습니다. 현재 노드의 선거 정보도 받은 요청의 집합에 넣었습니다. 이때 가입한 지 얼마 되지 않았기 때문에master는null입니다.
assert fullPingResponses.stream().map(ZenPing.PingResponse::node)
.filter(n -> n.equals(localNode)).findAny().isPresent() == false;
fullPingResponses.add(new ZenPing.PingResponse(localNode, null, this.clusterState()));
물론 이때 마스터 노드가 선출되지 않았다.
final List pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
static List filterPingResponses(List fullPingResponses, boolean masterElectionIgnoreNonMasters, Logger logger) {
List pingResponses;
if (masterElectionIgnoreNonMasters) {
pingResponses = fullPingResponses.stream().filter(ping -> ping.node().isMasterNode()).collect(Collectors.toList());
} else {
pingResponses = fullPingResponses;
}
if (logger.isDebugEnabled()) {
StringBuilder sb = new StringBuilder();
if (pingResponses.isEmpty()) {
sb.append(" {none}");
} else {
for (ZenPing.PingResponse pingResponse : pingResponses) {
sb.append("
\t--> ").append(pingResponse);
}
}
logger.debug("filtered ping responses: (ignore_non_masters [{}]){}", masterElectionIgnoreNonMasters, sb);
}
return pingResponses;
}
이때 master Election Ignore NonMasters는 기본적으로false이고 ping Responses는 원래의 집합입니다.
List activeMasters = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
// We can't include the local node in pingMasters list, otherwise we may up electing ourselves without
// any check / verifications from other nodes in ZenDiscover#innerJoinCluster()
if (pingResponse.master() != null && !localNode.equals(pingResponse.master())) {
activeMasters.add(pingResponse.master());
}
}
// nodes discovered during pinging
List masterCandidates = new ArrayList<>();
for (ZenPing.PingResponse pingResponse : pingResponses) {
if (pingResponse.node().isMasterNode()) {
masterCandidates.add(new ElectMasterService.MasterCandidate(pingResponse.node(), pingResponse.getClusterStateVersion()));
}
}
ping에서 요청한 모든 노드의response를 훑어보고 현재 노드가 아닌 모든 마스터 노드를 꺼내서 activeMasters에 추가합니다.다시 한 번 반복하여 모든 속성master를true의 노드로 후보수 그룹masterCandidates에 추가합니다.
만약 우리가 얻은 active Masters가 비어 있다면, 이 집단은 아직 master를 선택하지 않았다는 것을 설명한다.현재 후보의 집합 크기가 최소 선거를 시작하는 노드 개수보다 큰지 판단하고 크면 electMaster의 electMaster 방법을 호출하여 후보 수조 master Candidates에 전송하여 현재 노드 투표의 노드 winner를 얻어서 되돌려줍니다.
if (activeMasters.isEmpty()) {
if (electMaster.hasEnoughCandidates(masterCandidates)) {
final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
logger.trace("candidate {} won election", winner);
return winner.getNode();
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
masterCandidates, electMaster.minimumMasterNodes());
return null;
}
}
저희가 electMaster의 실현을 볼 수 있어요.
public MasterCandidate electMaster(Collection candidates) {
assert hasEnoughCandidates(candidates);
List sortedCandidates = new ArrayList<>(candidates);
sortedCandidates.sort(MasterCandidate::compare);
return sortedCandidates.get(0);
}
public static int compare(MasterCandidate c1, MasterCandidate c2) {
// we explicitly swap c1 and c2 here. the code expects "better" is lower in a sorted
// list, so if c2 has a higher cluster state version, it needs to come first.
int ret = Long.compare(c2.clusterStateVersion, c1.clusterStateVersion);
if (ret == 0) {
ret = compareNodes(c1.getNode(), c2.getNode());
}
return ret;
}
먼저 버전 비교를 통해 같은 상황에서 노드의 id를 비교하고 후보 집중 버전이 가장 작고 id가 가장 작은 노드를 후보 마스터로 선택한다.
만약 우리가 얻은 active Masters가 비어 있지 않다면, 이 집단 내에 이미 master 노드가 선출되었다는 것을 설명한다. 그러면 번호가 가장 작은 master를 현재 노드 투표 후보로 직접 선택한다.
assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
// lets tie break between discovered nodes
return electMaster.tieBreakActiveMasters(activeMasters);
public DiscoveryNode tieBreakActiveMasters(Collection activeMasters) {
return activeMasters.stream().min(ElectMasterService::compareNodes).get();
}
지금까지 본 노드에서 투표를 지지하는 임시 마스터를 선출한 셈이다.
innerJoinCluster 함수로 돌아가서 선택한 임시 마스터가 이 노드일 경우 설정에서 최소 지원 수인 requiredJoins를 가져옵니다. 만약 투표 지원이 requiredJoins보다 크면 이 노드가 마스터가 됩니다.
if (transportService.getLocalNode().equals(masterNode)) {
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
synchronized (stateMutex) {
joinThreadControl.markThreadAsDone(currentThread);
}
}
@Override
public void onFailure(Throwable t) {
logger.trace("failed while waiting for nodes to join, rejoining", t);
synchronized (stateMutex) {
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
}
);
}
여기에서wait To Be Elected AsMaster () 방법을 호출하고 리셋 함수를 전송한 것을 볼 수 있습니다.wait To Be Elected AsMaster () 방법 방법에서, 선생은 condown Latch가 되어 다른 노드의 join 투표를 막는 데 사용되며, 리셋 함수callback을 만들어서 노드가 정식으로 마스터 노드가 되는 절차를 완성하는 데 사용됩니다.
final CountDownLatch done = new CountDownLatch(1);
final ElectionCallback wrapperCallback = new ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
done.countDown();
callback.onElectedAsMaster(state);
}
@Override
public void onFailure(Throwable t) {
done.countDown();
callback.onFailure(t);
}
};
설정이 필요한 데이터를 설정한 후에 막혔다.
synchronized (this) {
assert electionContext != null : "waitToBeElectedAsMaster is called we are not accumulating joins";
myElectionContext = electionContext;
electionContext.onAttemptToBeElected(requiredMasterJoins, wrapperCallback);
checkPendingJoinsAndElectIfNeeded();
}
try {
if (done.await(timeValue.millis(), TimeUnit.MILLISECONDS)) {
// callback handles everything
return;
}
} catch (InterruptedException e) {
}
if (logger.isTraceEnabled()) {
final int pendingNodes = myElectionContext.getPendingMasterJoinsCount();
logger.trace("timed out waiting to be elected. waited [{}]. pending master node joins [{}]", timeValue, pendingNodes);
}
failContextIfNeeded(myElectionContext, "timed out waiting to be elected");
Count Down Latch가 await에 들어가서 시간 제한 내에 다른 노드의 투표를 기다립니다.논리는 여기까지. 일단 멈춰.라인이 여기에 멈추어 다른 노드의join 지원 정보를 기다리고 있다면 어떻게 받았을까요?
this.membership = new MembershipAction(settings, transportService, new MembershipListener(), onJoinValidators);
우리는 ZenDiscovery의 구조 함수에서 Membership Action 실례를 구성했는데 그 중에서 "internal:discovery/zen/join"url에 대응하는 JoinRequestHandler 함수를 등록했다
public MembershipAction(Settings settings, TransportService transportService, MembershipListener listener,
Collection> joinValidators) {
super(settings);
this.transportService = transportService;
this.listener = listener;
transportService.registerRequestHandler(DISCOVERY_JOIN_ACTION_NAME, JoinRequest::new,
ThreadPool.Names.GENERIC, new JoinRequestRequestHandler());
transportService.registerRequestHandler(DISCOVERY_JOIN_VALIDATE_ACTION_NAME,
() -> new ValidateJoinRequest(), ThreadPool.Names.GENERIC,
new ValidateJoinRequestRequestHandler(transportService::getLocalNode, joinValidators));
transportService.registerRequestHandler(DISCOVERY_LEAVE_ACTION_NAME, LeaveRequest::new,
ThreadPool.Names.GENERIC, new LeaveRequestRequestHandler());
}
public static final String DISCOVERY_JOIN_ACTION_NAME = "internal:discovery/zen/join";
Join Request Handler의 논리를 보겠습니다.
private class JoinRequestRequestHandler implements TransportRequestHandler {
@Override
public void messageReceived(final JoinRequest request, final TransportChannel channel, Task task) throws Exception {
listener.onJoin(request.node, new JoinCallback() {
@Override
public void onSuccess() {
try {
channel.sendResponse(TransportResponse.Empty.INSTANCE);
} catch (Exception e) {
onFailure(e);
}
}
@Override
public void onFailure(Exception e) {
try {
channel.sendResponse(e);
} catch (Exception inner) {
inner.addSuppressed(e);
logger.warn("failed to send back failure on join request", inner);
}
}
});
}
}
보시다시피 Membership은 다른 노드에서join 요청을 받은 후 MembershipListener의onJoin 방법을 호출하여 리셋 함수를 전송하고 성공하면 노드에sendResponse,Empty를 전송합니다.Membership Listener의 onJoin 방법을 살펴보겠습니다.
private class MembershipListener implements MembershipAction.MembershipListener {
@Override
public void onJoin(DiscoveryNode node, MembershipAction.JoinCallback callback) {
handleJoinRequest(node, ZenDiscovery.this.clusterState(), callback);
}
@Override
public void onLeave(DiscoveryNode node) {
handleLeaveRequest(node);
}
}
void handleJoinRequest(final DiscoveryNode node, final ClusterState state, final MembershipAction.JoinCallback callback) {
if (nodeJoinController == null) {
throw new IllegalStateException("discovery module is not yet started");
} else {
// we do this in a couple of places including the cluster update thread. This one here is really just best effort
// to ensure we fail as fast as possible.
onJoinValidators.stream().forEach(a -> a.accept(node, state));
if (state.getBlocks().hasGlobalBlock(STATE_NOT_RECOVERED_BLOCK) == false) {
MembershipAction.ensureMajorVersionBarrier(node.getVersion(), state.getNodes().getMinNodeVersion());
}
// try and connect to the node, if it fails, we can raise an exception back to the client...
transportService.connectToNode(node);
// validate the join request, will throw a failure if it fails, which will get back to the
// node calling the join request
try {
membership.sendValidateJoinRequestBlocking(node, state, joinTimeout);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to validate incoming join request from node [{}]", node),
e);
callback.onFailure(new IllegalStateException("failure when sending a validation request to node", e));
return;
}
nodeJoinController.handleJoinRequest(node, callback);
}
}
handleJoinRequest 방법을 호출하여 다른 노드의 join 요청을 처리하는 것을 볼 수 있습니다.계속 따라와.
public synchronized void handleJoinRequest(final DiscoveryNode node, final MembershipAction.JoinCallback callback) {
if (electionContext != null) {
electionContext.addIncomingJoin(node, callback);
checkPendingJoinsAndElectIfNeeded();
} else {
masterService.submitStateUpdateTask("zen-disco-node-join",
node, ClusterStateTaskConfig.build(Priority.URGENT),
joinTaskExecutor, new JoinTaskListener(callback, logger));
}
}
join 요청이 있을 때마다 electionContext는 join 요청을 보내는 노드를 통계하여 join Request Accumulator의 맵에 추가합니다.그리고 checkPendingJoins And Elect IfNeeded () 를 호출하여 join이 지원하는 node 수가 이 노드를 마스터로 지원할 수 있는지 판단합니다.
private synchronized void checkPendingJoinsAndElectIfNeeded() {
assert electionContext != null : "election check requested but no active context";
final int pendingMasterJoins = electionContext.getPendingMasterJoinsCount();
if (electionContext.isEnoughPendingJoins(pendingMasterJoins) == false) {
if (logger.isTraceEnabled()) {
logger.trace("not enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
electionContext.requiredMasterJoins);
}
} else {
if (logger.isTraceEnabled()) {
logger.trace("have enough joins for election. Got [{}], required [{}]", pendingMasterJoins,
electionContext.requiredMasterJoins);
}
electionContext.closeAndBecomeMaster();
electionContext = null; // clear this out so future joins won't be accumulated
}
}
getPendingMasterJoinsCount를 통해joinRequestAccumulator의 수량(즉 이 노드가 마스터가 될 수 있도록 지원하는 노드 집합)을 집계합니다. 설정의 최소 지원수인requiredJoins보다 크면 이 노드는closeAndBecomeMaster를 호출하여 마스터 노드가 되고 electionContext를 닫습니다.리셋 함수, 처리 스레드 등을 되돌려줍니다.
public boolean markThreadAsDone(Thread joinThread) {
assert Thread.holdsLock(stateMutex);
return currentJoinThread.compareAndSet(joinThread, null);
}
만약 기다린 시간 내에 충분한 투표를 받지 못했다면 이 노드의 선거 실패를 설명합니다. 이것은 리셋 함수로 돌아가는 것입니다. onFailure 지점입니다. markThread AsDone And Start New () 를 호출하여 현재 라인을 닫고 start New Thread If Not Running () 방법에서 다음 순환을 시작하여 상기 선거의 절차를 계속합니다.
public void markThreadAsDoneAndStartNew(Thread joinThread) {
assert Thread.holdsLock(stateMutex);
if (!markThreadAsDone(joinThread)) {
return;
}
startNewThreadIfNotRunning();
}
innerJoinCluster 방법으로 돌아가서 현재 노드가 마스터 노드가 아니라면 선택한 임시 마스터 노드에join 투표 요청을 보냅니다.
nodeJoinController.stopElectionContext(masterNode + " elected");
// send join request
final boolean success = joinElectedMaster(masterNode);
synchronized (stateMutex) {
if (success) {
DiscoveryNode currentMasterNode = this.clusterState().getNodes().getMasterNode();
if (currentMasterNode == null) {
// Post 1.3.0, the master should publish a new cluster state before acking our join request. we now should have
// a valid master.
logger.debug("no master node is set, despite of join request completing. retrying pings.");
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
} else if (currentMasterNode.equals(masterNode) == false) {
// update cluster state
joinThreadControl.stopRunningThreadAndRejoin("master_switched_while_finalizing_join");
}
joinThreadControl.markThreadAsDone(currentThread);
} else {
// failed to join. Try again...
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
join ElectedMaster () 방법을 통해 선택한 마스터가 된 노드에 자신의 투표를 보냅니다.
private boolean joinElectedMaster(DiscoveryNode masterNode) {
try {
// first, make sure we can connect to the master
transportService.connectToNode(masterNode);
} catch (Exception e) {
logger.warn(() -> new ParameterizedMessage("failed to connect to master [{}], retrying...", masterNode), e);
return false;
}
int joinAttempt = 0; // we retry on illegal state if the master is not yet ready
while (true) {
try {
logger.trace("joining master {}", masterNode);
membership.sendJoinRequestBlocking(masterNode, transportService.getLocalNode(), joinTimeout);
return true;
} catch (Exception e) {
final Throwable unwrap = ExceptionsHelper.unwrapCause(e);
if (unwrap instanceof NotMasterException) {
if (++joinAttempt == this.joinRetryAttempts) {
logger.info("failed to send join request to master [{}], reason [{}], tried [{}] times", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
return false;
} else {
logger.trace("master {} failed with [{}]. retrying... (attempts done: [{}])", masterNode, ExceptionsHelper.detailedMessage(e), joinAttempt);
}
} else {
if (logger.isTraceEnabled()) {
logger.trace(() -> new ParameterizedMessage("failed to send join request to master [{}]", masterNode), e);
} else {
logger.info("failed to send join request to master [{}], reason [{}]", masterNode, ExceptionsHelper.detailedMessage(e));
}
return false;
}
}
try {
Thread.sleep(this.joinRetryDelay.millis());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
마스터 노드를 연결하고join 요청을 보내면 URL은 자연히'discovery/zen/join'입니다.만약 수차례 요청 후, 매번 sleep를 요청할 때마다matser 노드를 선출하였다면, 이번 선거는 완성됩니다.만약 선거에 성공하지 못한 노드가 마스터 투표의 절반을 초과하거나 여러 가지 이유로 닫히지 않으면false로 돌아갑니다.
markThreadAsDone And StartNew를 호출하여 선거에 다시 참여합니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Logstash 연결 kafka 출력 오류: Error registering plugin오류 메시지가 매우 길기 때문에 복사하여 분석하면 구체적인 오류를 찾을 수 있다. 일반적으로 설정 문제이다. 이 오류 정보는 es의 출력hosts에 문제가 있다는 것을 발견할 수 있고 수정하면 된다....
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.