ES5.6 Bulk 소스 분석

12037 단어
Bulk 등록
클래스 BootStrap을 시작하는 start () 방법에서 node가 시작되었습니다.start () 방법.노드를 초기화하는 동안 ActionModel이 포함된 일련의 모듈과 플러그인이 로드되었습니다.
 ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(),
                settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(),
                threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService);
 modules.add(actionModule);

ActionModel에는 이번에 확인한 BulkAction과 같이 자주 사용하는 작업 액션이 등록되어 있습니다.
  actions.register(UpdateAction.INSTANCE, TransportUpdateAction.class);
  actions.register(MultiGetAction.INSTANCE, TransportMultiGetAction.class,TransportShardMultiGetAction.class);
  actions.register(BulkAction.INSTANCE, TransportBulkAction.class,TransportShardBulkAction.class);

RestHandler를 초기화합니다.
 registerHandler.accept(new RestMultiTermVectorsAction(settings, restController));
 registerHandler.accept(new RestBulkAction(settings, restController));
 registerHandler.accept(new RestUpdateAction(settings, restController));

RestBulkAction 에 정의된 질의 방법:
  controller.registerHandler(POST, "/_bulk", this);
  controller.registerHandler(PUT, "/_bulk", this);
  controller.registerHandler(POST, "/{index}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/_bulk", this);
  controller.registerHandler(POST, "/{index}/{type}/_bulk", this);
  controller.registerHandler(PUT, "/{index}/{type}/_bulk", this);

요청 수신
RestBulkAction은prepareRequest 방법에서 우리의 일반적인 RestRequest를BulkReqest로 전환하고 NodeClient를 통해 호출합니다.
 channel -> client.bulk(bulkRequest, new RestStatusToXContentListener<>(channel));

NodeClient의 bulk에서는 NodeClient의 DoExecute () 방법이 호출됩니다.
doExecute(Action action, Request request, ActionListener listener)

들어오는 Action은 BulkAction입니다.Instance, Request는 이전에 봉인된 Bulk Request이고,listener는 감청기입니다.
DoExecute 방법에서 먼저 일반적인 action을tansportAction으로 전환한 다음에 전환된tansportAction으로 이 요청을 실행합니다.
transportAction(action).execute(request, listener);

bulkAction이 전환되면 TransportBulkAction으로 바뀌고, TransportBulkAction의 excute 방법은 그 자체의doExecute() 방법을 호출한다.DoExecut() 방법에서 먼저 존재하거나 존재하지 않는 색인을 분류합니다.
1)Step 1: collect all the indices in the request
2)Step 2: filter that to indices that don't exist and we can create. At the same time build a map of indices we can't create that we'll use when we try to run the requests.
3)Step 3: create all the indices that are missing, if there are any missing. start the bulk after all the creates come back.

그런 다음 excuteBulk () 방법을 실행하고 excuteBulk에 BulkOperation을 만들고 BulkOperation을 시작합니다.
void executeBulk(Task task, final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener listener,
        final AtomicArray responses, Map indicesThatCannotBeCreated) {
    new BulkOperation(task, bulkRequest, listener, responses, startTimeNanos, indicesThatCannotBeCreated).run();
}

BulkOperation에서는 Bulk에 있는 모든 요청을 두 번 반복합니다. 첫 번째 반복은 Routing, Mapping 등을 설정하고 ID를 생성할 수 있으면 자동으로 ID를 생성합니다.두 번째 스트리밍은shardID에 따라 분류를 요청합니다.ES 홈페이지에 대량 처리를 할 때 bulk를 사용하라고 했는데 bulk가 요청을 처리할 때 밑바닥의 최적화를 했기 때문이다.이것은 하나의 최적화점으로 같은shard의 요청을 한데 모아 노드에 대응하는shard로 직접 보내서 요청이 노드 간에 전달되고 효율에 영향을 주지 않도록 한다.
for (int i = 0; i < bulkRequest.requests.size(); i++) {
    ....
    switch (docWriteRequest.opType()) {
                    case CREATE:
                    case INDEX:
                        IndexRequest indexRequest = (IndexRequest) docWriteRequest;
                        MappingMetaData mappingMd = null;
                        final IndexMetaData indexMetaData = metaData.index(concreteIndex);
                        if (indexMetaData != null) {
                            mappingMd = indexMetaData.mappingOrDefault(indexRequest.type());
                        }
                        indexRequest.resolveRouting(metaData);
                        indexRequest.process(mappingMd, allowIdGeneration, concreteIndex.getName());
            ....
    }
....
}

....
for (int i = 0; i < bulkRequest.requests.size(); i++) {
            DocWriteRequest request = bulkRequest.requests.get(i);
            if (request == null) {
                continue;
             }
            String concreteIndex = concreteIndices.getConcreteIndex(request.index()).getName();
            ShardId shardId = clusterService.operationRouting().indexShards(clusterState, concreteIndex, request.id(), request.routing()).shardId();
            List shardRequests = requestsByShard.computeIfAbsent(shardId, shard -> new ArrayList<>());
            shardRequests.add(new BulkItemRequest(i, request));
         }

