Kafka에서 Treasure Data로 브리지하기 Docker Compose

td-agent 컨테이너와 Kafka Consumer 컨테이너를 사용하여 Kafka에서 Treasure Data로 브리지하는 Docker Compose 서비스를 시작합니다. 다른 포스트 에서는 PySpark Streaming 창 집계 결과를 Kafka 주제에 출력하는 코드를 작성했습니다. 이 스트림 처리는 데이터 파이프라인의 전처리나 인리치먼트에 상당합니다. 다음에 빅데이터의 배치 처리를 상정해 Treasure Data에 보존합니다.

Docker Compose



처음에 이번에 작성하는 프로젝트의 디렉토리 구성입니다.
$ tree -a
.
├── docker-compose.yml
├── .env
├── .gitignore
├── kafka-bridge
│   ├── Dockerfile
│   ├── fluentd-consumer.properties
│   └── log4j.properties
└── td-agent2
    ├── Dockerfile
    └── td-agent.conf

2 directories, 9 files


docker-compose.yml



td-agent와 Kafka Consumer 서비스는 각각 Dockefile을 작성하고 빌드합니다. Kafka는 landoop/fast-data-dev을 사용합니다. Confluent Open Source이 포함되어 있으므로 Kafka와 ZooKeeper도 시작됩니다.

docker-compose.yml
version: '2'
services:
  kafka-stack:
    image: landoop/fast-data-dev
    environment:
      - FORWARDLOGS=0
      - RUNTESTS=0
      - ADV_HOST=<仮想マシンのパブリックIPアドレス>
    ports:
      - 3030:3030
      - 9092:9092
      - 2181:2181
      - 8081:8081
  td-agent2:
    build: ./td-agent2
    env_file:
      - ./.env
    ports:
      - 24224:24224
  kafka-bridge:
    build: ./kafka-bridge
    depends_on:
      - td-agent

.env



Treasure Data의 접속 정보는 환경 변수 파일의 .env에 기술해 Docker Compose로부터 읽어들입니다.

td-agent2/.env
TD_API_KEY=<YOUR API KEY>
TD_ENDPOINT=<TD ENDPOINT>

td-agent2



td-agent의 Docker 이미지를 만듭니다.

Dockerfile



Overview of Server-Side Agent (td-agent)의 설치 지침을 따릅니다. 인스 타르 우분 투 니어 ltd - 겐 t2. sh 중에서는 sudo도 필요합니다.

td-agent2/Dockerfile
FROM ubuntu:xenial

RUN apt-get update && apt-get install sudo curl -y
RUN curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent2.sh | sh
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*

ADD td-agent.conf /etc/td-agent/
EXPOSE 24224
CMD ["/usr/sbin/td-agent"]

td-agent.conf



td-agent.conf는 환경 변수를 참조 할 수 있습니다. Treasure Data에 대한 연결 정보를 .env 파일에서 가져옵니다.

td-agent2/td-agent.conf
<match td.*.*>
  @type tdlog
  endpoint "#{ENV['TD_ENDPOINT']}"
  apikey "#{ENV['TD_API_KEY']}"
  auto_create_table
  buffer_type file
  buffer_path /var/log/td-agent/buffer/td
  use_ssl true
  num_threads 8
</match>

<source>
  @type forward
</source>

kafka-fluentd-consumer



Kafka에서 Treasure Data로의 브리지에는 kafka-fluentd-consumer의 Jar를 이용합니다.

Dockerfile



컴파일된 kafka-fluentd-consumer-0.3.1-all.jar을 다운로드합니다.

kafka-bridge/Dockerfile
FROM java:8-jre
ARG KAFKA_FLUENTD_CONSUMER_VERSION=0.3.1

WORKDIR /app

RUN wget -q -O kafka-fluentd-consumer-all.jar https://github.com/treasure-data/kafka-fluentd-consumer/releases/download/v$KAFKA_FLUENTD_CONSUMER_VERSION/kafka-fluentd-consumer-$KAFKA_FLUENTD_CONSUMER_VERSION-all.jar

ADD log4j.properties .
ADD fluentd-consumer.properties .

CMD ["java", "-Dlog4j.configuration=file:///app/log4j.properties", "-jar", "kafka-fluentd-consumer-all.jar", "fluentd-consumer.properties"]

fぅ엔 td-콘스눈 r. p로페r지혜s



기본의 설정에서 다음을 변경합니다. fluentd.connectzookeeper.connect는 docker-compose.yml을 사용할 때 각각 서비스 이름을 지정합니다.
  • fluentd.connect=:24224
  • fluentd.tag.prefix=td.<데이터베이스 이름>.
  • fluentd.consumer.topics=<주제 이름>
  • zookeeper.connect=:2181
  • group.id=<소비자 그룹 이름>

  • kafka-bridge/fluentd-consumer.properties
    # Fluentd instance destinations.
    fluentd.connect=td-agent2:24224
    
    # Dynamic event tag with topic name. 
    fluentd.tag.prefix=td.sensortag_dev.
    
    # Consumed topics. 
    fluentd.consumer.topics=sensortag-sink
    
    # The number of threads per consumer streams
    fluentd.consumer.threads=1
    
    # The path for backup un-flushed events during shutdown.
    fluentd.consumer.backup.dir=/tmp/fluentd-consumer-backup/
    
    # Kafka Consumer related parameters
    zookeeper.connect=kafka-stack:2181
    group.id=my-sensortag-sink-group
    zookeeper.session.timeout.ms=400
    zookeeper.sync.time.ms=200
    auto.commit.interval.ms=1000
    

    ぉg4j. p 로페 r 지혜 s



    ぉg4j. p 로페 r 지혜 s 는 디폴트인 채로 사용합니다.

    kafka-bridge/log4j.properties
    # log4j logging configuration.
    # This is based on Pinterest's secor
    
    # root logger.
    log4j.rootLogger=DEBUG, ROLLINGFILE
    
    log4j.appender.ROLLINGFILE = org.apache.log4j.RollingFileAppender
    log4j.appender.ROLLINGFILE.Threshold=INFO
    log4j.appender.ROLLINGFILE.File=/tmp/fluentd-consumer.log
    # keep log files up to 1G
    log4j.appender.ROLLINGFILE.MaxFileSize=20MB
    log4j.appender.ROLLINGFILE.MaxBackupIndex=50
    log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
    log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n
    

    동작 확인



    Docker Compose 서비스를 시작합니다.
    $ docker-compose up -d
    

    td-agent의 버전을 확인합니다.
    $ docker-compose exec td-agent2 td-agent --version
    td-agent 0.12.35
    

    Spark Streaming을 사용한 윈도우 집계의 샘플 와 같이 fluentd.consumer.topics 로 지정한 topic에 JSON 포맷으로 데이터를 송신합니다.

    테스트로서 kafka-console-producer 로부터 직접 JSON을 송신해 보겠습니다.
    $ docker-compose exec kafka-stack kafka-console-producer \
        --broker-list localhost:9092 \
        --topic sensortag-sink
    

      명령을 실행한 후 대기 상태에서 JSON 문자열을 입력합니다.
    {"bid": "B0:B4:48:BD:DA:03", "time": 1501654353, "humidity": 27.152099609375, "objecttemp": 21.6875, "ambient": 27.09375, "rh": 78.4423828125}
    

    td-agent는 파일 버퍼를 작성해 디폴트로는 5분 간격으로 Treasure Data에 데이터가 업로드합니다.
    $ docker-compose exec td-agent2 ls /var/log/td-agent/buffer
    td.sensortag_dev.sensortag_sink.b555bf24951c65554.log
    

    좋은 웹페이지 즐겨찾기