kafka consumer offset 가져오기

2997 단어

1. 기능


`__consumer_offsets` 토픽의 데이터를 읽어서 해석한다.

2. 코드

 
public static void main(String[] args) throws Exception {
  Consumer consumer = createKafkaConsumer();
        consumer.subscribe(Lists.newArrayList("__consumer_offsets"));
        while (true) {
            ConsumerRecords records = consumer.poll(100);
            Iterator> iterator = records.iterator();
            Map map = Maps.newHashMap();
            while (iterator.hasNext()) {
                ConsumerRecord record = iterator.next();
                if (record.key() == null) {
                    continue;
                }
                BaseKey baseKey = GroupMetadataManager.readMessageKey(ByteBuffer.wrap(record.key()));
                byte[] value = record.value();
                if (value == null) {
                    continue;
                }
                OffsetAndMetadata offset = GroupMetadataManager.readOffsetMessageValue(ByteBuffer.wrap(value));
                if (baseKey instanceof OffsetKey) {
                    OffsetKey newKey = (OffsetKey) baseKey;
                    String group = newKey.key().group();
                    TopicPartition tp = newKey.key().topicPartition();
                    System.out.println(group + "," + tp.topic() + "," + tp.partition() + "," + offset.offsetMetadata().offset()));
                }
            }
        }
}
 /**
  * Builds a Kafka consumer whose keys and values are delivered as raw byte arrays.
  *
  * <p>Configuration set here: broker {@code localhost:9092}, consumer group {@code test2},
  * auto-commit disabled, and {@code auto.offset.reset=latest}.
  *
  * @return a new consumer; the caller is responsible for closing it
  */
 static Consumer<byte[], byte[]> createKafkaConsumer() {
     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "test2");
     props.put("enable.auto.commit", "false");
     props.put("auto.offset.reset", "latest");
     // Both sides use the byte-array deserializer, hence the <byte[], byte[]> type arguments.
     props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
     return new KafkaConsumer<>(props);
 }

3. pom
 
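        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.11</artifactId>
            <version>2.0.1</version>
            <exclusions>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>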

좋은 웹페이지 즐겨찾기