Kafka 설치/케이스 실행/Java에서 실행/신뢰성 & 처리량 향상 대책 검토

14019 단어 ZooKeeperKafkaJava
이 기사는 2014/07이 다른 블로그에 쓴 글을 Qita에 옮기는 것이다.정보는 오래된 가능성이 있으니 양해해 주십시오.
Kafka에 대한 개요는 이 페이지에서 확인하기 쉽습니다.
http://d.hatena.ne.jp/kimutansk/20130520
http://www.slideshare.net/yanaoki/kafka-10346557

개요


여러 데이터 원본에서 데이터를 수신하고 그 다음에 메시지 흐름으로 하는 실시간 분포식 메시지 처리에 사용되는 대기열 시스템

채용 실적


LinkedIn
開発元
キーワードランキング
PV/インプレッション
google analystic
アクセス解析
PV/UU/検索キーワードのリアルタイム集計
キャンペーンのソーシャルメディア反応
facebook
CTR(インターネット広告が表示された回数のうち、クリックされた数の割合を表す)
facebookのIN/OUTアクセスログ解析
Twitter analystic
Twitterにてどれだけそのページが呟かれているか
Twitterリンクボタンがどれだけ押されたか
구성도
· 아파치 카프카 내부

・파티션 적용 구성(샘플)

※A Kafka cluster with 3 brokers. 1 topic and 2 partitions,each with 2 replicas.

설치 단계


Zookeeper 설치(-v3.4.6)
http://mirror.reverse.net/pub/apache/zookeeper/
로컬에서 이진 데이터를 다운로드하고 압축을 풀다.
Kafka 설치(-v0.8.1.1)
http://kafka.apache.org/downloads.html
로컬에서 이진 데이터를 다운로드하고 압축을 풀다.

동작 확인(단일 구성)


ZooKeeper->port:2181 시작cd /home/centos/Downloads/zookeeper-3.4.6 bin/zkServer.sh start conf/zoo.cfg
JMX enabled by default
Using config: conf/zoo.cfg
Starting zookeeper ... STARTED
Kafka->port:9092 시작cd /home/centos/Downloads/kafka_2.10-0.8.1.1 bin/kafka-server-start.sh config/server.properties
[2014-07-07 01:19:18,168] INFO [Socket Server on Broker 0], Started (kafka.network.SocketServer)
[2014-07-07 01:19:18,283] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-07-07 01:19:18,316] INFO 0 successfully elected as leader (kafka.server.ZookeeperLeaderElector)
[2014-07-07 01:19:18,629] INFO Registered broker 0 at path /brokers/ids/0 with address localhost:9092. (kafka.utils.ZkUtils$)
[2014-07-07 01:19:18,631] INFO New leader is 0 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-07-07 01:19:18,652] INFO [Kafka Server 0], started (kafka.server.KafkaServer)
[2014-07-07 01:19:18,851] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-replicated-topic,0],[test,0] (kafka.server.ReplicaFetcherManager)
[2014-07-07 01:19:18,869] INFO Truncating log my-replicated-topic-0 to offset 5. (kafka.log.Log)
[2014-07-07 01:19:18,871] INFO Truncating log test-0 to offset 3. (kafka.log.Log)
[2014-07-07 01:19:18,895] INFO [ReplicaFetcherManager on broker 0] Added fetcher for partitions ArrayBuffer() (kafka.server.ReplicaFetcherManager)
[2014-07-07 01:19:18,936] INFO [ReplicaFetcherManager on broker 0] Removed fetcher for partitions [my-replicated-topic,0],[test,0] (kafka.server.ReplicaFetcherManager)
테마 만들기(※ 여기서 "test"테마 만들기)
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
목록에서 주제 확인bin/kafka-topics.sh --list --zookeeper localhost:2181
test
제품 관리자에게 메시지 보내기bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
consumer 추가bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

동작 확인(클러스터 구조)


