Elasticsearch 5.x 소스 코드 분석(4) 검색 프로토콜 ZenDiscovery

24527 단어
Elasticsearch의 발견 모듈은 Elasticsearch가 시작하고 정상적으로 작동하는 것을 보장하는 가장 기본적인 모듈이라고 할 수 있다. 예를 들어 하나의 실례를 시작한 후에 가장 기본적인'조직'가입조차 실패하면 서비스를 제공할 수 없다는 것을 이해할 수 있다.Elasticsearch의 Discovery Module에는 다음과 같은 몇 가지 기능이 있습니다.
  • Azure Classic Discovery
  • EC2 Discovery
  • Google Compute Engine Discovery
  • Zen Discovery

  • ZenDiscovery는 기본적으로 구현됩니다. 이번에는 ZenDiscovery의 원본을 통학해서 프로토콜이 어떻게 작동하는지 배우고 싶습니다.ZenDiscovery 논리에서 가장 중요한 역할을 하는 클래스는 다음과 같습니다. 또한 이 모듈의 몇 가지 기본 기능도 포함됩니다.
  • ZenDiscovery.java 모듈의 메인 클래스이자 이 모듈을 시작하는 입구입니다. Node.java 호출 및 초기화, 거의 모든 발견 프로토콜의 논리를 포함하고 고도의 분류
  • UnicastZenPing.java는 ZenPing 구현 클래스로 주로 밑바닥과 다른 Nodes가 연결을 구축하고 유지하는 임무를 담당한다
  • PublishClusterStateAction.자바가 ZenDiscovery에 있는 변수 이름은 publishClusterState이다. 이전에 말했듯이 이것들**Action은 모두 **Service에 대한 봉인이기 때문에 이것은 주로 송신 사건과 사건을 처리하는 인터페이스이다. 예를 들어 송신clusterStateChangeEvent과 이 이벤트를 처리하는 것은 모두 이런 종류를 통해 호출된다
  • MasterFaultDetection.javacluster 구축 후 모든 node는master의 생존 상태를 측정하는 클래스
  • NodeFaultDetection.javacluster 구축 후 마스터는 다른 node의 생존 상태를 측정하는 클래스
  • 연결


    노드에서.자바 start() 에서 디스커버리 코드는 네 줄입니다.
      Discovery discovery = injector.getInstance(Discovery.class);
      clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
      // start after transport service so the local disco is known
      discovery.start(); // start before cluster service so that it can set initial state on ClusterApplierService
      discovery.startInitialJoin();   
    

    여기서 Discovery의 실례는 Disdcovery Module의 suppiler에서 제공합니다.
            discoveryTypes.put("zen",
                () -> new ZenDiscovery(settings, threadPool, transportService, namedWriteableRegistry, masterService, clusterApplier,
                    clusterSettings, hostsProvider, allocationService));
            discoveryTypes.put("tribe", () -> new TribeDiscovery(settings, transportService, masterService, clusterApplier));
            discoveryTypes.put("single-node", () -> new SingleNodeDiscovery(settings, transportService, masterService, clusterApplier));
    

    tribe는clusters 간 통신이 필요한 모델을 구축할 때의 유형입니다.discoverystart() 사실은 간단하게 변수를 초기화하는 것일 뿐이다. 진정으로 일을 하는 것은 마지막 문장startInitialJoin() 방법이다. 이것은 계속 startNewThredIfNotRunning()을 조정하고 이어서 하나의 라인을 시작하여 실행한다innerJoinCluster(). 여기에서 언급한 바와 같이 매번join은 한 라인까지만 실행할 수 있기 때문에 여기서 자물쇠를 채울 뿐만 아니라 한 라인만 실행할 수 있는지도 판단할 수 있다.innerJoinCluster() 중에서 처음에 해야 할 일은 당연히 findMaster() 즉, 하나의 Node가 시작되면'조직'을 찾아야 한다는 것이다.
       private DiscoveryNode findMaster() {
            logger.trace("starting to ping");
            List fullPingResponses = pingAndWait(pingTimeout).toList();
    ...
    ...
       private ZenPing.PingCollection pingAndWait(TimeValue timeout) {
            final CompletableFuture response = new CompletableFuture<>();
            try {
                zenPing.ping(response::complete, timeout);
            } catch (Exception ex) {
                // logged later
                response.completeExceptionally(ex);
            }
    
            try {
                return response.get();
            } catch (InterruptedException e) {
                logger.trace("pingAndWait interrupted");
                return new ZenPing.PingCollection();
            } catch (ExecutionException e) {
                logger.warn("Ping execution failed", e);
                return new ZenPing.PingCollection();
            }
        }
    

    첫 번째 중요한 대장zenPing이 등장하기 시작했는데response::completeresponse.get 두 마디로 대체적으로 안에서 다른 걸음으로 요청을 한 무더기 할 수 있음을 짐작할 수 있고 메인 라인은 response를 기다리고 있다.다음은 핑의 구체적인 논리를 보도록 하겠습니다.
            final List seedNodes;
            try {
                seedNodes = resolveHostsLists(
                    unicastZenPingExecutorService,
                    logger,
                    configuredHosts,
                    limitPortCounts,
                    transportService,
                    UNICAST_NODE_PREFIX,
                    resolveTimeout);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
            seedNodes.addAll(hostsProvider.buildDynamicNodes());
            final DiscoveryNodes nodes = contextProvider.clusterState().nodes();
            // add all possible master nodes that were active in the last known cluster configuration
            for (ObjectCursor masterNode : nodes.getMasterNodes().values()) {
                seedNodes.add(masterNode.value);
            }
    

    sendPing에 앞서 seedNodes 확인이 필요합니다. 세 군데에서 저희가 설정한 discovery.zen.ping.unicast.hosts 목록을 가져옵니다.hostsProvider.buildDynamicNodes()(이것은 내가 아직 그것이 무엇을 하는지 모르니 아는 대로 알아봐라.그리고 이 실례가 있습니다.
            // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete
            final List> callables =
                hosts
                    .stream()
                    .map(hn -> (Callable) () -> transportService.addressesFromString(hn, limitPortCounts))
                    .collect(Collectors.toList());
            final List> futures =
                executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS);
            final List discoveryNodes = new ArrayList<>();
            final Set localAddresses = new HashSet<>();
            localAddresses.add(transportService.boundAddress().publishAddress());
            localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses()));
            // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the
            // hostname with the corresponding task by iterating together
            final Iterator it = hosts.iterator();
            for (final Future future : futures) {
                final String hostname = it.next();
                if (!future.isCancelled()) {
                    assert future.isDone();
                    try {
                        final TransportAddress[] addresses = future.get();
                        logger.trace("resolved host [{}] to {}", hostname, addresses);
                        for (int addressId = 0; addressId < addresses.length; addressId++) {
                            final TransportAddress address = addresses[addressId];
                            // no point in pinging ourselves
                            if (localAddresses.contains(address) == false) {
                                discoveryNodes.add(
                                    new DiscoveryNode(
                                        nodeId_prefix + hostname + "_" + addressId + "#",
                                        address,
                                        emptyMap(),
                                        emptySet(),
                                        Version.CURRENT.minimumCompatibilityVersion()));
                            }
                        }
                    } catch (final ExecutionException e) {
                        assert e.getCause() != null;
                        final String message = "failed to resolve host [" + hostname + "]";
                        logger.warn(message, e.getCause());
                    }
                } else {
                    logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname);
                }
            }
    
    resolveHostsLists는 배합된 유니캐스트의 주소 목록을 모두TransportService로 구성DiscoveryNode하여 되돌려주는 것입니다. 여기에 비동기future를 사용해서 읽었습니다. 누군가가 도메인 주소를 맞추는 데 시간이 오래 걸릴까 봐 걱정하는 것 같습니다.해석이 그렇게 오래 걸리면 이렇게 어울리면 너무 아프지 않아요???seed Nodes를 받은 후에 연결을 시작해야 합니다. 여기는 PingRound라는 클래스를 만들어서 통계하고 각각 schedule Duration의 0, 1/3, 2/3시에 sendPing 조작을 시작합니다.
            final ConnectionProfile connectionProfile =
                ConnectionProfile.buildSingleChannelProfile(TransportRequestOptions.Type.REG, requestDuration, requestDuration);
            final PingingRound pingingRound = new PingingRound(pingingRoundIdGenerator.incrementAndGet(), seedNodes, resultsConsumer,
                nodes.getLocalNode(), connectionProfile);
            activePingingRounds.put(pingingRound.id(), pingingRound);
            final AbstractRunnable pingSender = new AbstractRunnable() {
                @Override
                public void onFailure(Exception e) {
                    if (e instanceof AlreadyClosedException == false) {
                        logger.warn("unexpected error while pinging", e);
                    }
                }
    
                @Override
                protected void doRun() throws Exception {
                    sendPings(requestDuration, pingingRound);
                }
            };
            threadPool.generic().execute(pingSender);
            threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3), ThreadPool.Names.GENERIC, pingSender);
            threadPool.schedule(TimeValue.timeValueMillis(scheduleDuration.millis() / 3 * 2), ThreadPool.Names.GENERIC, pingSender);
            threadPool.schedule(scheduleDuration, ThreadPool.Names.GENERIC, new AbstractRunnable() {
                @Override
                protected void doRun() throws Exception {
                    finishPingingRound(pingingRound);
                }
    
                @Override
                public void onFailure(Exception e) {
                    logger.warn("unexpected error while finishing pinging round", e);
                }
            });
    

    여기서 주의할 점은 핑의 연결이 다른 것처럼transport 서비스로 긴 연결을 유지하는 것이 아니라 바로 건설하고 판매하는 연결이라는 것이다.마지막으로finishPingRound는 이 임시 연결을 삭제합니다.

    마스터 선출

    findMaster()로 돌아가면 위의 핑이 끝난 후에 우리는 하나씩pingResponses을 받았습니다. 여기에 필터 작업이 있습니다. 만약에 discovery.zen.master_election.ignore_non_master_pings을 사용하면 그 노드를 사용할 것입니다.master = false 노드는 무시됩니다.
           // filter responses
            final List pingResponses = filterPingResponses(fullPingResponses, masterElectionIgnoreNonMasters, logger);
    

    이어 이들pingResponse에서 다른 노드의 현재 마스터 노드가 누구인지 수집해 최종activeMasters의 후보 명단을 받고 자신을 제거해야 한다. 디스커버리의 전략적 시비는 마지막 순간까지 자신을 마스터로 선택하지 않고 뇌분열이 처음부터 발생하는 것을 예방할 수 있다.
           if (activeMasters.isEmpty()) {
                if (electMaster.hasEnoughCandidates(masterCandidates)) {
                    final ElectMasterService.MasterCandidate winner = electMaster.electMaster(masterCandidates);
                    logger.trace("candidate {} won election", winner);
                    return winner.getNode();
                } else {
                    // if we don't have enough master nodes, we bail, because there are not enough master to elect from
                    logger.warn("not enough master nodes discovered during pinging (found [{}], but needed [{}]), pinging again",
                                masterCandidates, electMaster.minimumMasterNodes());
                    return null;
                }
            } else {
                assert !activeMasters.contains(localNode) : "local node should never be elected as master when other nodes indicate an active master";
                // lets tie break between discovered nodes
                return electMaster.tieBreakActiveMasters(activeMasters);
            }
    

    이어서 이 후보 목록에 대해 가장 이상적인 것은 목록이 1이라는 것이다. 이것은 당신이 현재 건강한 집단에 가입하고 있다는 것을 증명한다. 만약에 여러 개가 있다면 (정상적인 상황에서 여러 개가 없을 것이다. 만약 당신이 그것discovery.zen.minimum_master_nodes을 설정하지 않아서 많은 분치자군을 초래하지 않았다면) 목록에서 간단하게 id 번호가 가장 작은 것을 선택하면 (혼란스럽지 않다는 뜻) 된다.만약 목록이 비어 있다면 모두가 막 시작했을 때 선거 부분에 들어가서 선거 부분은 그 id가 가장 작은 것을 뽑는 것이다.이제 이거masterNode는 정해졌어요. 만약에 이 마스터가 다른 사람이라면 간단하게 Join 요청을 보내주면 돼요. 만약에 선택한 마스터가 당신 자신이라면 또 중요한 일이 있어요. 그 discovery.zen.minimum_master_nodes 파라미터를 기억하세요? 보통 이 값을 당신의 집단의cluster 노드 수의 절반+1로 맞추어 뇌분열을 예방해야 해요. 현재 당신이 마스터를 선출하면그러면 너는 minimumMasterNodes() - 1 이렇게 많은 사람들이 와서 네가 마스터라는 것을 인정하고 기다려야 한다. 그러면 너야말로 진정한 마스터이고 선거가 끝난다.
           if (transportService.getLocalNode().equals(masterNode)) {
                final int requiredJoins = Math.max(0, electMaster.minimumMasterNodes() - 1); // we count as one
                logger.debug("elected as master, waiting for incoming joins ([{}] needed)", requiredJoins);
                nodeJoinController.waitToBeElectedAsMaster(requiredJoins, masterElectionWaitForJoinsTimeout,
                        new NodeJoinController.ElectionCallback() {
                            @Override
                            public void onElectedAsMaster(ClusterState state) {
                                synchronized (stateMutex) {
                                    joinThreadControl.markThreadAsDone(currentThread);
                                }
                            }
    
                            @Override
                            public void onFailure(Throwable t) {
                                logger.trace("failed while waiting for nodes to join, rejoining", t);
                                synchronized (stateMutex) {
                                    joinThreadControl.markThreadAsDoneAndStartNew(currentThread);
                                }
                            }
                        }
    
                );
            } else {
    

    여기에도 Join의 시간 초과 설정을 기다리는 것이 있습니다. 시간 초과 후 수량의 Join 요청을 충족시키지 못하면 선거에 실패하고 새로운 선거가 필요합니다. Join을 수신하는 세부 사항을 보내면 더 이상 지나지 않습니다.선거 절차가 끝나면 clusterState 동기화가 시작됩니다.

    MasterFaultDetection 및 NodeFaultDetection


    선거 절차가 끝난 후 두 개의 중요한 작은 task가 작업을 시작했다. 각각masterFaultDetectionNodeFaultDetection이다. 이 두 개의 task는 매우 간단하다. 하나의master를 보면 유일하게 다른 것은 node의 안에 저장된 것은cluster 안의 모든 nodes이다.
                 if (masterToPing.equals(MasterFaultDetection.this.masterNode())) {
                                    // we don't stop on disconnection from master, we keep pinging it
                                    threadPool.schedule(pingInterval, ThreadPool.Names.SAME, MasterPinger.this);
                                }
    
    findMaster()안과 다른 것은 여기는 더 이상temp로 연결하지 않고threadPool 안의 긴 연결입니다. 여기는 오류를 분류합니다. 만약에 일부 업무 오류라면 시도 횟수의 제한을 받지 않습니다. 예를 들어 요청한 노드는 마스터 노드가 아니며 요청한 마스터는 자신의 cluster 등이 아닙니다. 직접 호출notifyMasterFailure합니다. 일반적인 오류라면 시도 횟수를 기록합니다. 오류 횟수가 한도값을 초과하면리셋 notifyMasterFailure 을 호출합니다.
    private void handleMasterGone(final DiscoveryNode masterNode, final Throwable cause, final String reason) {
            if (lifecycleState() != Lifecycle.State.STARTED) {
                // not started, ignore a master failure
                return;
            }
            if (localNodeMaster()) {
                // we might get this on both a master telling us shutting down, and then the disconnect failure
                return;
            }
    
            logger.info((Supplier>) () -> new ParameterizedMessage("master_left [{}], reason [{}]", masterNode, reason), cause);
    
            synchronized (stateMutex) {
                if (localNodeMaster() == false && masterNode.equals(committedState.get().nodes().getMasterNode())) {
                    // flush any pending cluster states from old master, so it will not be set as master again
                    pendingStatesQueue.failAllStatesAndClear(new ElasticsearchException("master left [{}]", reason));
                    rejoin("master left (reason = " + reason + ")");
                }
            }
        }
    

    ZenDiscovery의 콜백 방법은 최종적으로 다시 rejoin() 프로세스에 들어갑니다.

    Cluster 상태 업데이트


    자, 집단도 구축되었고,master도 선택되었고,정시ping도 보장되었고,나머지는 마지막으로master가 어떻게clusterState를 모든 노드로 전송하는지입니다.노드에 있었던 기억이 나요.java에서 ZenDiscovery를 초기화할 때 clusterState의 발표 방법을 등록했습니다.
    clusterService.getMasterService().setClusterStatePublisher(discovery::publish);
    

    publish의 핵심 코드는
            pendingStatesQueue.addPending(newState);
    
            try {
                publishClusterState.publish(clusterChangedEvent, electMaster.minimumMasterNodes(), ackListener);
            } catch (FailedToCommitClusterStateException t) {
                // cluster service logs a WARN message
                logger.debug("failed to publish cluster state version [{}] (not enough nodes acknowledged, min master nodes [{}])",
                    newState.version(), electMaster.minimumMasterNodes());
    
                synchronized (stateMutex) {
                    pendingStatesQueue.failAllStatesAndClear(
                        new ElasticsearchException("failed to publish cluster state"));
    
                    rejoin("zen-disco-failed-to-publish");
                }
                throw t;
            }
    
            final DiscoveryNode localNode = newState.getNodes().getLocalNode();
            final CountDownLatch latch = new CountDownLatch(1);
            final AtomicBoolean processedOrFailed = new AtomicBoolean();
            pendingStatesQueue.markAsCommitted(newState.stateUUID(),
                new PendingClusterStatesQueue.StateProcessedListener() {
                    @Override
                    public void onNewClusterStateProcessed() {
                        processedOrFailed.set(true);
                        latch.countDown();
                        ackListener.onNodeAck(localNode, null);
                    }
    
                    @Override
                    public void onNewClusterStateFailed(Exception e) {
                        processedOrFailed.set(true);
                        latch.countDown();
                        ackListener.onNodeAck(localNode, e);
                        logger.warn(
                            (org.apache.logging.log4j.util.Supplier>) () -> new ParameterizedMessage(
                                "failed while applying cluster state locally [{}]",
                                clusterChangedEvent.source()),
                            e);
                    }
                });
    
    pendingStatesQueue는 제출할state를 저장하고 최신commit의state를 다른 요청에 제공합니다.발표clusterChangedEventPublishClusterStateAction의 주요 논리적 방법innerPublish에 맡겼다
    private void innerPublish(final ClusterChangedEvent clusterChangedEvent, final Set nodesToPublishTo,
                                  final SendingController sendingController, final boolean sendFullVersion,
                                  final Map serializedStates, final Map serializedDiffs) {
    
            final ClusterState clusterState = clusterChangedEvent.state();
            final ClusterState previousState = clusterChangedEvent.previousState();
            final TimeValue publishTimeout = discoverySettings.getPublishTimeout();
    
            final long publishingStartInNanos = System.nanoTime();
    
            for (final DiscoveryNode node : nodesToPublishTo) {
                // try and serialize the cluster state once (or per version), so we don't serialize it
                // per node when we send it over the wire, compress it while we are at it...
                // we don't send full version if node didn't exist in the previous version of cluster state
                if (sendFullVersion || !previousState.nodes().nodeExists(node)) {
                    sendFullClusterState(clusterState, serializedStates, node, publishTimeout, sendingController);
                } else {
                    sendClusterStateDiff(clusterState, serializedDiffs, serializedStates, node, publishTimeout, sendingController);
                }
            }
    
            sendingController.waitForCommit(discoverySettings.getCommitTimeout());
    
            try {
                long timeLeftInNanos = Math.max(0, publishTimeout.nanos() - (System.nanoTime() - publishingStartInNanos));
                final BlockingClusterStatePublishResponseHandler publishResponseHandler = sendingController.getPublishResponseHandler();
                sendingController.setPublishingTimedOut(!publishResponseHandler.awaitAllNodes(TimeValue.timeValueNanos(timeLeftInNanos)));
                if (sendingController.getPublishingTimedOut()) {
                    DiscoveryNode[] pendingNodes = publishResponseHandler.pendingNodes();
                    // everyone may have just responded
                    if (pendingNodes.length > 0) {
                        logger.warn("timed out waiting for all nodes to process published state [{}] (timeout [{}], pending nodes: {})",
                            clusterState.version(), publishTimeout, pendingNodes);
                    }
                }
            } catch (InterruptedException e) {
                // ignore & restore interrupt
                Thread.currentThread().interrupt();
            }
        }
    

    ES2에서x 이후에 가까운 버전의 diff를 보내서 상태를 동기화하는 것을 지원합니다. 네트워크 대역폭을 절약하기 위해 ClusterState 클래스에 누르면 안에 있는 상태 정보량이 적지 않은 것을 발견할 수 있습니다. 그러나 diff는 당신의 버전과 현재의 최신 버전이 한 버전만 차이가 납니다. 만약에 1에서 3으로 넘어가려면 full 상태를 보내야 합니다.sendFullClusterStatesendClusterStateDiff는 모두 하부transport 서비스를 호출하여 진정으로 상태를 전송하고 상태 기록은 하나sendingController를 통해 유지보수한다. ack를 받지 못하거나timeout은 controller로 하여금 minMasterNodes-1에 도달했는지 확인하게 하고, 도달하면 이번 상태는commited를 전송하고 나머지 상황은 모두 틀리게 한다.
    여기서 반드시 주의해야 할 것은 Publish 상태는 두 단계로 나뉘는데 우선sendNotification이다
     private void sendClusterStateToNode(final ClusterState clusterState, BytesReference bytes,
                                            final DiscoveryNode node,
                                            final TimeValue publishTimeout,
                                            final SendingController sendingController,
                                            final boolean sendDiffs, final Map serializedStates) {
            try {
    
                // -> no need to put a timeout on the options here, because we want the response to eventually be received
                //  and not log an error if it arrives after the timeout
                // -> no need to compress, we already compressed the bytes
                TransportRequestOptions options = TransportRequestOptions.builder()
                    .withType(TransportRequestOptions.Type.STATE).withCompress(false).build();
                transportService.sendRequest(node, SEND_ACTION_NAME,
                        new BytesTransportRequest(bytes, node.getVersion()),
                        options,
                        new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
    
                            @Override
                            public void handleResponse(TransportResponse.Empty response) {
                                if (sendingController.getPublishingTimedOut()) {
                                    logger.debug("node {} responded for cluster state [{}] (took longer than [{}])", node,
                                        clusterState.version(), publishTimeout);
                                }
                                sendingController.onNodeSendAck(node);
                            }
    

    바로master가 먼저 모든 노드에게 이 상태를 보내고minMasterNodes가 이 알림을 확인하면master 노드가 이 상태mark를commited로 만들고sendCommitToNode() 모든 노드에게commited이 상태를 알려준다.
    public synchronized void onNodeSendAck(DiscoveryNode node) {
                if (committed) {
                    assert sendAckedBeforeCommit.isEmpty();
                    sendCommitToNode(node, clusterState, this);
                } else if (committedOrFailed()) {
                    logger.trace("ignoring ack from [{}] for cluster state version [{}]. already failed", node, clusterState.version());
                } else {
                    // we're still waiting
                    sendAckedBeforeCommit.add(node);
                    if (node.isMasterNode()) {
                        checkForCommitOrFailIfNoPending(node);
                    }
                }
            }
    

    다른 Node에서 이 두 가지 소식을 처리하는 Handler도 이 종류에 있습니다. 관심 있는 것은 읽어볼 수 있습니다. 여기는 불과합니다.
     transportService.registerRequestHandler(SEND_ACTION_NAME, BytesTransportRequest::new, ThreadPool.Names.SAME, false, false,
                new SendClusterStateRequestHandler());
            transportService.registerRequestHandler(COMMIT_ACTION_NAME, CommitClusterStateRequest::new, ThreadPool.Names.SAME, false, false,
                new CommitClusterStateRequestHandler());
    

    이로써 Elasticsearch의 전체 발견 프로토콜과 상태 업데이트 절차는 끝났다.문제가 있으면 교류를 환영합니다.

    좋은 웹페이지 즐겨찾기