Elasticsearch5.4 클러스터(3) Flume1.6sink 호환

8105 단어 ElasticSearch
Flume1.6sink는 ES5와 호환되지 않음
flume로 kafka에서 메시지를 소비하고 ElasticsearchSink로 ES에 데이터를 기록합니다.ES가 1.7.1에서 5.4.1로 업그레이드된 후flume의 코드가 바뀌지 않고 시작된 후sink가 대량으로 오류를 보고하여 모든 데이터를 기록하는 데 실패했습니다.JDK는 1.8, elasticsearch 버전은 5.4.1로 올라갔고, Log4j도 덧붙여서 안 된다.
org.apache.logging.log4jlog4j-api2.8.2org.apache.logging.log4jlog4j-core2.8.2

flume 최신 버전 1.7을 찾았지만 ES2는 지원되지 않습니다.X(https://stackoverflow.com/questions/36614488/flume-1-6-compatibility-with-elasticsearch-2-3-1). GitHub에서 ES2를 지원하는 항목이 있습니다.x(https://github.com/lucidfrontier45/ElasticsearchSink2). 사실sink는 매우 간단하다. 단지 kafka에서 끌어낸 데이터를 간단하게 해석한 후에 ES에 쓴다. 호환 문제를 해결하기 위해 다음 두 가지 방안을 고려한다
  • flume-ng-elasticsearch-sink 패키지 사용을 포기하고 ESSink를 다시 씁니다
  • flume-ng-elasticsearch-sink 패키지 코드를 수정하고 자체적으로 1.6.1을 업그레이드합니다

  • 시나리오 1 ESSink 다시 쓰기
    flume의 구조는 비교적 간단하다. ESSink의 실현, configure(Context context) 초기화 방법을 수정하고 여분의 초기화 파라미터를 제거한다.start() 방법에서 esClient 연결을 구성하고 차례대로 완성한 후에 제출한다.마지막으로 프로세스 () 방법에서 esClient로 직접 제출합니다.이전 코드: esClient.addEvent(event, indexNameBuilder, logType, ttlMs); ... client.execute(); 새 코드: bulkRequestBuilder.add(esClient.prepareIndex(indexNameBuilder.getIndexName(event), logType).setSource(leidaSerializer.getContentBuilder(event))); ... BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet(); bulkRequestBuilder.request().requests().clear();
    ESSink의 start() 메소드 수정 코드
                //  
                Settings settings = Settings.builder().put("cluster.name", clusterName).build();
                //  client
                for (String esIpTcpport : serverAddresses) {
                    String[] hostPort = esIpTcpport.trim().split(":");
                    try {
                        if (null == esClient) {
                            esClient = new PreBuiltTransportClient(settings)
                                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
                        } else {
                            esClient = esClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(hostPort[0]), Integer.parseInt(hostPort[1])));
                        }
                    } catch (UnknownHostException e) {
                        // e.printStackTrace();
                    }
                }
    			//  
                bulkRequestBuilder = esClient.prepareBulk();
    
    ESSink의 process() 메소드 수정
    public Status process() {
            Status status = Status.READY;
            Channel channel = getChannel();
            Transaction txn = channel.getTransaction();
            Event event = null;
            try {
                txn.begin();
                int count;
                for (count = 0; count < batchSize; ++count) {
                    event = channel.take();
                    if (event == null) {
                        break;
                    }
                    if (event.getBody().length == 0) {
                        continue;
                    }
                    //esClient.addEvent(event, indexNameBuilder, logType, ttlMs);
                    bulkRequestBuilder.add(esClient.prepareIndex(indexNameBuilder.getIndexName(event),
                            logType).setSource(leidaSerializer.getContentBuilder(event)));
                }
    
                if (count <= 0) {
                    sinkCounter.incrementBatchEmptyCount();
                    counterGroup.incrementAndGet("channel.underflow");
                    status = Status.BACKOFF;
                } else {
                    if (count < batchSize) {
                        sinkCounter.incrementBatchUnderflowCount();
                    } else {
                        sinkCounter.incrementBatchCompleteCount();
                    }
    
                    sinkCounter.addToEventDrainAttemptCount(count);
                    //esClient.execute();
                    BulkResponse bulkResponse = bulkRequestBuilder.execute().actionGet();
                    bulkRequestBuilder.request().requests().clear();
                    if (bulkResponse.hasFailures()){
                        //System.out.println("failure");
                    }
                }
    
                sinkCounter.addToEventDrainSuccessCount(count);
                counterGroup.incrementAndGet("transaction.success");
            } catch (Throwable ex) {
                ....
            } finally {
                ...
            }
            return status;
    }
    
    재포장, 정상 작동, ES 쓰기 가능
    방안 2flume-ng-elasticsearch-sink 패키지 코드 수정
    원본 코드를 다운로드하여 마븐 프로젝트를 가져오면elasticsearch를 5.4.1의 패키지 2개로 바꾸고 컴파일과 호환 오류를 수정합니다
    org.elasticsearchelasticsearch5.4.1org.elasticsearch.clienttransport5.4.1

    ElasticSearchTransportClient 클래스의configureHostnames () 방법을 수정하고, InetSocketTransportAddress의 구조 방법을 변경했습니다.
          serverAddresses[i] = new InetSocketTransportAddress(host, port);
    
    다음과 같이 바꿉니다.
          try {
            serverAddresses[i] = new InetSocketTransportAddress(InetAddress.getByName(host), port);
          } catch (UnknownHostException e) {
            e.printStackTrace();
          }
    
    ElasticSearchTransportClient 클래스의addEvent() 방법을 수정하고 다음과 같이 수정하며 indexRequestBuilder를 제거합니다.setTTL(ttlMs), ES5는 ttl를 지원하지 않습니다.
           indexRequestBuilder = client
               .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
               .setSource(serializer.getContentBuilder(event).bytes());
    
    다음과 같이 바꿉니다.
           indexRequestBuilder = client
               .prepareIndex(indexNameBuilder.getIndexName(event), indexType)
               .setSource(serializer.getContentBuilder(event));// .bytes()
    
    ElasticSearchTransportClient 클래스의 openClient() 방법도 컴파일 오류가 있습니다. 다음과 같이 바꿉니다.
        Settings settings = Settings.builder()
            .put("cluster.name", clusterName).build();
    
    //    TransportClient transportClient = new TransportClient(settings);
    //    for (InetSocketTransportAddress host : serverAddresses) {
    //      transportClient.addTransportAddress(host);
    //    }
    
        TransportClient transportClient = null;
        for (InetSocketTransportAddress host : serverAddresses) {
                if (null == transportClient) {
                    transportClient = new PreBuiltTransportClient(settings).addTransportAddress(host);
                } else {
                    transportClient = transportClient.addTransportAddress(host);
                }
        }
        if (client != null) {
          client.close();
        }
        client = transportClient;
    
    ElasticSearchTransportClient 클래스의 openLocalDiscoveryClient() 방법은 일반적으로 테스트용으로 코드를 직접 주석하고 빈 방법만 남긴다
    ElasticSearchRestClient 클래스의addEvent() 메서드에서 한 줄 수정
    bulkBuilder.append(content.toBytesArray().toUtf8());
    
    bulkBuilder.append(content.utf8ToString());//.toBytesArray().toUtf8();
    

    ElasticSearchLogStash EventSerializer 클래스의 appendHeaders() 방법에 컴파일 오류가 있습니다. 다음과 같이 변경합니다.
        //Map headers = Maps.newHashMap(event.getHeaders());
        Map headers = new HashMap<>(event.getHeaders());
    

    ElasticSearchEventSerializer 클래스의 getContentBuilder() 메소드 반환 값은 XContentBuilder로 변경됩니다.ElasticSearchEventSerializer 클래스를 사용했습니다. ES5.4 새 클라이언트 setSource() 매개 변수는 XContentBuilder 형식입니다. 이 런타임 오류를 수정하지 않습니다.
    java.lang.IllegalArgumentException: The number of object passed must be even but was [1]
            at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:431)
            at org.elasticsearch.action.index.IndexRequest.source(IndexRequest.java:418)
            at org.elasticsearch.action.index.IndexRequestBuilder.setSource(IndexRequestBuilder.java:210)
    

    ContentBuilderUtil 클래스의 addComplexField() 메서드에서 코드 행을 수정해야 합니다.
          XContentFactory.xContent(contentType).createParser(data);
    
    다음과 같이 바꿉니다.
          XContentFactory.xContent(contentType).createParser(NamedXContentRegistry.EMPTY, data);//.createParser(data);
    
    번역 오류가 없어졌으니 다시 포장하여 발표합니다.정상적으로 실행하면 es5.4 집단에 데이터를 쓸 수 있습니다.다른 것은 아직 고칠 것이 있을 수 있습니다. 제 flume2es는 사용하지 않았습니다. 잠시 잘못 보고하지 않았습니다.
    개인적으로는 첫 번째 방식이 좋다고 생각합니다. 간단하고 제어할 수 있고flume-ng-elasticsearch-sink 패키지에 의존하지 않습니다.

    좋은 웹페이지 즐겨찾기