Kafka에서 Treasure Data로 브리지하기 Docker Compose
11707 단어 docker-composeFluentdKafkatd-agent
Docker Compose
처음에 이번에 작성하는 프로젝트의 디렉토리 구성입니다.
$ tree -a
.
├── docker-compose.yml
├── .env
├── .gitignore
├── kafka-bridge
│ ├── Dockerfile
│ ├── fluentd-consumer.properties
│ └── log4j.properties
└── td-agent2
├── Dockerfile
└── td-agent.conf
2 directories, 9 files
docker-compose.yml
td-agent와 Kafka Consumer 서비스는 각각 Dockefile을 작성하고 빌드합니다. Kafka는 landoop/fast-data-dev을 사용합니다. Confluent Open Source이 포함되어 있으므로 Kafka와 ZooKeeper도 시작됩니다.
docker-compose.yml
version: '2'
services:
kafka-stack:
image: landoop/fast-data-dev
environment:
- FORWARDLOGS=0
- RUNTESTS=0
- ADV_HOST=<仮想マシンのパブリックIPアドレス>
ports:
- 3030:3030
- 9092:9092
- 2181:2181
- 8081:8081
td-agent2:
build: ./td-agent2
env_file:
- ./.env
ports:
- 24224:24224
kafka-bridge:
build: ./kafka-bridge
depends_on:
- td-agent
.env
Treasure Data의 접속 정보는 환경 변수 파일의
.env
에 기술해 Docker Compose로부터 읽어들입니다.td-agent2/.env
TD_API_KEY=<YOUR API KEY>
TD_ENDPOINT=<TD ENDPOINT>
td-agent2
td-agent의 Docker 이미지를 만듭니다.
Dockerfile
Overview of Server-Side Agent (td-agent)의 설치 지침을 따릅니다. 인스 타르 우분 투 니어 ltd - 겐 t2. sh 중에서는 sudo도 필요합니다.
td-agent2/Dockerfile
FROM ubuntu:xenial
RUN apt-get update && apt-get install sudo curl -y
RUN curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent2.sh | sh
RUN apt-get clean && rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
ADD td-agent.conf /etc/td-agent/
EXPOSE 24224
CMD ["/usr/sbin/td-agent"]
td-agent.conf
td-agent.conf는 환경 변수를 참조 할 수 있습니다. Treasure Data에 대한 연결 정보를
.env
파일에서 가져옵니다.td-agent2/td-agent.conf
<match td.*.*>
@type tdlog
endpoint "#{ENV['TD_ENDPOINT']}"
apikey "#{ENV['TD_API_KEY']}"
auto_create_table
buffer_type file
buffer_path /var/log/td-agent/buffer/td
use_ssl true
num_threads 8
</match>
<source>
@type forward
</source>
kafka-fluentd-consumer
Kafka에서 Treasure Data로의 브리지에는 kafka-fluentd-consumer의 Jar를 이용합니다.
Dockerfile
컴파일된 kafka-fluentd-consumer-0.3.1-all.jar을 다운로드합니다.
kafka-bridge/Dockerfile
FROM java:8-jre
ARG KAFKA_FLUENTD_CONSUMER_VERSION=0.3.1
WORKDIR /app
RUN wget -q -O kafka-fluentd-consumer-all.jar https://github.com/treasure-data/kafka-fluentd-consumer/releases/download/v$KAFKA_FLUENTD_CONSUMER_VERSION/kafka-fluentd-consumer-$KAFKA_FLUENTD_CONSUMER_VERSION-all.jar
ADD log4j.properties .
ADD fluentd-consumer.properties .
CMD ["java", "-Dlog4j.configuration=file:///app/log4j.properties", "-jar", "kafka-fluentd-consumer-all.jar", "fluentd-consumer.properties"]
fぅ엔 td-콘스눈 r. p로페r지혜s
기본의 설정에서 다음을 변경합니다.
fluentd.connect
와 zookeeper.connect
는 docker-compose.yml을 사용할 때 각각 서비스 이름을 지정합니다.kafka-bridge/fluentd-consumer.properties
# Fluentd instance destinations.
fluentd.connect=td-agent2:24224
# Dynamic event tag with topic name.
fluentd.tag.prefix=td.sensortag_dev.
# Consumed topics.
fluentd.consumer.topics=sensortag-sink
# The number of threads per consumer streams
fluentd.consumer.threads=1
# The path for backup un-flushed events during shutdown.
fluentd.consumer.backup.dir=/tmp/fluentd-consumer-backup/
# Kafka Consumer related parameters
zookeeper.connect=kafka-stack:2181
group.id=my-sensortag-sink-group
zookeeper.session.timeout.ms=400
zookeeper.sync.time.ms=200
auto.commit.interval.ms=1000
ぉg4j. p 로페 r 지혜 s
ぉg4j. p 로페 r 지혜 s 는 디폴트인 채로 사용합니다.
kafka-bridge/log4j.properties
# log4j logging configuration.
# This is based on Pinterest's secor
# root logger.
log4j.rootLogger=DEBUG, ROLLINGFILE
log4j.appender.ROLLINGFILE = org.apache.log4j.RollingFileAppender
log4j.appender.ROLLINGFILE.Threshold=INFO
log4j.appender.ROLLINGFILE.File=/tmp/fluentd-consumer.log
# keep log files up to 1G
log4j.appender.ROLLINGFILE.MaxFileSize=20MB
log4j.appender.ROLLINGFILE.MaxBackupIndex=50
log4j.appender.ROLLINGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.ROLLINGFILE.layout.ConversionPattern=%d{ISO8601} [%t] (%C:%L) %-5p %m%n
동작 확인
Docker Compose 서비스를 시작합니다.
$ docker-compose up -d
td-agent의 버전을 확인합니다.
$ docker-compose exec td-agent2 td-agent --version
td-agent 0.12.35
Spark Streaming을 사용한 윈도우 집계의 샘플 와 같이
fluentd.consumer.topics
로 지정한 topic에 JSON 포맷으로 데이터를 송신합니다.테스트로서
kafka-console-producer
로부터 직접 JSON을 송신해 보겠습니다.$ docker-compose exec kafka-stack kafka-console-producer \
--broker-list localhost:9092 \
--topic sensortag-sink
명령을 실행한 후 대기 상태에서 JSON 문자열을 입력합니다.
{"bid": "B0:B4:48:BD:DA:03", "time": 1501654353, "humidity": 27.152099609375, "objecttemp": 21.6875, "ambient": 27.09375, "rh": 78.4423828125}
td-agent는 파일 버퍼를 작성해 디폴트로는 5분 간격으로 Treasure Data에 데이터가 업로드합니다.
$ docker-compose exec td-agent2 ls /var/log/td-agent/buffer
td.sensortag_dev.sensortag_sink.b555bf24951c65554.log
Reference
이 문제에 관하여(Kafka에서 Treasure Data로 브리지하기 Docker Compose), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://qiita.com/masato/items/50400d480ec91c27977e텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)