빅데이터 학습 hadoop3.1.3 - Flume 개발 사용자 정의 Interceptor(실전 개발)
19661 단어 Hadoop
Flume 채집 서버의 로컬 로그를 사용하려면 로그 유형에 따라 다양한 종류의 로그를 분석 시스템으로 보내야 합니다.
2) 수요 분석
실제 개발에서 한 서버에서 발생하는 로그 유형은 여러 가지가 있을 수 있고 서로 다른 유형의 로그는 서로 다른 분석 시스템에 보내야 할 수 있다.이 때 Flume 토폴로지 구조의 Multiplexing 구조에 사용됩니다. Multiplexing의 원리는 이벤트에서 Header의 어떤 키의 값에 따라 서로 다른 이벤트를 서로 다른 Channel에 보내는 것입니다. 그래서 우리는 인터셉터를 사용자 정의하여 서로 다른 유형의 이벤트의 Header의value에 서로 다른 값을 부여해야 합니다.
이 사례에서 우리는 포트 데이터로 로그를 시뮬레이션하고 숫자(단일)와 알파벳(단일)으로 서로 다른 유형의 로그를 시뮬레이션합니다. 우리는 사용자 정의interceptor로 숫자와 알파벳을 구분하여 각각 다른 분석 시스템(Channel)으로 보내야 합니다.
3) 실현 단계 (1) 마븐 프로젝트를 만들고 다음과 같은 의존을 도입한다
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>HdfsTest</artifactId>
<groupId>com.caron.hdfs</groupId>
<version>1.0-SNAPSHOT</version>
<relativePath>../HdfsTest/pom.xml</relativePath>
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>com.caron.flume</groupId>
<artifactId>flume</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.flume</groupId>
<artifactId>flume-ng-core</artifactId>
<version>1.9.0</version>
</dependency>
</dependencies>
</project>
(2) MyInterceptor 클래스를 정의하고 Interceptor 인터페이스 구현
package com.caron.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
/**
* @author Caron
* @create 2020-05-05-9:33
* @Description
* @Version
*/
public class MyInterceptor implements Interceptor {
public void initialize() {
}
public Event intercept(Event event) {
// Header
Map<String,String> headers = event.getHeaders();
// body
byte[] body = event.getBody();
//
String s = new String(body);
char c = s.charAt(0);
if((c <= 'z' && c >= 'a')|| (c <= 'Z' && c >= 'A') ){
headers.put("xxx","aaa");
}else {
headers.put("xxx","bbb");
}
return event;
}
public List<Event> intercept(List<Event> list) {
for (Event event :
list) {
intercept(event);
}
return list;
}
public void close() {
}
/**
* Builder Interceptor
*/
public static class MyBuilder implements Interceptor.Builder {
/**
*
* @return Interceptor
*/
public Interceptor build() {
return new MyInterceptor();
}
/**
*
* @param context
*/
public void configure(Context context) {
}
}
}
flume/lib 폴더에 압축 완료
(3)flume 프로필 편집
hadoop101의 Flume1에 넷캣 소스 1개,sink 그룹 1개(avro sink 2개)를 설정하고 해당하는 채널 셀렉터와interceptor를 설정합니다.101:
sudo vim /opt/module/flume/job/group4/flume1.conf
#Name the components on this agent
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 44444
#
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type =com.caron.flume.interceptor.MyInterceptor$MyBuilder
#
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = xxx
a1.sources.r1.selector.mapping.aaa = c1
a1.sources.r1.selector.mapping.bbb = c2
# Describe the sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop102
a1.sinks.k1.port = 4141
a1.sinks.k2.type=avro
a1.sinks.k2.hostname = hadoop103
a1.sinks.k2.port = 4242
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Use a channel which buffers events in memory
a1.channels.c2.type = memory
a1.channels.c2.capacity = 1000
a1.channels.c2.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1 c2
a1.sinks.k1.channel = c1
a1.sinks.k2.channel = c2
102:
sudo vim /opt/module/flume/job/group4/flume2.conf
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = avro
a2.sources.r1.bind = hadoop102
a2.sources.r1.port = 4141
a2.sinks.k1.type = logger
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 100
a2.sinks.k1.channel = c1
a2.sources.r1.channels = c1
103:
sudo vim /opt/module/flume/job/group4/flume3.conf
a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = avro
a3.sources.r1.bind = hadoop103
a3.sources.r1.port = 4242
a3.sinks.k1.type = logger
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 100
a3.sinks.k1.channel = c1
a3.sources.r1.channels = c1
(4) 각각hadoop101,hadoop102,hadoop103에서flume 프로세스를 시작하고 선후 순서에 주의한다.
103:
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group4/flume3.conf -Dflume.root.logger=INFO,console
102:
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group4/flume2.conf -Dflume.root.logger=INFO,console
101:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1.conf -Dflume.root.logger=INFO,console
(5)hadoop101에서netcat을 사용하여localhost:44444에 자모와 숫자를 보냅니다.
(6)hadoop102와hadoop103이 인쇄한 로그를 관찰한다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Java 액세스 Hadoop 분산 파일 시스템 HDFS 구성 설명프로파일 m103은hdfs 서비스 주소로 바꿉니다. Java 클라이언트를 이용하여 HDFS의 파일을 액세스하려면 프로필hadoop-0.20.2/conf/core-site를 사용해야 합니다.xml입니다. 처음에 저는 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.