kafka consumer offset 가져오기
1. 기능
topic 읽기consumer_offsets의 데이터, 해석.
2. 코드
public static void main(String[] args) throws Exception {
Consumer consumer = createKafkaConsumer();
consumer.subscribe(Lists.newArrayList("__consumer_offsets"));
while (true) {
ConsumerRecords records = consumer.poll(100);
Iterator> iterator = records.iterator();
Map map = Maps.newHashMap();
while (iterator.hasNext()) {
ConsumerRecord record = iterator.next();
if (record.key() == null) {
continue;
}
BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
byte[] value = record.value();
if (value == null) {
continue;
}
OffsetAndMetadata offset = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
if (baseKey instanceof OffsetKey) {
OffsetKey newKey = (OffsetKey) baseKey;
String group = newKey.key().group();
TopicPartition tp = newKey.key().topicPartition();
System.out.println(group + "," + tp.topic() + "," + tp.partition() + "," + offset.offsetMetadata().offset()));
}
}
}
}
static Consumer createKafkaConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test2");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return new KafkaConsumer(props);
}
3.pom
org.apache.kafka
kafka-clients
2.0.0
org.apache.kafka
kafka_2.11
2.0.1
log4j
log4j
org.slf4j
slf4j-log4j12
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSON
JSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다.
그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다.
저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.
public static void main(String[] args) throws Exception {
Consumer consumer = createKafkaConsumer();
consumer.subscribe(Lists.newArrayList("__consumer_offsets"));
while (true) {
ConsumerRecords records = consumer.poll(100);
Iterator> iterator = records.iterator();
Map map = Maps.newHashMap();
while (iterator.hasNext()) {
ConsumerRecord record = iterator.next();
if (record.key() == null) {
continue;
}
BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
byte[] value = record.value();
if (value == null) {
continue;
}
OffsetAndMetadata offset = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
if (baseKey instanceof OffsetKey) {
OffsetKey newKey = (OffsetKey) baseKey;
String group = newKey.key().group();
TopicPartition tp = newKey.key().topicPartition();
System.out.println(group + "," + tp.topic() + "," + tp.partition() + "," + offset.offsetMetadata().offset()));
}
}
}
}
static Consumer createKafkaConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test2");
props.put("enable.auto.commit", "false");
props.put("auto.offset.reset", "latest");
props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
return new KafkaConsumer(props);
}
3.pom
org.apache.kafka
kafka-clients
2.0.0
org.apache.kafka
kafka_2.11
2.0.1
log4j
log4j
org.slf4j
slf4j-log4j12
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.