Kafka의 High Level Consumer 설계
5738 단어 메시지 대기열
High Level Consumer를 사용하는 이유
High Level Consumer는 다중 스레드 환경에 사용할 수 있고 사용할 수 있습니다. 스레드 모델의 스레드 수(group의 consumer 수)는 topic의partition 수와 관련이 있습니다. 다음은 몇 가지 규칙을 열거합니다.
예.
Maven 의존성
org.apache.kafka
kafka_2.10
0.8.2.0
org.apache.kafka
kafka-clients
0.8.2.0
Consumer 스레드
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.message.MessageAndMetadata;
public class ConsumerThread implements Runnable {
private KafkaStream kafkaStream;
//
private int threadNumber;
public ConsumerThread(KafkaStream kafkaStream, int threadNumber) {
this.threadNumber = threadNumber;
this.kafkaStream = kafkaStream;
}
public void run() {
ConsumerIterator it = kafkaStream.iterator();
StringBuffer sb = new StringBuffer();
// Kafka ,
while (it.hasNext()) {
MessageAndMetadata metaData = it.next();
sb.append("Thread: " + threadNumber + " ");
sb.append("Part: " + metaData.partition() + " ");
sb.append("Key: " + metaData.key() + " ");
sb.append("Message: " + metaData.message() + " ");
sb.append("
");
System.out.println(sb.toString());
}
System.out.println("Shutting down Thread: " + threadNumber);
}
}
나머지 절차
import kafka.consumer.ConsumerConfig;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
}
public void run(int a_numThreads) {
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(a_numThreads));
// Map Topic KafkaStream
Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
List> streams = consumerMap.get(topic);
// Java
executor = Executors.newFixedThreadPool(a_numThreads);
// consume messages
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerTest(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
// Zookeeper , Partition Consumer Offerset
props.put("zookeeper.connect", a_zookeeper);
//consumer group ID
props.put("group.id", a_groupId);
//Kafka Zookeeper ( )
props.put("zookeeper.session.timeout.ms", "400");
//ZooKeeper ‘follower’ Master
props.put("zookeeper.sync.time.ms", "200");
//consumer offerset Zookeeper
props.put("auto.commit.interval.ms", "1000");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);
// consumer offerset zookeeper( ), shutdown Consumer ,
// sleep , consumer offset zookeeper
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
example.shutdown();
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Kafka의 High Level Consumer 설계텍스트:https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example High Level Consumer를 사용하는 이유 일부 응용 장면에서 우리는...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.