03-Spring Boot Elasticsearch 통합 - 소스 코드 분석 편
1. RestClient 기반의 ES 서버 상호 작용
ES를 조작할 때는 일반적으로 ElasticsearchOperations를 사용합니다.
1.1 데이터 저장의 구체적인 과정
본질적으로는 ElasticsearchRestTemplate을 사용하여 작업합니다. 데이터 저장은 다음 호출에서 시작됩니다.
1. 진입점: operations.index 호출
// Entry point of the save path: index the query into the given index coordinates and keep the returned document id.
String documentId = operations.index(indexQuery,indexCoordinates);
2. index 메서드의 구현
/**
 * Builds an IndexRequest from {@code query}, sends it through the client and returns the id
 * taken from the response.
 *
 * @param query the index query; its object (if any) receives the returned id
 * @param index the target index coordinates
 * @return the document id read from the index response
 */
@Override
public String index(IndexQuery query, IndexCoordinates index) {
maybeCallbackBeforeConvertWithQuery(query, index);// hook invoked before the request is built; presumably fires before-convert callbacks (helper not shown here)
IndexRequest request = requestFactory.indexRequest(query, index);// build the IndexRequest from the query and index coordinates
String documentId = execute(client -> client.index(request, RequestOptions.DEFAULT).getId());// send the request with default options and read the id from the response
// We should call this because we are not going through a mapper.
Object queryObject = query.getObject();
if (queryObject != null) {
// hand the returned id to the object carried by the query
setPersistentEntityId(queryObject, documentId);
}
maybeCallbackAfterSaveWithQuery(query, index);// hook invoked after the request has completed; presumably fires after-save callbacks (helper not shown here)
return documentId;
}
다음과 같이 IndexRequest 작성 과정을 볼 수 있습니다.
/**
 * Creates the IndexRequest for {@code query} against the index named by {@code index}.
 * Only the branch for a query that carries an object is shown in this excerpt.
 *
 * @param query the index query supplying the object and, optionally, an explicit id
 * @param index the index coordinates; only the index name is read here
 * @return the populated IndexRequest
 */
public IndexRequest indexRequest(IndexQuery query, IndexCoordinates index) {
String indexName = index.getIndexName();
IndexRequest indexRequest;
if (query.getObject() != null) {
// Prefer the id set on the query; fall back to the id obtained from the object itself.
String id = StringUtils.isEmpty(query.getId()) ? getPersistentEntityId(query.getObject()) : query.getId();
// If we have a query id and a document id, do not ask ES to generate one.
if (id != null) {
indexRequest = new IndexRequest(indexName).id(id);
} else {
indexRequest = new IndexRequest(indexName);
}
/*
 * 1. mapObject(...) converts the object into a document, and toJson() renders it as JSON.
 * 2. source(...) sets that JSON as the request body, tagged with Requests.INDEX_CONTENT_TYPE.
 */
indexRequest.source(elasticsearchConverter.mapObject(query.getObject()).toJson(), Requests.INDEX_CONTENT_TYPE);
}
// ... (the rest of the method is omitted from this excerpt)
return indexRequest;
}
요청 데이터 처리 프로세스
/**
 * Runs an index request by delegating to performRequestAndParseEntity.
 * The arguments passed are: the request, RequestConverters::index (builds the low-level Request),
 * the options, IndexResponse::fromXContent (produces the IndexResponse), and an empty set.
 *
 * @param indexRequest the request to execute
 * @param options the request options to apply
 * @return the parsed IndexResponse
 * @throws IOException propagated from performRequestAndParseEntity
 */
public final IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException {
return performRequestAndParseEntity(indexRequest, RequestConverters::index, options, IndexResponse::fromXContent, emptySet());
}
1. RequestConverters::index — IndexRequest를 저수준 HTTP Request로 변환
/**
 * Converts an IndexRequest into a low-level Request: HTTP method, endpoint, query parameters and body.
 *
 * @param indexRequest the index request to convert
 * @return the Request carrying the method, endpoint, parameters and entity
 */
static Request index(IndexRequest indexRequest) {
String method = Strings.hasLength(indexRequest.id()) ? HttpPut.METHOD_NAME : HttpPost.METHOD_NAME; // PUT when the request has an id, POST otherwise
boolean isCreate = (indexRequest.opType() == DocWriteRequest.OpType.CREATE);
// Build the endpoint from index, type and id; "_create" is appended only for the CREATE op type.
String endpoint = endpoint(indexRequest.index(), indexRequest.type(), indexRequest.id(), isCreate ? "_create" : null);
Request request = new Request(method, endpoint);
// Copy the request's settings onto the Request as parameters.
Params parameters = new Params(request);
parameters.withRouting(indexRequest.routing());
parameters.withParent(indexRequest.parent());
parameters.withTimeout(indexRequest.timeout());
parameters.withVersion(indexRequest.version());
parameters.withVersionType(indexRequest.versionType());
parameters.withIfSeqNo(indexRequest.ifSeqNo());
parameters.withIfPrimaryTerm(indexRequest.ifPrimaryTerm());
parameters.withPipeline(indexRequest.getPipeline());
parameters.withRefreshPolicy(indexRequest.getRefreshPolicy());
parameters.withWaitForActiveShards(indexRequest.waitForActiveShards(), ActiveShardCount.DEFAULT);
// Use the document source bytes as the request entity, with the request's content type.
BytesRef source = indexRequest.source().toBytesRef();
ContentType contentType = createContentType(indexRequest.getContentType());
request.setEntity(new NByteArrayEntity(source.bytes, source.offset, source.length, contentType));
return request;
}
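2. IndexResponse::fromXContent — 응답(response) 본문을 IndexResponse로 파싱
3. emptySet() — 무시할 HTTP 상태 코드 집합 (여기서는 빈 Set: new EmptySet<>())
4. ES로 요청 전송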
/**
 * Converts a high-level request into a low-level Request, executes it, and converts the Response.
 *
 * @param request the high-level request object
 * @param requestConverter turns {@code request} into a low-level Request
 * @param options options set on the low-level Request before it is sent
 * @param responseConverter turns the low-level Response into the result type
 * @param ignores HTTP status codes for which an error response is still offered to {@code responseConverter}
 * @param <Req> the high-level request type
 * @param <Resp> the result type
 * @return the converted response
 * @throws IOException if the response body cannot be converted, or as propagated from the client call
 */
private <Req, Resp> Resp internalPerformRequest(Req request,
        CheckedFunction<Req, Request, IOException> requestConverter,
        RequestOptions options,
        CheckedFunction<Response, Resp, IOException> responseConverter,
        Set<Integer> ignores) throws IOException {
    Request req = requestConverter.apply(request);
    req.setOptions(options);
    Response response;
    try {
        response = client.performRequest(req);
    } catch (ResponseException e) {
        if (ignores.contains(e.getResponse().getStatusLine().getStatusCode())) {
            try {
                // An ignored status code is first treated as a regular response.
                return responseConverter.apply(e.getResponse());
            } catch (Exception innerException) {
                // Converting it as a regular response failed; innerException is dropped on purpose
                // and the original ResponseException is reported as an error instead.
                throw parseResponseException(e);
            }
        }
        throw parseResponseException(e);
    }
    try {
        return responseConverter.apply(response);
    } catch (Exception e) {
        throw new IOException("Unable to parse response body for " + response, e);
    }
}
RestClient 요청 전송 절차
1.
/**
 * Sends the request and returns the Response obtained from the listener.
 * A SyncResponseListener is created with maxRetryTimeoutMillis, handed to
 * performRequestAsyncNoCatch, and then asked for its result via get().
 *
 * @param request the request to send
 * @return the Response returned by the listener
 * @throws IOException propagated from performRequestAsyncNoCatch or listener.get()
 */
public Response performRequest(Request request) throws IOException {
SyncResponseListener listener = new SyncResponseListener(maxRetryTimeoutMillis);
performRequestAsyncNoCatch(request, listener);
// presumably waits here for the outcome delivered to the listener; see SyncResponseListener (not shown)
return listener.get();
}
/**
 * Prepares the HTTP request (ignored status codes, URI, method, entity, headers) and hands it to
 * performRequestAsync.
 *
 * The "ignore" parameter is removed from the request parameters and parsed as a comma-separated
 * list of status codes. For HEAD requests 404 is always added to the ignored codes.
 *
 * @param request the request to send
 * @param listener receives the outcome, wrapped in a FailureTrackingResponseListener
 * @throws IOException declared by the original signature
 * @throws IllegalArgumentException if an "ignore" entry is not a number
 */
void performRequestAsyncNoCatch(Request request, ResponseListener listener) throws IOException {
    // Work on a copy so that removing "ignore" does not touch the request's own parameters.
    Map<String, String> requestParams = new HashMap<>(request.getParameters());
    String ignoreString = requestParams.remove("ignore");
    Set<Integer> ignoreErrorCodes;
    if (ignoreString == null) {
        if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
            //404 never causes error if returned for a HEAD request
            ignoreErrorCodes = Collections.singleton(404);
        } else {
            ignoreErrorCodes = Collections.emptySet();
        }
    } else {
        String[] ignoresArray = ignoreString.split(",");
        ignoreErrorCodes = new HashSet<>();
        if (HttpHead.METHOD_NAME.equals(request.getMethod())) {
            //404 never causes error if returned for a HEAD request
            ignoreErrorCodes.add(404);
        }
        for (String ignoreCode : ignoresArray) {
            try {
                ignoreErrorCodes.add(Integer.valueOf(ignoreCode));
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("ignore value should be a number, found [" + ignoreString + "] instead", e);
            }
        }
    }
    // Build the URI from the path prefix, the endpoint and the remaining parameters.
    URI uri = buildUri(pathPrefix, request.getEndpoint(), requestParams);
    // Build the HTTP request from method, URI and entity, then copy the headers from the options.
    HttpRequestBase httpRequest = createHttpRequest(request.getMethod(), uri, request.getEntity());
    setHeaders(httpRequest, request.getOptions().getHeaders());
    FailureTrackingResponseListener failureTrackingResponseListener = new FailureTrackingResponseListener(listener);
    long startTime = System.nanoTime();
    // A warnings handler set on the request options takes precedence over the client's own handler.
    performRequestAsync(startTime, nextNode(), httpRequest, ignoreErrorCodes,
            request.getOptions().getWarningsHandler() == null ? warningsHandler : request.getOptions().getWarningsHandler(),
            request.getOptions().getHttpAsyncResponseConsumerFactory(), failureTrackingResponseListener);
}
// , response
private void performRequestAsync(final long startTime, final NodeTuple> nodeTuple, final HttpRequestBase request,
final Set ignoreErrorCodes,
final WarningsHandler thisWarningsHandler,
final HttpAsyncResponseConsumerFactory httpAsyncResponseConsumerFactory,
final FailureTrackingResponseListener listener) {
final Node node = nodeTuple.nodes.next(); //
final HttpAsyncRequestProducer requestProducer = HttpAsyncMethods.create(node.getHost(), request);
final HttpAsyncResponseConsumer asyncResponseConsumer =
httpAsyncResponseConsumerFactory.createHttpAsyncResponseConsumer();
final HttpClientContext context = HttpClientContext.create();
context.setAuthCache(nodeTuple.authCache);
client.execute(requestProducer, asyncResponseConsumer, context, new FutureCallback() {
@Override
public void completed(HttpResponse httpResponse) { //
try {
RequestLogger.logResponse(logger, request, node.getHost(), httpResponse);
int statusCode = httpResponse.getStatusLine().getStatusCode();
Response response = new Response(request.getRequestLine(), node.getHost(), httpResponse);
if (isSuccessfulResponse(statusCode) || ignoreErrorCodes.contains(response.getStatusLine().getStatusCode())) {
onResponse(node);
if (thisWarningsHandler.warningsShouldFailRequest(response.getWarnings())) {
listener.onDefinitiveFailure(new WarningFailureException(response));
} else {
listener.onSuccess(response);
}
} else {
ResponseException responseException = new ResponseException(response);
if (isRetryStatus(statusCode)) {
//mark host dead and retry against next one
onFailure(node);
retryIfPossible(responseException);
} else {
//mark host alive and don't retry, as the error should be a request problem
onResponse(node);
listener.onDefinitiveFailure(responseException);
}
}
} catch(Exception e) {
listener.onDefinitiveFailure(e);
}
}
});
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.