Kafka 자바 api-소비자 코드 와 소비 분석,생산자 소비자 프로필 상세 설명
32358 단어 kafka
<dependencies>
<dependency>
<groupId>com.alibaba.jstormgroupId>
<artifactId>jstorm-coreartifactId>
<version>2.1.1version>
<exclusions>
<exclusion>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-nopartifactId>
exclusion>
<exclusion>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-jdk14artifactId>
exclusion>
exclusions>
dependency>
<dependency>
<groupId>org.apache.kafkagroupId>
<artifactId>kafka_2.8.2artifactId>
<version>0.8.1version>
<exclusions>
<exclusion>
<artifactId>jmxtoolsartifactId>
<groupId>com.sun.jdmkgroupId>
exclusion>
<exclusion>
<artifactId>jmxriartifactId>
<groupId>com.sun.jmxgroupId>
exclusion>
<exclusion>
<artifactId>jmsartifactId>
<groupId>javax.jmsgroupId>
exclusion>
<exclusion>
<groupId>org.apache.zookeepergroupId>
<artifactId>zookeeperartifactId>
exclusion>
<exclusion>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-log4j12artifactId>
exclusion>
<exclusion>
<groupId>org.slf4jgroupId>
<artifactId>slf4j-apiartifactId>
exclusion>
exclusions>
dependency>
dependencies>
/**
* Kafka producer
* :
* 1、
* 2、 partition
*
*
* KafkaSpout
*/
public class KafkaProducerSimple {
public static void main(String[] args) {
/**
* 1、 kafka producer
* topic , kafka 。
* bin/kafka-topics.sh --create --zookeeper mini1:2181 --replication-factor 2 --partitions 3 --topic test
*/
String TOPIC = "orderMq";
/**
* 2、
*/
Properties props = new Properties();
/*
* key.serializer.class serializer.class key
*/
props.put("serializer.class", "kafka.serializer.StringEncoder");
/*
* kafka broker , host1:port1,host2:port2
*/
props.put("metadata.broker.list", "mini1:9092,mini2:9092,mini3:9092");
/*
* request.required.acks, , 0,1,-1
* 0, producer broker ack, 0.7 。
* , , server 。
* 1, leader replica ,producer ack。
* , server ,client 。
* leader , leader , 。
* -1, ISR ,producer ack。
* , replica ,
*/
props.put("request.required.acks", "1");
/*
* , , partitioner partitioner.class
* :kafka.producer.DefaultPartitioner
* partition , key hash。
*/
props.put("partitioner.class", "com.scu.kafka.MyLogPartitioner");
// props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
/**
* 3、 ,
*/
Producer producer = new Producer(new ProducerConfig(props));
/**
* 4、 for
*/
for (int messageNo = 1; messageNo < 100000; messageNo++) {
/**
* 5、 producer send
* : partitionKey, MyLogPartitioner
*/
producer.send(new KeyedMessage(TOPIC, messageNo + "", "appid" + UUID.randomUUID() + "itcast"));
}
}
}
public class MyLogPartitioner implements Partitioner {
private static Logger logger = Logger.getLogger(MyLogPartitioner.class);
public MyLogPartitioner(VerifiableProperties props) {
}
/**
*
* @param obj key hash partition
* @param numPartitions partition topic, partition , 2
* @return partition
*/
public int partition(Object obj, int numPartitions) {
return Integer.parseInt(obj.toString())%numPartitions;
}
}
주:orderMq 라 는 topic 는 일찍부터 명령 행 을 통 해 만 들 었 습 니 다.partition 은 3 개 로 지정 되 었 습 니 다.다음은 소비자 코드.
public class KafkaConsumerSimple implements Runnable {
public String title;
public KafkaStream<byte[], byte[]> stream;
public KafkaConsumerSimple(String title, KafkaStream<byte[], byte[]> stream) {
this.title = title;
this.stream = stream;
}
public void run() {
System.out.println(" " + title);
ConsumerIterator<byte[], byte[]> it = stream.iterator();
/**
* stream , ,hasNext()
* `ConsumerConnector#shutdown`, `hasNext` false
* */
while (it.hasNext()) {
MessageAndMetadata<byte[], byte[]> data = it.next();
String topic = data.topic();
int partition = data.partition();
long offset = data.offset();
String msg = new String(data.message());
System.out.println(String.format(
"Consumer: [%s], Topic: [%s], PartitionId: [%d], Offset: [%d], msg: [%s]",
title, topic, partition, offset, msg));
}
System.out.println(String.format("Consumer: [%s] exiting ...", title));
}
public static void main(String[] args) throws Exception{
Properties props = new Properties();
props.put("group.id", "dashujujiagoushi");// ,
props.put("zookeeper.connect", "mini1:2181,mini2:2181,mini3:2181");//zookeeper
props.put("auto.offset.reset", "largest");//
props.put("auto.commit.interval.ms", "1000");
props.put("partition.assignment.strategy", "roundrobin");//
ConsumerConfig config = new ConsumerConfig(props);
String topic1 = "orderMq";
String topic2 = "paymentMq";
// ConsumerConnector ,consumer ,
ConsumerConnector consumerConn = Consumer.createJavaConsumerConnector(config);
// map
Map topicCountMap = new HashMap();
topicCountMap.put(topic1, 3);
//Map> String topic, List
Mapbyte[], byte[]>>> topicStreamsMap = consumerConn.createMessageStreams(topicCountMap);
// `kafkaTest` streams
Listbyte[], byte[]>> streams = topicStreamsMap.get(topic1);
// 4
ExecutorService executor = Executors.newFixedThreadPool(3);
// 20 consumer threads
for (int i = 0; i < streams.size(); i++)
executor.execute(new KafkaConsumerSimple(" " + (i + 1), streams.get(i)));
}
}
테스트:먼저 소비자 프로그램 을 실행 합 니 다.파 티 션 디 렉 터 리 에 있 는 segment 파일 은 이전에 생 성 된 데이터 가 있 지만 인쇄 되 지 않 고 계속 알림 을 줍 니 다.(이미 소비 상태 로 표 시 된 것 은 더 이상 소비 하지 않 습 니 다.기본 적 인 상황 은 이 렇 습 니 다.0 부터 소 비 를 시작 할 수 있 습 니 다)
15:10:38.228 [main-SendThread(mini1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x15fdedc70380022 after 1ms
15:10:40.230 [main-SendThread(mini1:2181)] DEBUG org.apache.zookeeper.ClientCnxn - Got ping response for sessionid: 0x15fdedc70380022 after 4ms
생산 이 필요 하기 때문에 생산자 프로그램 을 실행 하고 콘 솔 은 다음 과 같이 인쇄 합 니 다.
...
Consumer: [ 1], Topic: [orderMq], PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
Consumer: [ 2], Topic: [orderMq], PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
Consumer: [ 1], Topic: [orderMq], PartitionId: [1], Offset: [17858], msg: [appidb145da08-bb61-42e7-b140-9fed576c2faeitcast]
Consumer: [ 1], Topic: [orderMq], PartitionId: [1], Offset: [17859], msg: [appid909a90ae-c0fb-42ac-97de-6d7438895e07itcast]
Consumer: [ 3], Topic: [orderMq], PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]
Consumer: [ 3], Topic: [orderMq], PartitionId: [2], Offset: [17714], msg: [appidb93b9355-4713-4e22-823a-756b4fe75bdfitcast]
Consumer: [ 3], Topic: [orderMq], PartitionId: [2], Offset: [17715], msg: [appidf82ca658-528a-4f40-a023-8a155c15eaa1itcast]
...
요약 하면 다음 과 같다.
Consumer: [ 1], Topic: [orderMq], PartitionId: [1], Offset: [17857], msg: [appidc977abb2-f0bc-41da-9daa-6b080321947bitcast]
Consumer: [ 2], Topic: [orderMq], PartitionId: [0], Offset: [17724], msg: [appid9101368e-ac81-4bbf-b2b5-8f2facd41f54itcast]
Consumer: [ 3], Topic: [orderMq], PartitionId: [2], Offset: [17713], msg: [appid157754b5-6958-4286-9c25-ff67ccc61a42itcast]
세 소비자 가 소비 에 대응 하 는 파 티 션 을 볼 수 있다.그러면 다음 문 제 를 고려 하여 orderMq 를 만 들 때 partition 을 3 개 로 지정 합 니 다.이때 제 가 KafkaStream 5 개 를 만 들 기로 지정 하면 어떻게 소비 할 까요?소비자 코드 는 다음 과 같이 두 번 수정 합 니 다.
topicCountMap.put(topic1, 5);
ExecutorService executor = Executors.newFixedThreadPool(5);
다시 한 번 위 와 같이 실행 하면 수출 결 과 는 3 명의 소비자 만 볼 수 있 기 때문에 KafkaStream 을 partition 보다 많이 지정 하 는 것 은 소 용이 없고 해당 수량의 소비자 만 해당 하 는 partition 의 데 이 터 를 소비 할 수 있다.
Consumer: [ 2], Topic: [orderMq], PartitionId: [2], Offset: [26420], msg: [appid4b778b51-33c7-42de-83c2-5b85f8f2428aitcast]
Consumer: [ 3], Topic: [orderMq], PartitionId: [0], Offset: [26423], msg: [appid86045c25-7b3f-4c82-ad2a-3e8e11958b28itcast]
Consumer: [ 4], Topic: [orderMq], PartitionId: [1], Offset: [26562], msg: [appid213b5a91-a7bf-4a39-b585-456d95748566itcast]
지 정 된 KafkaStream 이 2 밖 에 없다 면?테스트 를 하지 않 은 결과 한 소비 자 는 파 티 션 2 개 를 소비 하고 다른 소비 자 는 파 티 션 1 개 중 데 이 터 를 소비 하 는 것 으로 나 타 났 다.
생산자,소비자 프로필 설명 은 자바 api 로 생산자 코드 를 쓰 든 소비자 코드 를 쓰 든 프로필 을 사용 합 니 다.다음은 생산자 와 소비자 프로필 소개 입 니 다.
생산자 프로필 설명
# kafka , metadata,
metadata.broker.list=kafka01:9092,kafka02:9092
# 。 kafka.producer.DefaultPartitioner, key
#partitioner.class=kafka.producer.DefaultPartitioner
# , 0 ,1 gzip ,2 snappy 。 , 。
compression.codec=none
#
serializer.class=kafka.serializer.DefaultEncoder
# , topic , empty, 。
#compressed.topics=
# , 0,1,-1
# 0: producer broker ack
# 1: leader ack
# -1: follower ack.
request.required.acks=0
# producer ack ,broker , ,broker producer error ACK. ( follower )
request.timeout.ms=10000
# , “sync” ,"async" 。 ,
buffer , ,
producer.type=sync
# async , message , broker, 5000ms
# batch.num.messages .
queue.buffering.max.ms = 5000
# async ,producer buffer
# ,producer broker, producer
# , , producer , 10000
queue.buffering.max.messages=20000
# , , 200
batch.num.messages=500
# producer "queue.buffering.max.meesages"
# , enqueue(producer )
# producer , timeout " "
# -1: ,
# 0: ,
queue.enqueue.timeout.ms=-1
# producer error ACK, ACK ,
# broker , ( ACK )
# broker , 3.
message.send.max.retries=3
# producer topic metada ,producer partition leader , topic
# producer metadata, producer ,
# ( topic ,partition ,leader ), , 600000
topic.metadata.refresh.interval.ms=60000
소비자 프로필 설명
# zookeeper
zookeeper.connect=zk01:2181,zk02:2181,zk03:2181
# zookeeper session , 5000ms,
zookeeper.session.timeout.ms=5000
# ,
zookeeper.connection.timeout.ms=10000
# offset zookeeper 。 offset time 。 zookeeper ,
zookeeper.sync.time.ms=2000
#
group.id=xxx
# consumer , zookeeper offset
# offset zk , ( ), , true
auto.commit.enable=true
# 。 60 * 1000
auto.commit.interval.ms=1000
# consumer , , , ,
conusmer.id=xxx
# , ,
client.id=xxxx
# ( 10)
queued.max.message.chunks=50
# consumer group , reblance, partitions consumer , consumer partition , zk "Partition Owner registry" , consumer , , .
rebalance.max.retries=5
# ,broker consumer chunk feth , , , consumer
fetch.min.bytes=6553600
# ,server , , consumer
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
# zookeeper offset offset 。 offset。 smallest、largest、anything , offset、 offset、 。 largest
auto.offset.reset=smallest
#
derializer.class=kafka.serializer.DefaultDecoder
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spring Cloud를 사용한 기능적 Kafka - 1부지금까지 찾을 수 없었던 Spring Cloud Kafka의 작업 데모를 만들기 위해 이 기사를 정리했습니다. Confluent 스키마 레지스트리 7.1.0 이 기사는 먼저 Spring Cloud Stream을 사용...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.