빅데이터 학습 hadoop3.1.3 - Flume 개발 사용자 정의 Interceptor(실전 개발)

19661 단어 Hadoop
1) 사례 요구 사항
Flume 채집 서버의 로컬 로그를 사용하려면 로그 유형에 따라 다양한 종류의 로그를 분석 시스템으로 보내야 합니다.
2) 수요 분석
실제 개발에서 한 서버에서 발생하는 로그 유형은 여러 가지가 있을 수 있고 서로 다른 유형의 로그는 서로 다른 분석 시스템에 보내야 할 수 있다.이 때 Flume 토폴로지 구조의 Multiplexing 구조에 사용됩니다. Multiplexing의 원리는 이벤트에서 Header의 어떤 키의 값에 따라 서로 다른 이벤트를 서로 다른 Channel에 보내는 것입니다. 그래서 우리는 인터셉터를 사용자 정의하여 서로 다른 유형의 이벤트의 Header의value에 서로 다른 값을 부여해야 합니다.
이 사례에서 우리는 포트 데이터로 로그를 시뮬레이션하고 숫자(단일)와 알파벳(단일)으로 서로 다른 유형의 로그를 시뮬레이션합니다. 우리는 사용자 정의interceptor로 숫자와 알파벳을 구분하여 각각 다른 분석 시스템(Channel)으로 보내야 합니다.
3) 실현 단계 (1) 마븐 프로젝트를 만들고 다음과 같은 의존을 도입한다
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>HdfsTest</artifactId>
        <groupId>com.caron.hdfs</groupId>
        <version>1.0-SNAPSHOT</version>
        <relativePath>../HdfsTest/pom.xml</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.caron.flume</groupId>
    <artifactId>flume</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.apache.flume</groupId>
            <artifactId>flume-ng-core</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>

</project>

(2) MyInterceptor 클래스를 정의하고 Interceptor 인터페이스 구현
package com.caron.flume.interceptor;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import java.util.List;
import java.util.Map;
/**
 * @author Caron
 * @create 2020-05-05-9:33
 * @Description
 * @Version
 */
public class MyInterceptor implements Interceptor {
    public void initialize() {
    }
    public Event intercept(Event event) {
        // Header
        Map<String,String> headers = event.getHeaders();
        // body
        byte[] body = event.getBody();
        // 
        String s = new String(body);
        char c = s.charAt(0);
        if((c <= 'z' && c >= 'a')|| (c <= 'Z' && c >= 'A') ){
            headers.put("xxx","aaa");
        }else {
            headers.put("xxx","bbb");
        }
        return event;
    }
    public List<Event> intercept(List<Event> list) {
        for (Event event :
                list) {
            intercept(event);
        }
        return list;
    }
    public void close() {
    }
    /**
     *  Builder Interceptor 
     */
    public static class MyBuilder implements Interceptor.Builder {
        /**
         *  
         * @return  Interceptor
         */
        public Interceptor build() {
            return new MyInterceptor();
        }
        /**
         *  
         * @param context  
         */
        public void configure(Context context) {
        }
    }
}

flume/lib 폴더에 압축 완료
(3)flume 프로필 편집
hadoop101의 Flume1에 넷캣 소스 1개,sink 그룹 1개(avro sink 2개)를 설정하고 해당하는 채널 셀렉터와interceptor를 설정합니다.101:
 sudo vim /opt/module/flume/job/group4/flume1.conf
#Name the components on this agent
 a1.sources = r1
 a1.sinks = k1 k2
 a1.channels = c1 c2
 # Describe/configure the source
 a1.sources.r1.type = netcat
 a1.sources.r1.bind = 0.0.0.0
 a1.sources.r1.port = 44444
 
 # 
 a1.sources.r1.interceptors = i1
 a1.sources.r1.interceptors.i1.type =com.caron.flume.interceptor.MyInterceptor$MyBuilder
 
 # 
 a1.sources.r1.selector.type = multiplexing
 a1.sources.r1.selector.header = xxx
 a1.sources.r1.selector.mapping.aaa = c1
 a1.sources.r1.selector.mapping.bbb = c2
 
 # Describe the sink
 a1.sinks.k1.type = avro
 a1.sinks.k1.hostname = hadoop102
 a1.sinks.k1.port = 4141
 
 a1.sinks.k2.type=avro
 a1.sinks.k2.hostname = hadoop103
 a1.sinks.k2.port = 4242
 
 # Use a channel which buffers events in memory
 a1.channels.c1.type = memory
 a1.channels.c1.capacity = 1000
 a1.channels.c1.transactionCapacity = 100
 
 # Use a channel which buffers events in memory
 a1.channels.c2.type = memory
 a1.channels.c2.capacity = 1000
 a1.channels.c2.transactionCapacity = 100
 
 # Bind the source and sink to the channel
 a1.sources.r1.channels = c1 c2
 a1.sinks.k1.channel = c1
 a1.sinks.k2.channel = c2

102:
sudo vim /opt/module/flume/job/group4/flume2.conf
 a2.sources = r1
 a2.sinks = k1
 a2.channels = c1
 
 a2.sources.r1.type = avro
 a2.sources.r1.bind = hadoop102
 a2.sources.r1.port = 4141
 
 a2.sinks.k1.type = logger
 
 a2.channels.c1.type = memory
 a2.channels.c1.capacity = 1000
 a2.channels.c1.transactionCapacity = 100
 
 a2.sinks.k1.channel = c1
 a2.sources.r1.channels = c1

103:
sudo vim /opt/module/flume/job/group4/flume3.conf
 a3.sources = r1
 a3.sinks = k1
 a3.channels = c1
 
 a3.sources.r1.type = avro
 a3.sources.r1.bind = hadoop103
 a3.sources.r1.port = 4242
 
 a3.sinks.k1.type = logger
 
 a3.channels.c1.type = memory
 a3.channels.c1.capacity = 1000
 a3.channels.c1.transactionCapacity = 100
 
 a3.sinks.k1.channel = c1
 a3.sources.r1.channels = c1

(4) 각각hadoop101,hadoop102,hadoop103에서flume 프로세스를 시작하고 선후 순서에 주의한다.
103:
bin/flume-ng agent --conf conf/ --name a3 --conf-file job/group4/flume3.conf -Dflume.root.logger=INFO,console

102:
bin/flume-ng agent --conf conf/ --name a2 --conf-file job/group4/flume2.conf -Dflume.root.logger=INFO,console

101:
bin/flume-ng agent --conf conf/ --name a1 --conf-file job/group4/flume1.conf -Dflume.root.logger=INFO,console

(5)hadoop101에서netcat을 사용하여localhost:44444에 자모와 숫자를 보냅니다.
(6)hadoop102와hadoop103이 인쇄한 로그를 관찰한다.

좋은 웹페이지 즐겨찾기