Apache Kafka 0.10.0.0 stable release announced, with an introduction to new features
In Kafka 0.9.0.0, developers had almost no control over how many messages a call to poll() on the new consumer would return. Fortunately, Kafka 0.10.0.0 introduces the max.poll.records parameter, which lets developers set the maximum number of records returned by a single poll().
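As a rough illustration, the setting is just another consumer property. The sketch below only builds the Properties object, so it runs without a broker; the class name ConsumerSettings, the broker address, the group id and the value 100 are illustrative assumptions, not part of the original post.

```java
import java.util.Properties;

// Hypothetical helper: builds consumer settings that cap the size of each poll() batch.
final class ConsumerSettings {
    private ConsumerSettings() {}

    static Properties withMaxPollRecords(String bootstrapServers, String groupId, int maxPollRecords) {
        // Reject non-positive values early; a batch cap below 1 makes no sense.
        if (maxPollRecords < 1) {
            throw new IllegalArgumentException("max.poll.records must be at least 1");
        }
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Upper bound on the number of records a single poll() call returns.
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }
}
```

With kafka-clients 0.10.0.0 on the classpath, the resulting Properties can be passed to new KafkaConsumer<String, String>(props); the key is also available as the constant ConsumerConfig.MAX_POLL_RECORDS_CONFIG.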
New Consumer API
Maven coordinates: org.apache.kafka:kafka-clients:0.10.0.0
Javadoc: kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
poll
ConsumerRecords<K,V> poll(long timeout)
See Also:
KafkaConsumer.poll(long)
poll
public ConsumerRecords<K,V> poll(long timeout)
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have subscribed to any topics or partitions before polling for data.
On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions
Specified by:
poll in interface Consumer
Parameters:
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.
Returns:
map of topic to records since the last fetch for the subscribed list of topics and partitions
Throws:
InvalidOffsetException - if the offset for a partition or set of partitions is undefined or out of range and no offset reset policy has been configured
WakeupException - if wakeup() is called before or while this function is called
AuthorizationException - if caller lacks Read access to any of the subscribed topics or to the configured groupId
KafkaException - for any other unrecoverable errors (e.g. invalid groupId or session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
See Also:
poll(long)
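The positioning rule in the javadoc above (start from the last consumed offset, initialise it from the committed offset, override it with seek()) can be illustrated with a small in-memory model. PositionModel is hypothetical and never talks to a broker: partitions are plain strings such as "foo-0", a record's offset is its list index, and maxRecords plays the role of max.poll.records. When no committed offset exists the model simply starts at 0, whereas the real consumer consults auto.offset.reset.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model (not the Kafka client) of how poll() picks its starting offset.
final class PositionModel {
    private final Map<String, List<String>> logs;          // partition -> records, index == offset
    private final Map<String, Long> positions = new HashMap<>();

    PositionModel(Map<String, List<String>> logs, Map<String, Long> committed) {
        this.logs = logs;
        // Without an explicit seek, reading starts at the last committed offset.
        positions.putAll(committed);
    }

    // Counterpart of seek(TopicPartition, long): set the next offset to read.
    void seek(String partition, long offset) {
        positions.put(partition, offset);
    }

    // Simplification: default to 0 instead of applying auto.offset.reset.
    long position(String partition) {
        return positions.getOrDefault(partition, 0L);
    }

    // Read sequentially from the current position, return at most maxRecords
    // entries, and move the position past what was returned.
    List<String> poll(String partition, int maxRecords) {
        List<String> log = logs.getOrDefault(partition, Collections.emptyList());
        long start = position(partition);
        List<String> out = new ArrayList<>();
        for (long off = start; off < log.size() && out.size() < maxRecords; off++) {
            out.add(log.get((int) off));
        }
        positions.put(partition, start + out.size());
        return out;
    }
}
```

For example, with committed offset 1 on a five-record partition, poll("foo-0", 2) returns the records at offsets 1 and 2 and leaves the position at 3; a later seek("foo-0", 0) makes the next poll start from the beginning again.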
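import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
// Offsets are committed manually with commitSync() below, so auto-commit is disabled
// (auto.commit.interval.ms then has no effect).
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    // Wait up to 100 ms for new records.
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        // insertIntoDb is application code, not part of Kafka: persist the batch first,
        insertIntoDb(buffer);
        // then commit the offsets of everything returned by poll() so far.
        consumer.commitSync();
        buffer.clear();
    }
}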
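The control flow of this example (buffer until minBatchSize, persist, then commit) can be modelled without a broker. The sketch below is a stdlib-only illustration, not Kafka code: BatchingBuffer is a hypothetical class, and its sink and commit callbacks stand in for insertIntoDb(buffer) and consumer.commitSync(). It shows why the commit comes after the insert: if the insert throws, nothing is committed, so after a restart the consumer resumes from the last committed offset and the batch is delivered again (at-least-once; a crash between insert and commit can still produce duplicates in the database).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "buffer, insert, then commit"; no Kafka classes involved.
final class BatchingBuffer<T> {
    private final int minBatchSize;
    private final java.util.function.Consumer<List<T>> sink;   // stands in for insertIntoDb
    private final Runnable commit;                              // stands in for commitSync()
    private final List<T> buffer = new ArrayList<>();

    BatchingBuffer(int minBatchSize, java.util.function.Consumer<List<T>> sink, Runnable commit) {
        this.minBatchSize = minBatchSize;
        this.sink = sink;
        this.commit = commit;
    }

    // Called with the records of one poll(); flushes once enough have accumulated.
    void addAll(List<T> polled) {
        buffer.addAll(polled);
        if (buffer.size() >= minBatchSize) {
            // Persist a copy first; if the sink throws, commit is skipped and the
            // buffered records stay pending.
            sink.accept(new ArrayList<>(buffer));
            commit.run();
            buffer.clear();
        }
    }

    int pending() {
        return buffer.size();
    }
}
```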
SimpleConsumer demo (the older low-level consumer API, for comparison):
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
Reading the Data
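import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.message.MessageAndOffset;

// Excerpt of the wiki example's fetch loop. consumer (a kafka.javaapi.consumer.SimpleConsumer),
// clientName, a_topic, a_partition, readOffset, a_maxReads and numErrors are set up earlier there.
while (a_maxReads > 0) {
    // When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
    // Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, 100000) // fetchSize in bytes
            .build();
    FetchResponse fetchResponse = consumer.fetch(req);
    if (fetchResponse.hasError()) {
        // See code in previous section
    }
    numErrors = 0;
    long numRead = 0;
    for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
        long currentOffset = messageAndOffset.offset();
        // Skip messages before the requested offset (replayed part of a compressed block).
        if (currentOffset < readOffset) {
            System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
            continue;
        }
        readOffset = messageAndOffset.nextOffset();
        ByteBuffer payload = messageAndOffset.message().payload();
        byte[] bytes = new byte[payload.limit()];
        payload.get(bytes);
        System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, StandardCharsets.UTF_8));
        numRead++;
        a_maxReads--;
    }
    // Nothing new: back off for a second instead of re-fetching immediately.
    if (numRead == 0) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ie) {
            // Interruption is ignored here, as in the original example.
        }
    }
}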
Note that readOffset is taken from the last message read, via nextOffset(). This way, once the block of messages has been processed, we know which offset to ask Kafka for in the next fetch.
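The offset bookkeeping can be checked without a broker. OffsetTracker and its Msg class below are hypothetical stand-ins for MessageAndOffset (whose nextOffset() is the message's offset plus one); the sketch replays a fetched block that starts before the requested offset, as happens with compressed blocks, and shows that only unseen messages are processed and that readOffset ends up just past the last one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of the readOffset handling in the SimpleConsumer loop.
final class OffsetTracker {
    static final class Msg {
        final long offset;
        final String payload;

        Msg(long offset, String payload) {
            this.offset = offset;
            this.payload = payload;
        }

        // Mirrors MessageAndOffset.nextOffset(): the offset to request next.
        long nextOffset() {
            return offset + 1;
        }
    }

    private long readOffset;
    private final List<String> processed = new ArrayList<>();

    OffsetTracker(long startOffset) {
        this.readOffset = startOffset;
    }

    // Processes one fetched block and returns how many new messages it contained.
    int process(List<Msg> block) {
        int numRead = 0;
        for (Msg m : block) {
            if (m.offset < readOffset) {
                continue;                      // already seen: replayed part of the block
            }
            readOffset = m.nextOffset();       // where the next fetch should start
            processed.add(m.payload);
            numRead++;
        }
        return numRead;
    }

    long readOffset() {
        return readOffset;
    }

    List<String> processed() {
        return processed;
    }
}
```

Feeding the same block twice yields no new messages the second time, which is exactly the case where the loop above falls through to its one-second sleep.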
Also note that we explicitly check that the offset being read is not less than the offset we requested. This is needed because, if Kafka is compressing the messages, a fetch request returns the entire compressed block even if the requested offset is not at the beginning of that block, so a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and the fetch might return an empty message set. In that case, the fetchSize should be increased until a non-empty set is returned.
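Increasing fetchSize can be sketched as a retry loop. This is an assumption-laden illustration: the fetch function is a hypothetical stand-in for building a FetchRequest with addFetch(..., fetchSize) and reading the message set, and the doubling strategy and the maxSize cap are our own choices (the text only says to increase the size). The cap matters because an empty set can also simply mean there is no new data, in which case growing the request forever would not help.

```java
import java.util.List;
import java.util.function.IntFunction;

// Hypothetical helper: retry a fetch with a growing fetchSize until data arrives.
final class FetchSizeGrower {
    private FetchSizeGrower() {}

    static <T> List<T> fetchGrowing(IntFunction<List<T>> fetch, int initialSize, int maxSize) {
        if (initialSize < 1) {
            throw new IllegalArgumentException("initialSize must be positive");
        }
        int size = initialSize;
        while (true) {
            List<T> result = fetch.apply(size);
            // Stop when data arrived, or when the size may not grow any further.
            if (!result.isEmpty() || size >= maxSize) {
                return result;
            }
            // Double the size without exceeding maxSize (this also avoids int overflow).
            size = (size > maxSize / 2) ? maxSize : size * 2;
        }
    }
}
```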
Finally, we keep track of the number of messages read. If the last request returned nothing, we sleep for one second so that we are not hammering Kafka when there is no data.
You may freely share or copy this text, but please keep the URL of this document as the reference URL.
Licensed under CC BY-SA 2.5, CC BY-SA 3.0 and CC BY-SA 4.0.