서버 속성 3개 준비(config/server.properties 복사)
config/server-1.propertie
broker.id = 1
ポート= 9092
log.dir = /log/kafka1
config/server-2.propertie
broker.id = 2
ポート= 9093
log.dir = /log/kafka2
config/server-3.propertie
broker.id = 3
ポート= 9094
log.dir = /log/kafka3
서버 시작bin/kafka-server-start.sh config/server-1.properties
[2014-07-07 00:33:26,348] INFO Awaiting socket connections on 0.0.0.0:9092. (kafka.network.Acceptor)
[2014-07-07 00:33:26,354] INFO [Socket Server on Broker 1], Started (kafka.network.SocketServer)
[2014-07-07 00:33:26,451] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-07-07 00:33:26,472] INFO 1 successfully elected as leader (kafka.server.ZookeeperLeaderElector) // <- broker.id=1がリーダーになった
[2014-07-07 00:33:26,801] INFO Registered broker 1 at path /brokers/ids/1 with address localhost:9092. (kafka.utils.ZkUtils$)
[2014-07-07 00:33:26,818] INFO [Kafka Server 1], started (kafka.server.KafkaServer)
[2014-07-07 00:33:26,822] INFO New leader is 1 (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
[2014-07-07 00:33:26,997] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [my-replicated-topic,0] (kafka.server.ReplicaFetcherManager)
[2014-07-07 00:33:27,013] INFO Truncating log my-replicated-topic-0 to offset 8. (kafka.log.Log)
[2014-07-07 00:33:27,033] INFO [ReplicaFetcherManager on broker 1] Added fetcher for partitions ArrayBuffer() (kafka.server.ReplicaFetcherManager)
[2014-07-07 00:33:27,064] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions [my-replicated-topic,0] (kafka.server.ReplicaFetcherManager)
[2014-07-07 00:33:29,529] INFO Partition [my-replicated-topic,0] on broker 1: Expanding ISR for partition [my-replicated-topic,0] from 1 to 1,2 (kafka.cluster.Partition)
bin/kafka-server-start.sh config/server-2.properties
[2014-07-07 00:33:28,643] INFO [Socket Server on Broker 2], Started (kafka.network.SocketServer)
[2014-07-07 00:33:28,733] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-07-07 00:33:28,779] INFO conflict in /controller data: {"version":1,"brokerid":2,"timestamp":"1404718408740"} stored data: {"version":1,"brokerid":1,"timestamp":"1404718406460"} (kafka.utils.ZkUtils$)
[2014-07-07 00:33:28,904] INFO Registered broker 2 at path /brokers/ids/2 with address localhost:9093. (kafka.utils.ZkUtils$)
[2014-07-07 00:33:28,991] INFO [Kafka Server 2], started (kafka.server.KafkaServer)
[2014-07-07 00:33:29,416] INFO [ReplicaFetcherManager on broker 2] Removed fetcher for partitions [my-replicated-topic,0] (kafka.server.ReplicaFetcherManager)
[2014-07-07 00:33:29,420] INFO Truncating log my-replicated-topic-0 to offset 8. (kafka.log.Log)
[2014-07-07 00:33:29,471] INFO [ReplicaFetcherManager on broker 2] Added fetcher for partitions ArrayBuffer([[my-replicated-topic,0], initOffset 8 to broker id:1,host:localhost,port:9092] ) (kafka.server.ReplicaFetcherManager)
bin/kafka-server-start.sh config/server-3.properties
[2014-07-07 00:33:30,433] INFO Awaiting socket connections on 0.0.0.0:9094. (kafka.network.Acceptor)
[2014-07-07 00:33:30,437] INFO [Socket Server on Broker 3], Started (kafka.network.SocketServer)
[2014-07-07 00:33:30,530] INFO Will not load MX4J, mx4j-tools.jar is not in the classpath (kafka.utils.Mx4jLoader$)
[2014-07-07 00:33:30,560] INFO conflict in /controller data: {"version":1,"brokerid":3,"timestamp":"1404718410540"} stored data: {"version":1,"brokerid":1,"timestamp":"1404718406460"} (kafka.utils.ZkUtils$)
[2014-07-07 00:33:30,631] INFO Registered broker 3 at path /brokers/ids/3 with address localhost:9094. (kafka.utils.ZkUtils$)
[2014-07-07 00:33:30,652] INFO [Kafka Server 3], started (kafka.server.KafkaServer)
주제 만들기bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 2 --topic my-replicated-topic매니저 상태 확인.bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topicTopic: my-replicated-topi Partition: 0 Leader: 1 Replicas: 2,3,1Isr: 1,2,3
Topic: my-replicated-topi Partition: 1 Leader: 1 Replicas: 3,1,2Isr: 1,2,3
집단의 지도적 지위를 낮추려 하다
상술한 상태에서.id=2가 리더이기 때문에 그 과정을 없애세요.
^C
[2014-07-07 01:29:46,791] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
[2014-07-07 01:29:46,810] INFO 3 successfully elected as leader (kafka.server.ZookeeperLeaderElector) // -> broker.id=3がリーダーに選出
[2014-07-07 01:29:47,244] INFO New leader is 3 

Java 클라이언트 만들기


Java-Kafka를 사용하여 비동기 작업 대기열을 만듭니다.
Producter: 초당 task 던지기
Producter에서 받은 task 처리이때 처리하는 데 필요한 시간은 2초이다.

