flume 데이터를elasticsearch 5로 보내기X 솔루션

10943 단어 elasticsearch
elasticsearch를 5로 업그레이드합니다.X 이후,flume를 사용하여kafka 데이터를 읽고es에 오류를 보고하고github에서 항목을 찾지만,readme.md 예시에 오류가 있습니다. 원본 코드를 내리고 해결 방안을 첨부합니다.
  • 잘 싸인 가방과 의존을 다운로드(문장 말미에 따라github에서 다운로드하여 의존을 제거할 수도 있음)
  • https://download.csdn.net/download/qq_25067199/10305213

  • 플럼의lib 아래로 압축을 풀다
  • flumed의lib에서 jackson-core-2.3.1을 삭제합니다.jar
  • 에이전트 설정을 추가하여 zookeeperConnect, topic,cluster를 자체적으로 설정합니다.name,client.hosts 등
  • 시동에 성공했습니다
  • agent.sources = r1
    agent.sinks = es_sink
    agent.channels = c1
    
    # Describe/configure the source
    agent.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    agent.sources.r1.zookeeperConnect = localhost:2181,localhost1:2181,
    agent.sources.r1.topic =test
    agent.sources.r1.groupId = flume
    agent.sources.r1.kafka.consumer.timeout.ms = 100
    
    # Describe the sink
    agent.sinks.es_sink.type=com.cognitree.flume.sink.elasticsearch.ElasticSearchSink
    agent.sinks.es_sink.es.bulkActions=5
    agent.sinks.es_sink.es.bulkProcessor.name=bulkprocessor
    agent.sinks.es_sink.es.bulkSize=5
    agent.sinks.es_sink.es.bulkSize.unit=MB
    agent.sinks.es_sink.es.concurrent.request=1
    agent.sinks.es_sink.es.flush.interval.time=5m
    agent.sinks.es_sink.es.backoff.policy.time.interval=50M
    agent.sinks.es_sink.es.backoff.policy.retries=8
    agent.sinks.es_sink.es.cluster.name=mycluster
    agent.sinks.es_sink.es.client.transport.sniff=false
    agent.sinks.es_sink.es.client.transport.ignore_cluster_name=false
    agent.sinks.es_sink.es.client.transport.ping_timeout=5s
    agent.sinks.es_sink.es.client.transport.nodes_sampler_interval=5s
    agent.sinks.es_sink.es.client.hosts=localhost
    agent.sinks.es_sink.es.client.port=9300
    agent.sinks.es_sink.es.index=flume
    agent.sinks.es_sink.es.type=flume-%y-%m-%d
    agent.sinks.es_sink.es.index.builder=com.cognitree.flume.sink.elasticsearch.HeaderBasedIndexBuilder
    agent.sinks.es_sink.es.serializer=com.cognitree.flume.sink.elasticsearch.SimpleSerializer
    agent.sinks.es_sink.es.serializer.csv.fields=id:int,name:string,isemployee:boolean,leaves:float
    agent.sinks.es_sink.es.serializer.csv.delimiter=,
    agent.sinks.es_sink.es.serializer.avro.schema.file=/usr/local/schema.avsc
    
    
    # Use a channel which buffers events in memory
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 100000
    agent.channels.c1.transactionCapacity = 10000
    
    # Bind the source and sink to the channel
    agent.sources.r1.channels = c1
    agent.sinks.es_sink.channel = c1
  • 원작자github 주소 첨부https://github.com/cognitree/flume-elasticsearch-sink
  • 자체 다운로드: 릴리스에서 flume-ng-elasticsearch5.4-sink-1.0-bundle를 다운로드합니다.zip, 압축해제 lib, libext 디렉터리에 있는jar 패키지(아래 정리 참조),flume lib에 업로드하면 됩니다(원본 패키지는es-type에 시간 교체부호%y-%m-%d 등을 지원하지 않습니다)
  • flume-ng-elasticsearch5.4-sink-1.0.0
  • lang-mustache-client-5.4.1
  • netty-buffer-4.1.11.Final
  • netty-codec-4.1.11.Final
  • netty-codec-http-4.1.11.Final
  • netty-common-4.1.11.Final
  • netty-handler-4.1.11.Final
  • netty-resolver-4.1.11.Final
  • netty-transport-4.1.11.Final
  • percolator-client-5.4.1
  • reindex-client-5.4.1
  • transport-5.4.1
  • transport-netty3-client-5.4.1
  • transport-netty4-client-5.4.1

  • 좋은 웹페이지 즐겨찾기