Flume 학습 노트 (2) 문제 정리

13582 단어 hadoopFlumekafka
운영 체제: CentOS 7.2.1511 64비트 Flume 버전: 1.6.0

1. Flume이 Hadoop과 같은 서버에 없는 경우


Flume이 Hadoop과 같은 서버에 없을 때 쓰기 HDFS를 구성하면 Flume이 시작할 때 클래스를 찾을 수 없는 오류가 발생합니다.Hadoop과 관련된 패키지를 flume의classpath 설정에 추가하거나 flume의lib 폴더에 직접 복사해야 합니다.구체적인 패키지는 maven 프로젝트에서 설정합니다.
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>2.6.4</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.4</version>
</dependency>

그리고 모든 의존하는 가방(총 82개)을 flume의lib에 복사합니다. (일부 가방은lib와 중복됩니다.)실제 상황은 사용하지 않은 가방이 많을 것이니 나중에 시간이 있으면 다시 간소화할 것이다.

2. HA가 구성된 HDFS에 쓰기


Flume에서 HDFS에 데이터를 써야 하고 Hadoop 서버에서 HA를 설정해야 할 때 두 가지 설정 방안을 시도했습니다.

시나리오 1


그 중 하나의namenode를 설정하고host에 추가합니다.이 방안은 쓸 수 있지만 Hadoop의 HA는 작용하지 않는다.노드가 끊어지면flume도 수동으로 설정을 수정해야만 다른namenode를 사용할 수 있습니다.

시나리오 2


Hadoop의nameservices(xxfs)를flume의 hdfs.path 속성에 직접 설정합니다.이 시나리오는 다음 오류를 보고합니다.
2016-08-04 13:34:55,535 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: java.lang.IllegalArgumentException: java.net.UnknownHostException: xxfs
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:463)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: java.net.UnknownHostException: xxfs
    at org.apache.hadoop.security.SecurityUtil.buildTokenService(SecurityUtil.java:374)
    at org.apache.hadoop.hdfs.NameNodeProxies.createNonHAProxy(NameNodeProxies.java:310)
    at org.apache.hadoop.hdfs.NameNodeProxies.createProxy(NameNodeProxies.java:176)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:668)
    at org.apache.hadoop.hdfs.DFSClient.<init>(DFSClient.java:604)
    at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:148)
    at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2596)
    at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
    at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
    at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
    at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
    at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:243)
    at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:235)
    at org.apache.flume.sink.hdfs.BucketWriter$9$1.run(BucketWriter.java:679)
    at org.apache.flume.auth.SimpleAuthenticator.execute(SimpleAuthenticator.java:50)
    at org.apache.flume.sink.hdfs.BucketWriter$9.call(BucketWriter.java:676)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
Caused by: java.net.UnknownHostException: xxfs
    ... 21 more

최종 방안


(1) Hadoop의nameservices(xxfs으로 가정)를 flume의 hdfs.path 속성에 설정합니다.예:
a1.sinks.userSink.hdfs.path = hdfs://xxfs/flume

(2) Hadoop 서버에 구성된 core-site.xml、hdfs-site.xml은flume의conf 폴더로 복사됩니다.Flume을 다시 시작하면 사용할 수 있습니다.(3) Hadoop에서 사용하는 여러 서버를 host로 만듭니다.

3. Kafka Channel의 parse AsFlume Event


프로젝트에 Flume의 일부 데이터를 Kafka로 써야 한다는 수요가 있기 때문에 테스트를 했습니다. Memory Channel+Kafka Sink의 성능을 통해 Kafka Channel을 직접 사용하는 것보다 성능이 떨어지는 것을 배경으로 했습니다.실제로 사용하는 과정에서 parseAsFlume Event라는 설정이 작동하지 않는 것을 발견했습니다.즉, parseAsFlume Event가true로 설정되었든 false로 설정되었든 모두 Flume Event로 전환됩니다.이렇게 되면 Flume의 헤더에 있는 정보를 끝까지 섞어서 Kafka의 메시지에 쓴다는 결과가 나온다. 이것은 분명히 내가 필요로 하는 것이 아니다. 나는 단지 내용을 쓰면 된다.나중에 제가 몇 가지 자료를 조회했는데 인터넷에서도 이 버그를 발견했고 버그fix를 Flume 정부에 제출했지만 다음 버전(1.7)이 되어야 해결할 수 있습니다.어쩔 수 없이 Memory Channel+Kafka Sink를 대체할 수밖에 없었다.

좋은 웹페이지 즐겨찾기