그리고 각기 다른shardRequest에 대해shardBulkAction으로 처리합니다.
shardBulkAction.execute(bulkShardRequest, new ActionListener() {}

각shard의 처리 프로세스
다음은 복잡한 계승 관계다.
TransportShardBulkAction>TransportWriteAction >TransportReplicationAction>TransportAction

이전 단계의shardBulkAction.excute () 방법은 TransportAction을 실행하는 excute 방법입니다.내가 본 원본 버전은 5.6 버전이다. 5.0 버전에 비해 ES는 TransportWriteAction 클래스를 추가했고 TransportReplicationAction에서run 방법을 직접 실행하는 것이 아니라transport Service의 RPC 인터페이스를 통해 기능을 실현했다.구체적인 절차는 다음과 같다.
1)TransportAction.execute () 방법은 TransportReplicationAction의doExecute () 방법을 호출합니다
2) TransportReplicationAction의doExecute() 방법에서 Reroute Phase의run 방법을 실행하고,run 방법에서 요청한shardID에 따라primary shardID를 얻고,primary shard의NodeID를 받습니다. 현재 노드에 primary shard가 포함되어 있으면performLocal Action 방법을 실행하고,그렇지 않으면perform Remote Action을 실행합니다.
3)performLocalaction과performRemoteAction은 최종적으로performAction 방법을 실행할 것이다.performAction에서transport Service가 요청을 보내는 것을 볼 수 있다.
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler() {}

4)transport Service가 요청을 받은 후 Primary OperationTransportHandler로 처리하고Primary OperationTransportHandler는TransportReplicationAction에 등록된 경우
transportService.registerRequestHandler(transportPrimaryAction, () -> new ConcreteShardRequest<>(request), executor,
        new PrimaryOperationTransportHandler());

5)PrimaryOperationTransportHandler는 primary 작업의 처리 클래스로 이 클래스가 정보를 받은 후에 AsyncPrimaryAction 처리를 호출합니다.
@Override
    public void messageReceived(ConcreteShardRequest request, TransportChannel channel, Task task) {
        new AsyncPrimaryAction(request.request, request.targetAllocationID, channel, (ReplicationTask) task).run();
    }

6) AsyncPrimaryAction에서 shard 자물쇠를 먼저 가져오고 자물쇠를 성공적으로 가져오면 자신의 onresponse () 방법을 호출합니다. 그렇지 않으면 가져오는 동작을 스레드 풀에 추가합니다.
            synchronized (this) {
            releasable = tryAcquire();
            if (releasable == null) {
                // blockOperations is executing, this operation will be retried by blockOperations once it finishes
                if (delayedOperations == null) {
                    delayedOperations = new ArrayList<>();
                }
                final Supplier contextSupplier = threadPool.getThreadContext().newRestorableContext(false);
                if (executorOnDelay != null) {
                    delayedOperations.add(
                        new ThreadedActionListener<>(logger, threadPool, executorOnDelay,
                            new ContextPreservingActionListener<>(contextSupplier, onAcquired), forceExecution));
                } else {
                    delayedOperations.add(new ContextPreservingActionListener<>(contextSupplier, onAcquired));
                }
                return;
            }
        }

7) onresponse에서 이 primary Shard Reference가 이동되면 올바른 primary shard와 nodeID를 가져와 다시 요청을 보냅니다.그렇지 않으면 primaryShardReference로 직접 처리합니다.
 @Override
    public void onResponse(PrimaryShardReference primaryShardReference) {
        try {
            if (primaryShardReference.isRelocated()) {
                primaryShardReference.close(); // release shard operation lock as soon as possible
                setPhase(replicationTask, "primary_delegation");
                // delegate primary phase to relocation target
                // it is safe to execute primary phase on relocation target as there are no more in-flight operations where primary
                // phase is executed on local shard and all subsequent operations are executed on relocation target as primary phase.
                final ShardRouting primary = primaryShardReference.routingEntry();
                assert primary.relocating() : "indexShard is marked as relocated but routing isn't" + primary;
                DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
                transportService.sendRequest(relocatingNode, transportPrimaryAction,
                    new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId()),
                    transportOptions,
                    new TransportChannelResponseHandler(logger, channel, "rerouting indexing to target primary " + primary,
                        TransportReplicationAction.this::newResponseInstance) {

                        @Override
                        public void handleResponse(Response response) {
                            setPhase(replicationTask, "finished");
                            super.handleResponse(response);
                        }

                        @Override
                        public void handleException(TransportException exp) {
                            setPhase(replicationTask, "finished");
                            super.handleException(exp);
                        }
                    });
            } else {
                setPhase(replicationTask, "primary");
                final IndexMetaData indexMetaData = clusterService.state().getMetaData().index(request.shardId().getIndex());
                final boolean executeOnReplicas = (indexMetaData == null) || shouldExecuteReplication(indexMetaData);
                final ActionListener listener = createResponseListener(primaryShardReference);
                createReplicatedOperation(request,
                        ActionListener.wrap(result -> result.respond(listener), listener::onFailure),
                        primaryShardReference, executeOnReplicas)
                        .execute();
            }
        } catch (Exception e) {
            Releasables.closeWhileHandlingException(primaryShardReference); // release shard operation lock before responding to caller
            onFailure(e);
        }
    }

8)createReplicatedOperation은 이름을 보면 바로 복사본 처리인 줄 알았는데 누르고 보니 primary를 먼저 실행한 다음에 Replia를 실행한 것으로 나타났다.
 primaryResult = primary.perform(request);
    ...
 performOnReplicas(replicaRequest, shards);

주 분할 처리
주 섹션의 처리는 PrimaryShardReference를 호출합니다.perform () 방법, 이 방법에서는shardoperation OnPrimary () 를 호출하여 메인 섹션을 처리합니다.
shard Operation OnPrimary () 방법은 TransportShardBulkAction에 의해 이루어진 것으로 구체적인 절차는 다음과 같다.
1) 노드의 모든 인덱스 메타데이터 가져오기
2) 버전 번호 가져오기
3) 맵핑 업데이트
4) Engin의 기본 코드를 호출합니다.예를 들면 primary.delete(delete),primary.index(operation) 등등.
5)tanslog에 쓰기
부본 분편은 주 분편과 유사하니 여기서는 많은 설명을 하지 않겠다.

좋은 웹페이지 즐겨찾기