Elasticsearch5.4 클러스터(3) Flume1.6sink 호환
8105 단어 ElasticSearch
flume로 kafka에서 메시지를 소비하고 ElasticsearchSink로 ES에 데이터를 기록합니다.ES가 1.7.1에서 5.4.1로 업그레이드된 후flume의 코드가 바뀌지 않고 시작된 후sink가 대량으로 오류를 보고하여 모든 데이터를 기록하는 데 실패했습니다.JDK는 1.8, elasticsearch 버전은 5.4.1로 올라갔고, Log4j도 덧붙여서 안 된다.
org.apache.logging.log4jlog4j-api2.8.2org.apache.logging.log4jlog4j-core2.8.2
flume 최신 버전 1.7을 찾았지만 ES2는 지원되지 않습니다.X(https://stackoverflow.com/questions/36614488/flume-1-6-compatibility-with-elasticsearch-2-3-1). GitHub에서 ES2를 지원하는 항목이 있습니다.x(https://github.com/lucidfrontier45/ElasticsearchSink2). 사실sink는 매우 간단하다. 단지 kafka에서 끌어낸 데이터를 간단하게 해석한 후에 ES에 쓴다. 호환 문제를 해결하기 위해 다음 두 가지 방안을 고려한다
시나리오 1 ESSink 다시 쓰기
flume의 구조는 비교적 간단하다. ESSink의 실현, configure(Context context) 초기화 방법을 수정하고 여분의 초기화 파라미터를 제거한다.start() 방법에서 esClient 연결을 구성하고 차례대로 완성한 후에 제출한다.마지막으로 프로세스 () 방법에서 esClient로 직접 제출합니다.이전 코드: esClient.addEvent(event, indexNameBuilder, logType, ttlMs); ... client.execute(); 새 코드: bulkRequestBuilder.add(esClient.prepareIndex(indexNameBuilder.getIndexName(event), logType).setSource(leidaSerializer.getContentBuilder(event))); ... BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); bulkRequestBuilder.request().requests().clear();
ESSink의 start() 메소드 수정 코드
//
Settings settings = Settings.builder().put("cluster.name", clusterName).build();
// client
for (String esIpTcpport : serverAddresses) {
String[] hostPort = esIpTcpport.trim().split(":");
try {
if (null == esClient) {
esClient = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
} else {
esClient = esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
}
} catch (UnknownHostException e) {
// e.printStackTrace();
}
}
//
bulkRequestBuilder = esClient.prepareBulk();
ESSink의 process() 메소드 수정public Status process() {
Status status = Status.READY;
Channel channel = getChannel();
Transaction txn = channel.getTransaction();
Event event = null;
try {
txn.begin();
int count;
for (count = 0; count < batchSize; ++count) {
event = channel.take();
if (event == null) {
break;
}
if (event.getBody().length == 0) {
continue;
}
//esClient.addEvent(event, indexNameBuilder, logType, ttlMs);
bulkRequestBuilder.add(esClient.prepareIndex(indexNameBuilder.getIndexName(event),
logType).setSource(leidaSerializer.getContentBuilder(event)));
}
if (count <= 0) {
sinkCounter.incrementBatchEmptyCount();
counterGroup.incrementAndGet("channel.underflow");
status = Status.BACKOFF;
} else {
if (count < batchSize) {
sinkCounter.incrementBatchUnderflowCount();
} else {
sinkCounter.incrementBatchCompleteCount();
}
sinkCounter.addToEventDrainAttemptCount(count);
//esClient.execute();
BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
bulkRequestBuilder.request().requests().clear();
if (bulkResponse.hasFailures()){
//System.out.println("failure");
}
}
sinkCounter.addToEventDrainSuccessCount(count);
counterGroup.incrementAndGet("transaction.success");
} catch (Throwable ex) {
....
} finally {
...
}
return status;
}
재포장, 정상 작동, ES 쓰기 가능방안 2flume-ng-elasticsearch-sink 패키지 코드 수정
원본 코드를 다운로드하여 마븐 프로젝트를 가져오면elasticsearch를 5.4.1의 패키지 2개로 바꾸고 컴파일과 호환 오류를 수정합니다
org.elasticsearchelasticsearch5.4.1org.elasticsearch.clienttransport5.4.1
ElasticSearchTransportClient 클래스의configureHostnames () 방법을 수정하고, InetSocketTransportAddress의 구조 방법을 변경했습니다.
serverAddresses[i] = new InetSocketTransportAddress(host, port);
다음과 같이 바꿉니다. try {
serverAddresses[i] = new InetSocketTransportAddress(InetAddress.getByName(host), port);
} catch (UnknownHostException e) {
e.printStackTrace();
}
ElasticSearchTransportClient 클래스의addEvent() 방법을 수정하고 다음과 같이 수정하며 indexRequestBuilder를 제거합니다.setTTL(ttlMs), ES5는 ttl를 지원하지 않습니다. indexRequestBuilder = client
.prepareIndex(indexNameBuilder.getIndexName(event), indexType)
.setSource(serializer.getContentBuilder(event).bytes());
다음과 같이 바꿉니다. indexRequestBuilder = client
.prepareIndex(indexNameBuilder.getIndexName(event), indexType)
.setSource(serializer.getContentBuilder(event));// .bytes()
ElasticSearchTransportClient 클래스의 openClient() 방법도 컴파일 오류가 있습니다. 다음과 같이 바꿉니다. Settings settings = Settings.builder()
.put("cluster.name", clusterName).build();
// TransportClient transportClient = new TransportClient(settings);
// for (InetSocketTransportAddress host : serverAddresses) {
// transportClient.addTransportAddress(host);
// }
TransportClient transportClient = null;
for (InetSocketTransportAddress host : serverAddresses) {
if (null == transportClient) {
transportClient = new PreBuiltTransportClient(settings).addTransportAddress(host);
} else {
transportClient = transportClient.addTransportAddress(host);
}
}
if (client != null) {
client.close();
}
client = transportClient;
ElasticSearchTransportClient 클래스의 openLocalDiscoveryClient() 방법은 일반적으로 테스트용으로 코드를 직접 주석하고 빈 방법만 남긴다ElasticSearchRestClient 클래스의addEvent() 메서드에서 한 줄 수정
bulkBuilder.append(content.toBytesArray().toUtf8());
bulkBuilder.append(content.utf8ToString());//.toBytesArray().toUtf8();
ElasticSearchLogStash EventSerializer 클래스의 appendHeaders() 방법에 컴파일 오류가 있습니다. 다음과 같이 변경합니다.
//Map headers = Maps.newHashMap(event.getHeaders());
Map headers = new HashMap<>(event.getHeaders());
ElasticSearchEventSerializer 클래스의 getContentBuilder() 메소드 반환 값은 XContentBuilder로 변경됩니다.ElasticSearchEventSerializer 클래스를 사용했습니다. ES5.4 새 클라이언트 setSource() 매개 변수는 XContentBuilder 형식입니다. 이 런타임 오류를 수정하지 않습니다.
java.lang.IllegalArgumentException: The number of object passed must be even but was [1]
at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:431)
at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:418)
at org.elasticsearch.action.index.IndexRequestBuilder.setSource(IndexRequestBuilder.java:210)
ContentBuilderUtil 클래스의 addComplexField() 메서드에서 코드 행을 수정해야 합니다.
XContentFactory.xContent(contentType).createParser(data);
다음과 같이 바꿉니다. XContentFactory.xContent(contentType).createParser(NamedXContentRegistry.EMPTY, data);//.createParser(data);
번역 오류가 없어졌으니 다시 포장하여 발표합니다.정상적으로 실행하면 es5.4 집단에 데이터를 쓸 수 있습니다.다른 것은 아직 고칠 것이 있을 수 있습니다. 제 flume2es는 사용하지 않았습니다. 잠시 잘못 보고하지 않았습니다.개인적으로는 첫 번째 방식이 좋다고 생각합니다. 간단하고 제어할 수 있고flume-ng-elasticsearch-sink 패키지에 의존하지 않습니다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spring-data-elasticsearch 페이지 조회부록: 1. 이름에서 알 수 있듯이QueryBuilder는 검색 조건, 필터 조건을 구축하는 데 사용되고 SortBuilder는 정렬을 구축하는 데 사용된다. 예를 들어 우리는 어느 위치에서 100미터 범위 내의 모...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.