Elasticsearch 5.x 소스 코드 분석(4) 검색 프로토콜 ZenDiscovery
ZenDiscovery는 기본적으로 구현됩니다. 이번에는 ZenDiscovery의 원본을 통학해서 프로토콜이 어떻게 작동하는지 배우고 싶습니다.ZenDiscovery 논리에서 가장 중요한 역할을 하는 클래스는 다음과 같습니다. 또한 이 모듈의 몇 가지 기본 기능도 포함됩니다.
ZenDiscovery
에 있는 변수 이름은 publishClusterState
이다. 이전에 말했듯이 이것들**Action
은 모두 **Service
에 대한 봉인이기 때문에 이것은 주로 송신 사건과 사건을 처리하는 인터페이스이다. 예를 들어 송신clusterStateChangeEvent
과 이 이벤트를 처리하는 것은 모두 이런 종류를 통해 호출된다연결
노드에서.자바
start()
에서 디스커버리 코드는 네 줄입니다. Discovery discovery = injector.getInstance(Discovery.class);
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
// start after transport service so the local disco is known
discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
discovery.startInitialJoin();
여기서 Discovery의 실례는 Disdcovery Module의 suppiler에서 제공합니다.
discoveryTypes.put("zen",
() -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
clusterSettings, hostsProvider, allocationService));
discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, masterService, clusterApplier));
discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
tribe는clusters 간 통신이 필요한 모델을 구축할 때의 유형입니다.
discoverystart()
사실은 간단하게 변수를 초기화하는 것일 뿐이다. 진정으로 일을 하는 것은 마지막 문장startInitialJoin()
방법이다. 이것은 계속 startNewThredIfNotRunning()
을 조정하고 이어서 하나의 라인을 시작하여 실행한다innerJoinCluster()
. 여기에서 언급한 바와 같이 매번join은 한 라인까지만 실행할 수 있기 때문에 여기서 자물쇠를 채울 뿐만 아니라 한 라인만 실행할 수 있는지도 판단할 수 있다.innerJoinCluster()
중에서 처음에 해야 할 일은 당연히 findMaster()
즉, 하나의 Node가 시작되면'조직'을 찾아야 한다는 것이다. private DiscoveryNode findMaster() {
logger.trace("starting to ping");
List fullPingResponses = pingAndWait(pingTimeout).toList();
...
...
private ZenPing.PingCollection pingAndWait(TimeValue timeout) {
final CompletableFuture response = new CompletableFuture<>();
try {
zenPing.ping(response::complete, timeout);
} catch (Exception ex) {
// logged later
response.completeExceptionally(ex);
}
try {
return response.get();
} catch (InterruptedException e) {
logger.trace("pingAndWait interrupted");
return new ZenPing.PingCollection();
} catch (ExecutionException e) {
logger.warn("Ping execution failed", e);
return new ZenPing.PingCollection();
}
}
첫 번째 중요한 대장
zenPing
이 등장하기 시작했는데response::complete
과response.get
두 마디로 대체적으로 안에서 다른 걸음으로 요청을 한 무더기 할 수 있음을 짐작할 수 있고 메인 라인은 response를 기다리고 있다.다음은 핑의 구체적인 논리를 보도록 하겠습니다. final List seedNodes;
try {
seedNodes = resolveHostsLists(
unicastZenPingExecutorService,
logger,
configuredHosts,
limitPortCounts,
transportService,
UNICAST_NODE_PREFIX,
resolveTimeout);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
seedNodes.addAll(hostsProvider.buildDynamicNodes());
final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
// add all possible master nodes that were active in the last known cluster configuration
for (ObjectCursor masterNode : nodes.getMasterNodes().values()) {
seedNodes.add(masterNode.value);
}
sendPing에 앞서
seedNodes
확인이 필요합니다. 세 군데에서 저희가 설정한 discovery.zen.ping.unicast.hosts
목록을 가져옵니다.hostsProvider.buildDynamicNodes()
(이것은 내가 아직 그것이 무엇을 하는지 모르니 아는 대로 알아봐라.그리고 이 실례가 있습니다. // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
final List> callables =
hosts
.stream()
.map(hn -> (Callable) () -> transportService.addressesFromString(hn, limitPortCounts))
.collect(Collectors.toList());
final List> futures =
executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
final List discoveryNodes = new ArrayList<>();
final Set localAddresses = new HashSet<>();
localAddresses.add(transportService.boundAddress().publishAddress());
localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
// ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
// hostname with the corresponding task by iterating together
final Iterator it = hosts.iterator();
for (final Future future : futures) {
final String hostname = it.next();
if (!future.isCancelled()) {
assert future.isDone();
try {
final TransportAddress[] addresses = future.get();
logger.trace("resolved host [{}] to {}", hostname, addresses);
for (int addressId = 0; addressId < addresses.length; addressId++) {
final TransportAddress address = addresses[addressId];
// no point in pinging ourselves
if (localAddresses.contains(address) == false) {
discoveryNodes.add(
new DiscoveryNode(
nodeId_prefix + hostname + "_" + addressId + "#",
address,
emptyMap(),
emptySet(),
Version.CURRENT.minimumCompatibilityVersion()));
}
}
} catch (final ExecutionException e) {
assert e.getCause() != null;
final String message = "failed to resolve host [" + hostname + "]";
logger.warn(message, e.getCause());
}
} else {
logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
}
}
resolveHostsLists
는 배합된 유니캐스트의 주소 목록을 모두TransportService
로 구성DiscoveryNode
하여 되돌려주는 것입니다. 여기에 비동기future를 사용해서 읽었습니다. 누군가가 도메인 주소를 맞추는 데 시간이 오래 걸릴까 봐 걱정하는 것 같습니다.해석이 그렇게 오래 걸리면 이렇게 어울리면 너무 아프지 않아요???seed Nodes를 받은 후에 연결을 시작해야 합니다. 여기는 PingRound라는 클래스를 만들어서 통계하고 각각 schedule Duration의 0, 1/3, 2/3시에 sendPing 조작을 시작합니다. final ConnectionProfile connectionProfile =
ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer,
nodes.getLocalNode(), connectionProfile);
activePingingRounds.put(pingingRound.id(), pingingRound);
final AbstractRunnable pingSender = new AbstractRunnable() {
@Override
public void onFailure(Exception e) {
if (e instanceof AlreadyClosedException == false) {
logger.warn("unexpected error while pinging", e);
}
}
@Override
protected void doRun() throws Exception {
sendPings(requestDuration, pingingRound);
}
};
threadPool.generic().execute(pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender);
threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender);
threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() {
@Override
protected void doRun() throws Exception {
finishPingingRound(pingingRound);
}
@Override
public void onFailure(Exception e) {
logger.warn("unexpected error while finishing pinging round", e);
}
});
여기서 주의할 점은 핑의 연결이 다른 것처럼transport 서비스로 긴 연결을 유지하는 것이 아니라 바로 건설하고 판매하는 연결이라는 것이다.마지막으로finishPingRound는 이 임시 연결을 삭제합니다.
마스터 선출
findMaster()
로 돌아가면 위의 핑이 끝난 후에 우리는 하나씩pingResponses
을 받았습니다. 여기에 필터 작업이 있습니다. 만약에 discovery.zen.master_election.ignore_non_master_pings
을 사용하면 그 노드를 사용할 것입니다.master = false 노드는 무시됩니다. // filter responses
final List pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
이어 이들
pingResponse
에서 다른 노드의 현재 마스터 노드가 누구인지 수집해 최종activeMasters
의 후보 명단을 받고 자신을 제거해야 한다. 디스커버리의 전략적 시비는 마지막 순간까지 자신을 마스터로 선택하지 않고 뇌분열이 처음부터 발생하는 것을 예방할 수 있다. if (activeMasters.isEmpty()) {
if (electMaster.hasEnoughCandidates(masterCandidates)) {
final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
logger.trace("candidate {} won election", winner);
return winner.getNode();
} else {
// if we don't have enough master nodes, we bail, because there are not enough master to elect from
logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
masterCandidates, electMaster.minimumMasterNodes());
return null;
}
} else {
assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
// lets tie break between discovered nodes
return electMaster.tieBreakActiveMasters(activeMasters);
}
이어서 이 후보 목록에 대해 가장 이상적인 것은 목록이 1이라는 것이다. 이것은 당신이 현재 건강한 집단에 가입하고 있다는 것을 증명한다. 만약에 여러 개가 있다면 (정상적인 상황에서 여러 개가 없을 것이다. 만약 당신이 그것
discovery.zen.minimum_master_nodes
을 설정하지 않아서 많은 분치자군을 초래하지 않았다면) 목록에서 간단하게 id 번호가 가장 작은 것을 선택하면 (혼란스럽지 않다는 뜻) 된다.만약 목록이 비어 있다면 모두가 막 시작했을 때 선거 부분에 들어가서 선거 부분은 그 id가 가장 작은 것을 뽑는 것이다.이제 이거masterNode
는 정해졌어요. 만약에 이 마스터가 다른 사람이라면 간단하게 Join 요청을 보내주면 돼요. 만약에 선택한 마스터가 당신 자신이라면 또 중요한 일이 있어요. 그 discovery.zen.minimum_master_nodes
파라미터를 기억하세요? 보통 이 값을 당신의 집단의cluster 노드 수의 절반+1로 맞추어 뇌분열을 예방해야 해요. 현재 당신이 마스터를 선출하면그러면 너는 minimumMasterNodes() - 1
이렇게 많은 사람들이 와서 네가 마스터라는 것을 인정하고 기다려야 한다. 그러면 너야말로 진정한 마스터이고 선거가 끝난다. if (transportService.getLocalNode().equals(masterNode)) {
final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
new NodeJoinController.ElectionCallback() {
@Override
public void onElectedAsMaster(ClusterState state) {
synchronized (stateMutex) {
joinThreadControl.markThreadAsDone(currentThread);
}
}
@Override
public void onFailure(Throwable t) {
logger.trace("failed while waiting for nodes to join, rejoining", t);
synchronized (stateMutex) {
joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
}
}
}
);
} else {
여기에도 Join의 시간 초과 설정을 기다리는 것이 있습니다. 시간 초과 후 수량의 Join 요청을 충족시키지 못하면 선거에 실패하고 새로운 선거가 필요합니다. Join을 수신하는 세부 사항을 보내면 더 이상 지나지 않습니다.선거 절차가 끝나면 clusterState 동기화가 시작됩니다.
MasterFaultDetection 및 NodeFaultDetection
선거 절차가 끝난 후 두 개의 중요한 작은 task가 작업을 시작했다. 각각
masterFaultDetection
과NodeFaultDetection
이다. 이 두 개의 task는 매우 간단하다. 하나의master를 보면 유일하게 다른 것은 node의 안에 저장된 것은cluster 안의 모든 nodes이다. if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
// we don't stop on disconnection from master, we keep pinging it
threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
}
findMaster()
안과 다른 것은 여기는 더 이상temp로 연결하지 않고threadPool 안의 긴 연결입니다. 여기는 오류를 분류합니다. 만약에 일부 업무 오류라면 시도 횟수의 제한을 받지 않습니다. 예를 들어 요청한 노드는 마스터 노드가 아니며 요청한 마스터는 자신의 cluster 등이 아닙니다. 직접 호출notifyMasterFailure
합니다. 일반적인 오류라면 시도 횟수를 기록합니다. 오류 횟수가 한도값을 초과하면리셋 notifyMasterFailure
을 호출합니다.private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
if (lifecycleState() != Lifecycle.State.STARTED) {
// not started, ignore a master failure
return;
}
if (localNodeMaster()) {
// we might get this on both a master telling us shutting down, and then the disconnect failure
return;
}
logger.info((Supplier>) () -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);
synchronized (stateMutex) {
if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
// flush any pending cluster states from old master, so it will not be set as master again
pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
rejoin("master left (reason = " + reason + ")");
}
}
}
ZenDiscovery의 콜백 방법은 최종적으로 다시
rejoin()
프로세스에 들어갑니다.Cluster 상태 업데이트
자, 집단도 구축되었고,master도 선택되었고,정시ping도 보장되었고,나머지는 마지막으로master가 어떻게clusterState를 모든 노드로 전송하는지입니다.노드에 있었던 기억이 나요.java에서 ZenDiscovery를 초기화할 때 clusterState의 발표 방법을 등록했습니다.
clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
publish의 핵심 코드는
pendingStatesQueue.addPending(newState);
try {
publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
} catch (FailedToCommitClusterStateException t) {
// cluster service logs a WARN message
logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
newState.version(), electMaster.minimumMasterNodes());
synchronized (stateMutex) {
pendingStatesQueue.failAllStatesAndClear(
new ElasticsearchException("failed to publish cluster state"));
rejoin("zen-disco-failed-to-publish");
}
throw t;
}
final DiscoveryNode localNode = newState.getNodes().getLocalNode();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean processedOrFailed = new AtomicBoolean();
pendingStatesQueue.markAsCommitted(newState.stateUUID(),
new PendingClusterStatesQueue.StateProcessedListener() {
@Override
public void onNewClusterStateProcessed() {
processedOrFailed.set(true);
latch.countDown();
ackListener.onNodeAck(localNode, null);
}
@Override
public void onNewClusterStateFailed(Exception e) {
processedOrFailed.set(true);
latch.countDown();
ackListener.onNodeAck(localNode, e);
logger.warn(
(org.apache.logging.log4j.util.Supplier>) () -> new ParameterizedMessage(
"failed while applying cluster state locally [{}]",
clusterChangedEvent.source()),
e);
}
});
pendingStatesQueue
는 제출할state를 저장하고 최신commit의state를 다른 요청에 제공합니다.발표clusterChangedEvent
는 PublishClusterStateAction
의 주요 논리적 방법innerPublish
에 맡겼다private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set nodesToPublishTo,
final SendingController sendingController, final boolean sendFullVersion,
final Map serializedStates, final Map serializedDiffs) {
final ClusterState clusterState = clusterChangedEvent.state();
final ClusterState previousState = clusterChangedEvent.previousState();
final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
final long publishingStartInNanos = System.nanoTime();
for (final DiscoveryNode node : nodesToPublishTo) {
// try and serialize the cluster state once (or per version), so we don't serialize it
// per node when we send it over the wire, compress it while we are at it...
// we don't send full version if node didn't exist in the previous version of cluster state
if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
} else {
sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);
}
}
sendingController.waitForCommit(discoverySettings.getCommitTimeout());
try {
long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos));
final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
if (sendingController.getPublishingTimedOut()) {
DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
// everyone may have just responded
if (pendingNodes.length > 0) {
logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
clusterState.version(), publishTimeout, pendingNodes);
}
}
} catch (InterruptedException e) {
// ignore & restore interrupt
Thread.currentThread().interrupt();
}
}
ES2에서x 이후에 가까운 버전의 diff를 보내서 상태를 동기화하는 것을 지원합니다. 네트워크 대역폭을 절약하기 위해 ClusterState 클래스에 누르면 안에 있는 상태 정보량이 적지 않은 것을 발견할 수 있습니다. 그러나 diff는 당신의 버전과 현재의 최신 버전이 한 버전만 차이가 납니다. 만약에 1에서 3으로 넘어가려면 full 상태를 보내야 합니다.
sendFullClusterState
과sendClusterStateDiff
는 모두 하부transport 서비스를 호출하여 진정으로 상태를 전송하고 상태 기록은 하나sendingController
를 통해 유지보수한다. ack를 받지 못하거나timeout은 controller로 하여금 minMasterNodes-1
에 도달했는지 확인하게 하고, 도달하면 이번 상태는commited를 전송하고 나머지 상황은 모두 틀리게 한다.여기서 반드시 주의해야 할 것은 Publish 상태는 두 단계로 나뉘는데 우선sendNotification이다
private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
final DiscoveryNode node,
final TimeValue publishTimeout,
final SendingController sendingController,
final boolean sendDiffs, final Map serializedStates) {
try {
// -> no need to put a timeout on the options here, because we want the response to eventually be received
// and not log an error if it arrives after the timeout
// -> no need to compress, we already compressed the bytes
TransportRequestOptions options = TransportRequestOptions.builder()
.withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
transportService.sendRequest(node, SEND_ACTION_NAME,
new BytesTransportRequest(bytes, node.getVersion()),
options,
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
@Override
public void handleResponse(TransportResponse.Empty response) {
if (sendingController.getPublishingTimedOut()) {
logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
clusterState.version(), publishTimeout);
}
sendingController.onNodeSendAck(node);
}
바로master가 먼저 모든 노드에게 이 상태를 보내고minMasterNodes가 이 알림을 확인하면master 노드가 이 상태mark를commited로 만들고
sendCommitToNode()
모든 노드에게commited이 상태를 알려준다.public synchronized void onNodeSendAck(DiscoveryNode node) {
if (committed) {
assert sendAckedBeforeCommit.isEmpty();
sendCommitToNode(node, clusterState, this);
} else if (committedOrFailed()) {
logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());
} else {
// we're still waiting
sendAckedBeforeCommit.add(node);
if (node.isMasterNode()) {
checkForCommitOrFailIfNoPending(node);
}
}
}
다른 Node에서 이 두 가지 소식을 처리하는 Handler도 이 종류에 있습니다. 관심 있는 것은 읽어볼 수 있습니다. 여기는 불과합니다.
transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.SAME, false, false,
new SendClusterStateRequestHandler());
transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, false, false,
new CommitClusterStateRequestHandler());
이로써 Elasticsearch의 전체 발견 프로토콜과 상태 업데이트 절차는 끝났다.문제가 있으면 교류를 환영합니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.