Kafka는 thread "main"자바에서 topic 구역의 편이도를 지정합니다.lang.IllegalStateException: No current assignment
3709 단어 kafka 원본
kafka에서, 나는 구역의 편이량을 지정하여 소비를 시작하고 싶다
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("test"));
consumer.seek(new TopicPartition("test",0),0l);
Exception in thread "main" java.lang.IllegalStateException: No current assignment for partition test-0
at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:268)
at org.apache.kafka.clients.consumer.internals.SubscriptionState.seek(SubscriptionState.java:293)
at org.apache.kafka.clients.consumer.KafkaConsumer.seek(KafkaConsumer.java:1139)
at KafkaConsumerDemo.main(KafkaConsumerDemo.java:25)
구글 한번.
https://stackoverflow.com/questions/41008610/kafkaconsumer-0-10-java-api-error-message-no-current-assignment-for-partition대의는 먼저 한 번 poll을 해야 한다는 것이다.
KafkaConsumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("test"));
ConsumerRecords recordTemp = consumer.poll(0);
System.out.println(recordTemp.isEmpty());
consumer.seek(new TopicPartition("test",0),0l);
이렇게 해서 확실히 문제를 해결했는데, 잘못 보고한 원인을 보면 알 수 있다
/* the list of partitions currently assigned */
// , topic
private final Map assignment;
위의 이 필드에서 발생한 것입니다. 첫 번째 시작만이 이 문제를 초래할 수 있습니다.poll 방법은 데이터를 추출하고 메타데이터를 초기화하는 데 도움을 줍니다.다음은 첫 번째poll, 데이터를 끌어당겼는지, 뭘 했는지 분석해 봅시다.
코드를 읽으면서 사실 처음poll에서도 데이터를 가져왔지만, 편이량이 우리가 원하는 것이 아니기 때문에 버려진 것을 발견하였다
// we are interested in this fetch only if the beginning offset matches the
// current consumed position
// ,
Long position = subscriptions.position(tp);
if (position == null || position != fetchOffset) {
log.debug("Discarding stale fetch response for partition {} since its offset {} does not match " +
"the expected offset {}", tp, fetchOffset, position);
return null;
}
일지도 볼 수 있어요.
[2018-08-24 11:46:24,273] DEBUG Discarding stale fetch response for partition testBug-0 since its offset 61898 does not match the expected offset 10 (org.apache.kafka.clients.consumer.internals.Fetcher)