Elasticsearch Write Process Source Code Analysis (3)

Continuing from the previous post in this series (part 2), the request must now actually be sent to the nodes that hold the shards so they can execute the index operation. The constructor of TransportReplicationAction registers the handlers for primary-shard and replica-shard writes:
        // register the handler for writes on the primary shard
        transportService.registerRequestHandler(transportPrimaryAction, executor, forceExecutionOnPrimary, true,
            in -> new ConcreteShardRequest<>(requestReader, in), this::handlePrimaryRequest);

        // register the handler for writes on replica shards
        // we must never reject a replica operation because of thread pool capacity
        transportService.registerRequestHandler(transportReplicaAction, executor, true, true,
            in -> new ConcreteReplicaRequest<>(replicaRequestReader, in), this::handleReplicaRequest);
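
For context, everything below is ultimately triggered by an ordinary index call from a client. Here is a minimal sketch, assuming Elasticsearch 7.x and the high-level REST client (the index name, document id, and body are made up for illustration):

import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class IndexRequestExample {
    public static void main(String[] args) throws Exception {
        try (RestHighLevelClient client = new RestHighLevelClient(
                RestClient.builder(new HttpHost("localhost", 9200, "http")))) {
            // a single-document index request; this is what eventually reaches
            // handlePrimaryRequest on the node holding the primary shard
            IndexRequest request = new IndexRequest("my-index")
                .id("1")
                .source("{\"message\":\"hello\"}", XContentType.JSON);
            IndexResponse response = client.index(request, RequestOptions.DEFAULT);
            System.out.println(response.getResult()); // CREATED or UPDATED
        }
    }
}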

So the method that handles the write on the primary shard is org.elasticsearch.action.support.replication.TransportReplicationAction#handlePrimaryRequest. From there, org.elasticsearch.action.support.replication.TransportReplicationAction.AsyncPrimaryAction#doRun executes the primary-shard write, and after passing through many layers the call reaches the final write method, org.elasticsearch.index.engine.InternalEngine#index. I will not trace the exact call path here; let's look at the index method itself:
                // work out the indexing strategy for this operation
                final IndexingStrategy plan = indexingStrategyForOperation(index);

                final IndexResult indexResult;
                if (plan.earlyResultOnPreFlightError.isPresent()) {
                    indexResult = plan.earlyResultOnPreFlightError.get();
                    assert indexResult.getResultType() == Result.Type.FAILURE : indexResult.getResultType();
                } else {
                    // generate or register sequence number
                    if (index.origin() == Operation.Origin.PRIMARY) {
                        index = new Index(index.uid(), index.parsedDoc(), generateSeqNoForOperationOnPrimary(index), index.primaryTerm(),
                            index.version(), index.versionType(), index.origin(), index.startTime(), index.getAutoGeneratedIdTimestamp(),
                            index.isRetry(), index.getIfSeqNo(), index.getIfPrimaryTerm());

                        final boolean toAppend = plan.indexIntoLucene && plan.useLuceneUpdateDocument == false;
                        if (toAppend == false) {
                            advanceMaxSeqNoOfUpdatesOrDeletesOnPrimary(index.seqNo());
                        }
                    } else {
                        markSeqNoAsSeen(index.seqNo());
                    }

                    assert index.seqNo() >= 0 : "ops should have an assigned seq no.; origin: " + index.origin();

                    if (plan.indexIntoLucene || plan.addStaleOpToLucene) {
                        // write the document into Lucene
                        indexResult = indexIntoLucene(index, plan);
                    } else {
                        indexResult = new IndexResult(
                            plan.versionForIndexing, getPrimaryTerm(), index.seqNo(), plan.currentNotFoundOrDeleted);
                    }
                }
                // write the operation to the translog
                if (index.origin().isFromTranslog() == false) {
                    final Translog.Location location;
                    if (indexResult.getResultType() == Result.Type.SUCCESS) {
                        // the Lucene write succeeded, so record it in the translog
                        location = translog.add(new Translog.Index(index, indexResult));
                    } else if (indexResult.getSeqNo() != SequenceNumbers.UNASSIGNED_SEQ_NO) {
                        // if we have document failure, record it as a no-op in the translog and Lucene with the generated seq_no
                        final NoOp noOp = new NoOp(indexResult.getSeqNo(), index.primaryTerm(), index.origin(),
                            index.startTime(), indexResult.getFailure().toString());
                        location = innerNoOp(noOp).getTranslogLocation();
                    } else {
                        location = null;
                    }
                    // attach the translog location to the result
                    indexResult.setTranslogLocation(location);
                }
                if (plan.indexIntoLucene && indexResult.getResultType() == Result.Type.SUCCESS) {
                    final Translog.Location translogLocation = trackTranslogLocation.get() ? indexResult.getTranslogLocation() : null;
                    versionMap.maybePutIndexUnderLock(index.uid().bytes(),
                        new IndexVersionValue(translogLocation, plan.versionForIndexing, index.seqNo(), index.primaryTerm()));
                }
                localCheckpointTracker.markSeqNoAsCompleted(indexResult.getSeqNo());
                indexResult.setTook(System.nanoTime() - index.startTime());
                indexResult.freeze();
                return indexResult;
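
One detail worth pausing on: the localCheckpointTracker.markSeqNoAsCompleted call near the end advances the shard's local checkpoint, the highest sequence number below which every operation has completed. Operations can finish out of order, so the checkpoint only moves across contiguous sequence numbers. Here is a toy sketch of that contract (the real LocalCheckpointTracker uses fixed-size bit-set windows; this ToyLocalCheckpointTracker is invented for illustration):

import java.util.HashSet;
import java.util.Set;

class ToyLocalCheckpointTracker {
    private long checkpoint = -1;                       // highest contiguous completed seq no
    private final Set<Long> pending = new HashSet<>();  // completed, but not yet contiguous

    synchronized void markSeqNoAsCompleted(long seqNo) {
        if (seqNo <= checkpoint) {
            return; // already covered by the checkpoint
        }
        pending.add(seqNo);
        // advance the checkpoint as long as the next seq no has completed
        while (pending.remove(checkpoint + 1)) {
            checkpoint++;
        }
    }

    synchronized long getCheckpoint() {
        return checkpoint;
    }

    public static void main(String[] args) {
        ToyLocalCheckpointTracker tracker = new ToyLocalCheckpointTracker();
        tracker.markSeqNoAsCompleted(0);
        tracker.markSeqNoAsCompleted(2);                 // out of order: a gap at 1
        System.out.println(tracker.getCheckpoint());     // 0
        tracker.markSeqNoAsCompleted(1);                 // gap filled
        System.out.println(tracker.getCheckpoint());     // 2
    }
}

These local checkpoints are what each shard copy reports back to the primary, feeding the global checkpoint updates we will see in the replica callback below.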

Once the write on the primary shard completes, the operation has to be replicated to the replica shards:
performOnReplicas(replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes, replicationGroup);

 private void performOnReplicas(final ReplicaRequest replicaRequest, final long globalCheckpoint,
                                   final long maxSeqNoOfUpdatesOrDeletes, final ReplicationGroup replicationGroup) {
        // for total stats, add number of unassigned shards and
        // number of initializing shards that are not ready yet to receive operations (recovery has not opened engine yet on the target)
        totalShards.addAndGet(replicationGroup.getSkippedShards().size());

        final ShardRouting primaryRouting = primary.routingEntry();

        for (final ShardRouting shard : replicationGroup.getReplicationTargets()) {
            if (shard.isSameAllocation(primaryRouting) == false) {
                // send the operation to every copy other than the primary's own allocation
                performOnReplica(shard, replicaRequest, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes);
            }
        }
    }

private void performOnReplica(final ShardRouting shard, final ReplicaRequest replicaRequest,
                                  final long globalCheckpoint, final long maxSeqNoOfUpdatesOrDeletes) {
        if (logger.isTraceEnabled()) {
            logger.trace("[{}] sending op [{}] to replica {} for request [{}]", shard.shardId(), opType, shard, replicaRequest);
        }

        totalShards.incrementAndGet();
        pendingActions.incrementAndGet();
        replicasProxy.performOn(shard, replicaRequest, primaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
            new ActionListener<ReplicaResponse>() {
                @Override
                public void onResponse(ReplicaResponse response) {
                    successfulShards.incrementAndGet();
                    try {
                        primary.updateLocalCheckpointForShard(shard.allocationId().getId(), response.localCheckpoint());
                        primary.updateGlobalCheckpointForShard(shard.allocationId().getId(), response.globalCheckpoint());
                    } catch (final AlreadyClosedException e) {
                        // the index was deleted or this shard was never activated after a relocation; fall through and finish normally
                    } catch (final Exception e) {
                        // fail the primary but fall through and let the rest of operation processing complete
                        final String message = String.format(Locale.ROOT, "primary failed updating local checkpoint for replica %s", shard);
                        primary.failShard(message, e);
                    }
                    decPendingAndFinishIfNeeded();
                }

                @Override
                public void onFailure(Exception replicaException) {
                    logger.trace(() -> new ParameterizedMessage(
                        "[{}] failure while performing [{}] on replica {}, request [{}]",
                        shard.shardId(), opType, shard, replicaRequest), replicaException);
                    // Only report "critical" exceptions - TODO: Reach out to the master node to get the latest shard state then report.
                    if (TransportActions.isShardNotAvailableException(replicaException) == false) {
                        RestStatus restStatus = ExceptionsHelper.status(replicaException);
                        shardReplicaFailures.add(new ReplicationResponse.ShardInfo.Failure(
                            shard.shardId(), shard.currentNodeId(), replicaException, restStatus, false));
                    }
                    String message = String.format(Locale.ROOT, "failed to perform %s on replica %s", opType, shard);
                    replicasProxy.failShardIfNeeded(shard, primaryTerm, message, replicaException,
                        ActionListener.wrap(r -> decPendingAndFinishIfNeeded(), ReplicationOperation.this::onNoLongerPrimary));
                }

                @Override
                public String toString() {
                    return "[" + replicaRequest + "][" + shard + "]";
                }
            });
    }
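
Note the accounting pattern here: pendingActions starts at one for the primary itself, is incremented once per replica, and every response, success or failure alike, ends in decPendingAndFinishIfNeeded, so the overall operation completes exactly once when the count returns to zero. A toy model of that pattern (ToyPendingActions is invented for illustration):

import java.util.concurrent.atomic.AtomicInteger;

class ToyPendingActions {
    private final AtomicInteger pending = new AtomicInteger(1); // the primary itself
    private final Runnable onFinished;

    ToyPendingActions(Runnable onFinished) {
        this.onFinished = onFinished;
    }

    void onReplicaStarted() {
        pending.incrementAndGet(); // one increment per replica request sent
    }

    void decPendingAndFinishIfNeeded() {
        if (pending.decrementAndGet() == 0) {
            onFinished.run(); // all replica responses (success or failure) are in
        }
    }

    public static void main(String[] args) {
        ToyPendingActions actions = new ToyPendingActions(() -> System.out.println("finished"));
        actions.onReplicaStarted();            // replica 1 sent
        actions.onReplicaStarted();            // replica 2 sent
        actions.decPendingAndFinishIfNeeded(); // replica 1 responded
        actions.decPendingAndFinishIfNeeded(); // replica 2 responded
        actions.decPendingAndFinishIfNeeded(); // primary done -> prints "finished"
    }
}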

If a replica write fails, the primary sends a shard-failed request to the master, which kicks that shard copy out of the replication group (the failShardIfNeeded call above). The code around this is fairly involved, so I won't cover it in detail here; I'll expand on it in a later update when I have time.
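
From the client's point of view, the shardReplicaFailures collected above surface in the ShardInfo of the write response. A small sketch of reading them, using the real ReplicationResponse API (the helper class and method names are made up):

import org.elasticsearch.action.support.replication.ReplicationResponse;

class ShardInfoCheck {
    // prints per-shard totals for any write response (index, delete, update);
    // failed replica writes recorded in shardReplicaFailures end up in getFailures()
    static void printShardInfo(ReplicationResponse response) {
        ReplicationResponse.ShardInfo shardInfo = response.getShardInfo();
        System.out.println("total=" + shardInfo.getTotal()
            + " successful=" + shardInfo.getSuccessful()
            + " failed=" + shardInfo.getFailed());
        for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
            System.out.println("replica failure on shard " + failure.shardId()
                + ": " + failure.reason());
        }
    }
}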
