KAFKA 소비자 코드 예

1346 단어
kafka 소비자 코드:
public static void main(String[] args) {
    String recordStrFormat = "offset = %d, key = %s, value = %s
"; Properties props = new Properties(); props.put("bootstrap.servers", "spidercdh-01:9092"); props.put("group.id", "default"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", 1000); props.put("session.timeout.ms", 30000); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer consumer = new KafkaConsumer(props); //test test2 topic consumer.subscribe(Arrays.asList("test","test2")); try { while (true) { ConsumerRecords records = consumer.poll(100); for (ConsumerRecord record : records) { System.out.println(String.format(recordStrFormat, record.offset(), record.key(), record.value())); } } } finally { if (null != consumer) consumer.close(); } }}

좋은 웹페이지 즐겨찾기