Apache Kafka 0.10.0.0 Stable Release and New Features

http://www.iteblog.com/archives/1677
In Kafka 0.9.0.0, developers using the new consumer had almost no control over how many messages a poll() call would return. The good news is that this release introduces the max.poll.records parameter, which lets developers cap the number of records returned by each poll() call.
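For example, here is a minimal sketch of the new setting (the broker address, group id, and the cap of 100 records are illustrative placeholders, not values from the article):

// Sketch: cap the number of records a single poll() call may return.
// "localhost:9092", "test" and 100 are placeholder values.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("max.poll.records", "100");  // new in 0.10.0.0
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
// Each consumer.poll(timeout) call now returns at most 100 records.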
New Consumer API
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.10.0.0</version>
</dependency>
kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
poll
ConsumerRecords<K,V> poll(long timeout)
See Also:
KafkaConsumer.poll(long)
poll
public ConsumerRecords<K,V> poll(long timeout)
Fetch data for the topics or partitions specified using one of the subscribe/assign APIs. It is an error to not have subscribed to any topics or partitions before polling for data.
On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially. The last consumed offset can be manually set through seek(TopicPartition, long) or automatically set as the last committed offset for the subscribed list of partitions
Specified by:
poll in interface Consumer<K,V>
Parameters:
timeout - The time, in milliseconds, spent waiting in poll if data is not available in the buffer. If 0, returns immediately with any records that are available currently in the buffer, else returns empty. Must not be negative.
Returns:
map of topic to records since the last fetch for the subscribed list of topics and partitions
Throws:
InvalidOffsetException - if the offset for a partition or set of partitions is undefined or out of range and no offset reset policy has been configured
WakeupException - if wakeup() is called before or while this function is called
AuthorizationException - if caller lacks Read access to any of the subscribed topics or to the configured groupId
KafkaException - for any other unrecoverable errors (e.g. invalid groupId or session timeout, errors deserializing key/value pairs, or any new error cases in future versions)
See Also:
poll(long)
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");   // commit offsets manually after processing
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        buffer.add(record);
    }
    if (buffer.size() >= minBatchSize) {
        insertIntoDb(buffer);      // application-specific batch processing
        consumer.commitSync();     // commit only after the batch is safely stored
        buffer.clear();
    }
}
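The Javadoc above also mentions that the starting offset can be set manually through seek(TopicPartition, long). Below is a minimal sketch of that path, assuming a KafkaConsumer<String, String> configured as in the example above but not yet subscribed (subscribe() and manual assignment are mutually exclusive on one consumer instance); the topic name "foo", partition 0 and offset 42 are illustrative values:

// Sketch: manually position the consumer before polling.
// Requires org.apache.kafka.common.TopicPartition and java.util.Collections.
TopicPartition tp = new TopicPartition("foo", 0);
consumer.assign(Collections.singletonList(tp)); // manual assignment instead of subscribe()
consumer.seek(tp, 42L);                         // the next poll() starts fetching at offset 42
ConsumerRecords<String, String> records = consumer.poll(100);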
SimpleConsumer demo:
https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
Reading the Data

// When calling FetchRequestBuilder, it's important NOT to call .replicaId(), which is meant for internal use only.
// Setting the replicaId incorrectly will cause the brokers to behave incorrectly.
FetchRequest req = new FetchRequestBuilder()
        .clientId(clientName)
        .addFetch(a_topic, a_partition, readOffset, 100000)
        .build();
FetchResponse fetchResponse = consumer.fetch(req);

if (fetchResponse.hasError()) {
    // See code in previous section
}
numErrors = 0;

long numRead = 0;
for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(a_topic, a_partition)) {
    long currentOffset = messageAndOffset.offset();
    if (currentOffset < readOffset) {
        System.out.println("Found an old offset: " + currentOffset + " Expecting: " + readOffset);
        continue;
    }
    readOffset = messageAndOffset.nextOffset();
    ByteBuffer payload = messageAndOffset.message().payload();

    byte[] bytes = new byte[payload.limit()];
    payload.get(bytes);
    System.out.println(String.valueOf(messageAndOffset.offset()) + ": " + new String(bytes, "UTF-8"));
    numRead++;
    a_maxReads--;
}

if (numRead == 0) {
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ie) {
    }
}

Note that 'readOffset' is updated from the last message read (via nextOffset()), so once this block of messages has been processed we know the offset at which to start the next fetch from Kafka.
Also note that we are explicitly checking that the offset being read is not less than the offset that we requested. This is needed since if Kafka is compressing the messages, the fetch request will return an entire compressed block even if the requested offset isn't the beginning of the compressed block. Thus a message we saw previously may be returned again. Note also that we ask for a fetchSize of 100000 bytes. If the Kafka producers are writing large batches, this might not be enough, and might return an empty message set. In this case, the fetchSize should be increased until a non-empty set is returned.
Finally, we keep track of the # of messages read. If we didn't read anything on the last request we go to sleep for a second so we aren't hammering Kafka when there is no data.
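One way to handle that empty-set case is to retry the fetch with a progressively larger fetchSize. The sketch below is only an illustration of that idea: it reuses consumer, clientName, a_topic, a_partition and readOffset from the example above, and the doubling strategy with a 16 MB cap is an arbitrary choice, not part of the official example.

// Sketch: grow the fetch size until a non-empty message set comes back.
int fetchSize = 100000;
FetchResponse fetchResponse;
do {
    FetchRequest req = new FetchRequestBuilder()
            .clientId(clientName)
            .addFetch(a_topic, a_partition, readOffset, fetchSize)
            .build();
    fetchResponse = consumer.fetch(req);
    if (fetchResponse.hasError()) {
        break; // handle the error as in the previous section
    }
    if (fetchResponse.messageSet(a_topic, a_partition).iterator().hasNext()) {
        break; // got at least one message
    }
    fetchSize *= 2; // the producer batch may be larger than fetchSize, try a bigger request
} while (fetchSize <= 16 * 1024 * 1024); // arbitrary upper bound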
http://zqhxuyuan.github.io/2016/02/20/Kafka-Consumer-New/
http://blog.csdn.net/xianzhen376/article/details/51167742
