03 - Spring Boot와 Elasticsearch 통합 - 소스 코드 분석

10656 단어
앞의 두 절에서는 Spring Boot가 ES를 어떻게 통합하는지와 ES의 간단한 사용법을 살펴보았지만, Spring Boot가 내부적으로 ES 서버와 어떻게 상호작용하는지는 아직 다루지 않았습니다. 이번 절에서는 소스 코드를 따라가며 그 과정을 간단히 이해해 봅니다. 소스 코드를 읽으려면 Spring Boot가 ES 서버에 요청을 보내는 원리도 함께 알아야 하는데, ES 공식 문서(https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-docs-update.html)에 자세한 설명이 있으므로 참고하시기 바랍니다.

1. RestClient 기반의 ES 서버 상호작용


단일 노드(standalone) ES의 경우 일반적으로 ElasticsearchOperations를 사용합니다.


1.1 데이터 저장의 구체적인 과정


본질적으로는 ElasticsearchRestTemplate을 사용하여 작업합니다. 데이터 저장은 아래 호출에서 시작됩니다.
1. 진입점: index 호출
// Entry point: index the document via `operations` (an ElasticsearchOperations, per the text above)
// and keep the id returned by the call.
String documentId = operations.index(indexQuery,indexCoordinates);

2. index 메서드의 구현
/**
 * Indexes the document described by {@code query} into {@code index}.
 *
 * <p>Steps, in order: run the before-convert hook, build an {@link IndexRequest} through
 * {@code requestFactory}, execute it with {@code RequestOptions.DEFAULT}, write the returned
 * id back onto the entity (when the query carries one), and run the after-save hook.
 *
 * @param query the index query; its object may be {@code null}
 * @param index the target index coordinates
 * @return the document id taken from the index response
 */
@Override
	public String index(IndexQuery query, IndexCoordinates index) {

		// Hook that runs before the query is turned into a request.
		maybeCallbackBeforeConvertWithQuery(query, index);

		// Build the low-level index request from the query ...
		final IndexRequest indexRequest = requestFactory.indexRequest(query, index);
		// ... send it to ES and read the document id from the response.
		final String generatedId = execute(
				client -> client.index(indexRequest, RequestOptions.DEFAULT).getId());

		// We are not going through a mapper here, so the id has to be set on the entity by hand.
		final Object entity = query.getObject();
		if (entity != null) {
			setPersistentEntityId(entity, generatedId);
		}

		// Hook that runs once the document has been saved.
		maybeCallbackAfterSaveWithQuery(query, index);

		return generatedId;
	}


다음과 같이 IndexRequest 작성 과정을 볼 수 있습니다.
	/**
	 * Builds the {@link IndexRequest} for {@code query} against the index named by {@code index}.
	 *
	 * <p>Only the branch for a query that carries an entity object is shown in this excerpt; the
	 * remaining branches are elided.
	 *
	 * @param query the index query to translate
	 * @param index the coordinates providing the target index name
	 * @return the populated index request
	 */
	public IndexRequest indexRequest(IndexQuery query, IndexCoordinates index) {
		String indexName = index.getIndexName();

		IndexRequest indexRequest;

		if (query.getObject() != null) {
			// Prefer the id set on the query; fall back to the id read from the entity itself.
			String id = StringUtils.isEmpty(query.getId()) ? getPersistentEntityId(query.getObject()) : query.getId();
			// If we have a query id and a document id, do not ask ES to generate one.
			if (id != null) {
				indexRequest = new IndexRequest(indexName).id(id);
			} else {
				indexRequest = new IndexRequest(indexName);
			}
                        /**
                         * 1. The entity object is mapped by the converter and rendered as a JSON string.
                         * 2. That JSON is set as the request source together with the content type
                         *    Requests.INDEX_CONTENT_TYPE (presumably stored internally as a
                         *    BytesReference -- confirm against IndexRequest.source).
                         */     
			indexRequest.source(elasticsearchConverter.mapObject(query.getObject()).toJson(), Requests.INDEX_CONTENT_TYPE);
		} 
                // ... (remaining branches omitted in this excerpt)
		return indexRequest;
	}