Kafka는 Producter가 던진task가 Consumer의 처리 능력을 초과할 때 작업 대기열이 됩니다.
Consuumer의 처리 능력은 Producter가 던진 임무의 양을 처리하면 처리를 따라잡을 수 있다.
이러한 Producter 및 Consuumer 비헤이비어가 있는 Java 클라이언트를 만듭니다.
https://github.com/n01boy/kafka-client-sample
실행 결과
Tue Jul 08 01:09:07 PDT 2014 : task no 1. send from producer
Tue Jul 08 01:09:07 PDT 2014 : message received... : task no 1. 192.168.2.209 Thread : 0
Tue Jul 08 01:09:08 PDT 2014 : task no 2. send from producer
Tue Jul 08 01:09:09 PDT 2014 : task no 3. send from producer
Tue Jul 08 01:09:09 PDT 2014 : message received... : task no 2. 192.168.2.166 Thread : 0
Tue Jul 08 01:09:10 PDT 2014 : task no 4. send from producer
Tue Jul 08 01:09:11 PDT 2014 : message received... : task no 3. 192.168.2.72 Thread : 0
Tue Jul 08 01:09:11 PDT 2014 : task no 5. send from producer
Tue Jul 08 01:09:12 PDT 2014 : task no 6. send from producer
Tue Jul 08 01:09:13 PDT 2014 : message received... : task no 4. 192.168.2.102 Thread : 0
Tue Jul 08 01:09:13 PDT 2014 : task no 7. send from producer
Tue Jul 08 01:09:14 PDT 2014 : task no 8. send from producer
Tue Jul 08 01:09:15 PDT 2014 : message received... : task no 5. 192.168.2.59 Thread : 0
Tue Jul 08 01:09:15 PDT 2014 : task no 9. send from producer
Tue Jul 08 01:09:16 PDT 2014 : task no 10. send from producer
Tue Jul 08 01:09:17 PDT 2014 : message received... : task no 6. 192.168.2.213 Thread : 0
Tue Jul 08 01:09:19 PDT 2014 : message received... : task no 7. 192.168.2.29 Thread : 0
Tue Jul 08 01:09:21 PDT 2014 : message received... : task no 8. 192.168.2.27 Thread : 0
Tue Jul 08 01:09:25 PDT 2014 : message received... : task no 9. 192.168.2.216 Thread : 0
Tue Jul 08 01:09:23 PDT 2014 : message received... : task no 10. 192.168.2.146 Thread : 0
finish!

신뢰성 및 처리량 향상 대책


참조: http://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines

Producter 동기화 모드 또는 비동기 모드

・同期モードだとレプリケーションが多いと遅くなる
ProducerからBrokerにwriteを始めると、log1,log2,log3に書き込みを行い、それが終了することを通知されるまで処理を待つ。
そのためレプリケーションが多い構成では同期モードが遅くなる。
ベンチマークによると3台構成で同期モードは非同期モードに比べるとスループットが6割くらいまで減少する。

비동기 모드는 데이터 전송의 신뢰성을 낮추었다

非同期モードであると、 Producerはデータを一方的に送るだけ。データの保存に失敗したときはProducerは気づけない。

브로커 수

トピック数・パーティションが多いとき、ランダムアクセスになりがちなる。
性能を向上させたいときは、ランダムアクセスになりにくいよう、トピック、クラスタを複数ノードに分けるようにする。

공유수

Consumerの数はBrokerの数と同じかそれ以上となるようにする。
Consumerの数を多くするとその分、直線比例的にスループットが向上するので、
producerが送信するメッセージよりconsumerのスループットの量が多くなるよう設定する。

ConsuumerGroup 수

同一ConsumerGroup内で複数Consumerを立ち上げると、分散処理が出来るようになる。

여러 노드를 뛰어넘는kafka 집단을 구성하는 상황

Aggregate Brokerを用いると、複数のサーバーにあるデータを一つのものとしてみることが出来、ネットワーク効率も上がる。
この時、ファイルサイズが大きくなるのでHDD容量には注意が必要でる。
しかしながら、下図のような構成ではBrokerが持つファイルサイズの合計値と同じだけ、Aggregate Brokerもファイルサイズを持つようになるので、
データが一か所にあるように見せる必要がないときは、この構成は必要ない

음반 수가 늘어날 때


(인용자: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)
レコード数が増えてファイルサイズが大きくなったとしても、シーケンシャルwrittenのため、
理論的には初めの100MBを記述するときも、1TB書いた後の100MBを書くのも同じスループットで記述することが出来る。
処理速度はO(1)である。

플래시 동작 타이밍

Brokerが受信したメッセージは一定数および、一定期間毎にファイルに書き出しを行いメモリから削除する。
これはLinuxのI/O管理処理を介して実行している。
この期間を短くするとメッセージの保存の確実性は上がるが、全体的なスループットは下がる傾向にある。
データ量が多い環境では、1日あれば破棄するようにする等の対策が必要。 

기록 크기의 영향


(인용자: https://engineering.linkedin.com/kafka/benchmarking-apache-kafka-2-million-writes-second-three-cheap-machines)
1レコード(1メッセージ)のサイズ数が大きくなるにつれて、受け入れられるレコードの数は下がる(下図)
しかしながら、レコード数×サイズ数では、1レコードのサイズ数が大きいほうが全体的に保持できるログ容量は大きい
これは1レコードを入れる度に、ロックやCPUの動きが大きいことによる。

총결산하다


· 프로듀서의 증감은 토픽의 증감과 비례한다.
·Consuumer Group의 증감은 topic의 증감과 비례한다.
·Consuumer Group 내의 Consuumer 수는 Consuumer 측에서 프로듀서가 보낸 메시지의 양을 초과하여 처리합니다.
· 브로커 수량은 토픽 수량이나 파티션 증감 시 조정됩니다.
※ 주keeper 사용법도 적었지만, 자세히 활용되지 않아 삭제했습니다.

좋은 웹페이지 즐겨찾기