요청 데이터 처리 프로세스
/**
 * Executes {@code indexRequest} and returns the parsed {@link IndexResponse}.
 *
 * <p>Delegates to {@code performRequestAndParseEntity}, passing {@code RequestConverters::index}
 * to build the request, {@code IndexResponse::fromXContent} to parse the response body, and an
 * empty set as the final argument.
 *
 * @param indexRequest the index request to send
 * @param options the request options to apply
 * @return the parsed index response
 * @throws IOException as declared by {@code performRequestAndParseEntity}
 */
public final IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
        final IndexResponse parsedResponse = performRequestAndParseEntity(
                indexRequest,
                RequestConverters::index,
                options,
                IndexResponse::fromXContent,
                emptySet());
        return parsedResponse;
    }

1. RequestConverters::index — IndexRequest를 Request로 변환

/**
 * Converts an {@link IndexRequest} into a low-level {@link Request}: HTTP method, endpoint,
 * URL parameters and body entity.
 *
 * @param indexRequest the index request to convert
 * @return the low-level request carrying the document source as its entity
 */
static Request index(IndexRequest indexRequest) {
        // A request that carries an id is sent with PUT, one without an id with POST.
        final String httpMethod;
        if (Strings.hasLength(indexRequest.id())) {
            httpMethod = HttpPut.METHOD_NAME;
        } else {
            httpMethod = HttpPost.METHOD_NAME;
        }

        // Only op type CREATE adds the "_create" segment to the endpoint.
        final String createSegment = (indexRequest.opType() == DocWriteRequest.OpType.CREATE) ? "_create" : null;
        final String uriPath = endpoint(indexRequest.index(), indexRequest.type(), indexRequest.id(), createSegment);
        final Request lowLevelRequest = new Request(httpMethod, uriPath);

        // Copy the request settings onto the low-level request as parameters.
        final Params urlParams = new Params(lowLevelRequest);
        urlParams.withRouting(indexRequest.routing());
        urlParams.withParent(indexRequest.parent());
        urlParams.withTimeout(indexRequest.timeout());
        urlParams.withVersion(indexRequest.version());
        urlParams.withVersionType(indexRequest.versionType());
        urlParams.withIfSeqNo(indexRequest.ifSeqNo());
        urlParams.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
        urlParams.withPipeline(indexRequest.getPipeline());
        urlParams.withRefreshPolicy(indexRequest.getRefreshPolicy());
        urlParams.withWaitForActiveShards(indexRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);

        // Wrap the document source bytes, with their content type, as the request body.
        final BytesRef payload = indexRequest.source().toBytesRef();
        final ContentType payloadType = createContentType(indexRequest.getContentType());
        lowLevelRequest.setEntity(new NByteArrayEntity(payload.bytes, payload.offset, payload.length, payloadType));
        return lowLevelRequest;
    }

2. IndexResponse::fromXContent — response를 IndexResponse로 파싱

3. 무시할 상태 코드 집합(ignores): emptySet(), 즉 new EmptySet<>();

4. ES 서버로 요청 전송 및 응답 변환
 
  /**
   * Converts {@code request} to a low-level {@link Request}, executes it, and converts the
   * response.
   *
   * <p>When the low-level client throws a {@link ResponseException} whose status code is in
   * {@code ignores}, the error response is first handed to {@code responseConverter}; if that
   * conversion fails, the original exception is parsed and thrown instead.
   *
   * @param request the high-level request
   * @param requestConverter turns the high-level request into a low-level {@link Request}
   * @param options the options set on the low-level request
   * @param responseConverter turns the low-level {@link Response} into the result
   * @param ignores status codes whose error responses should still be converted
   * @param <Req> the high-level request type
   * @param <Resp> the result type
   * @return the converted response
   * @throws IOException if the response body cannot be converted, or as thrown by the converters
   *         or the client
   */
  private <Req, Resp> Resp internalPerformRequest(Req request,
                                            CheckedFunction<Req, Request, IOException> requestConverter,
                                            RequestOptions options,
                                            CheckedFunction<Response, Resp, IOException> responseConverter,
                                            Set<Integer> ignores) throws IOException {
        Request req = requestConverter.apply(request);
        req.setOptions(options);
        Response response;
        try {
            response = client.performRequest(req);
        } catch (ResponseException e) {
            if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
                try {
                    // An ignored status code may still carry a regular response body: try that first.
                    return responseConverter.apply(e.getResponse());
                } catch (Exception innerException) {
                    // innerException is intentionally not propagated: the body was not a regular
                    // response, so fall back to reporting the original error.
                    throw parseResponseException(e);
                }
            }
            throw parseResponseException(e);
        }

        try {
            return responseConverter.apply(response);
        } catch(Exception e) {
            throw new IOException("Unable to parse response body for " + response, e);
        }
    }

RestClient 요청 전송 절차
1. 
 /**
  * Sends {@code request} through {@code performRequestAsyncNoCatch} using a
  * {@code SyncResponseListener}, then returns the result obtained from the listener's
  * {@code get()}.
  *
  * @param request the request to send
  * @return the response handed back by the listener
  * @throws IOException as declared by the calls made here
  */
 public Response performRequest(Request request) throws IOException {
        final SyncResponseListener syncListener = new SyncResponseListener(maxRetryTimeoutMillis);
        performRequestAsyncNoCatch(request, syncListener);
        return syncListener.get();
    }
  // Build the URL and the HTTP request, then dispatch the request asynchronously
 /**
  * Prepares the HTTP request for {@code request} and hands it to {@code performRequestAsync}.
  *
  * <p>The {@code ignore} parameter is removed from a copy of the request parameters and parsed
  * as a comma-separated list of status codes to tolerate. For HEAD requests 404 is always part
  * of that set.
  *
  * @param request the request to send
  * @param listener the listener to notify; it is wrapped in a
  *        {@code FailureTrackingResponseListener} before dispatch
  * @throws IOException as declared by the signature
  * @throws IllegalArgumentException if an entry of the {@code ignore} parameter is not a number
  */
 void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException {
        // Work on a copy so that removing "ignore" leaves the request's own parameters untouched.
        Map<String, String> requestParams = new HashMap<>(request.getParameters());
        // "ignore" is consumed here and therefore never becomes part of the URI built below.
        String ignoreString = requestParams.remove("ignore");
        Set<Integer> ignoreErrorCodes;
        if (ignoreString == null) {
            if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
                //404 never causes error if returned for a HEAD request
                ignoreErrorCodes = Collections.singleton(404);
            } else {
                ignoreErrorCodes = Collections.emptySet();
            }
        } else {
            String[] ignoresArray = ignoreString.split(",");
            ignoreErrorCodes = new HashSet<>();
            if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
                //404 never causes error if returned for a HEAD request
                ignoreErrorCodes.add(404);
            }
            for (String ignoreCode : ignoresArray) {
                try {
                    ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
                } catch (NumberFormatException e) {
                    throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
                }
            }
        }
        // Build the target URI from the path prefix, the endpoint and the remaining parameters.
        URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams);
        // Create the HTTP request for the given method, URI and entity.
        HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
        setHeaders(httpRequest, request.getOptions().getHeaders());
        FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
        long startTime = System.nanoTime();
        // A warnings handler set on the request options takes precedence over the client-level one.
        performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
                request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(),
                request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
    }

  // Execute the request against a node and handle the response
 private void performRequestAsync(final long startTime, final NodeTuple> nodeTuple, final HttpRequestBase request,
                                     final Set ignoreErrorCodes,
                                     final WarningsHandler thisWarningsHandler,
                                     final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
                                     final FailureTrackingResponseListener listener) {
        final Node node = nodeTuple.nodes.next(); // 
        final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request);
        final HttpAsyncResponseConsumer asyncResponseConsumer =
            httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer();
        final HttpClientContext context = HttpClientContext.create();
        context.setAuthCache(nodeTuple.authCache);
        client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback() {
            @Override
            public void completed(HttpResponse httpResponse) { // 
                try {
                    RequestLogger.logResponse(logger, request, node.getHost(), httpResponse);
                    int statusCode = httpResponse.getStatusLine().getStatusCode();
                    Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse);
                    if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
                        onResponse(node);
                        if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) {
                            listener.onDefinitiveFailure(new WarningFailureException(response));
                        } else {
                            listener.onSuccess(response);
                        }
                    } else {
                        ResponseException responseException = new ResponseException(response);
                        if (isRetryStatus(statusCode)) {
                            //mark host dead and retry against next one
                            onFailure(node);
                            retryIfPossible(responseException);
                        } else {
                            //mark host alive and don't retry, as the error should be a request problem
                            onResponse(node);
                            listener.onDefinitiveFailure(responseException);
                        }
                    }
                } catch(Exception e) {
                    listener.onDefinitiveFailure(e);
                }
            }
        });
    }



좋은 웹페이지 즐겨찾기