Flink 소스 코드 읽 기: Flink Kafka Producer 를 사용 하여 Kafka 의 여러 partition 에 데 이 터 를 고 르 게 분포 하 는 방법
13581 단어 빅 데이터
FlinkKafka ProducerBase 의 하위 클래스 는 기본 KafkaPartitioner Fixed Partitioner (partition 0 에 만 데 이 터 를 쓸 수 있 음) 를 사용 할 수 있 고 자신 이 정의 한 Partitioner (KafkaPartitioner 계승) 를 사용 할 수 있 습 니 다. 실현 이 복잡 하 다 고 생각 합 니 다.
구조 FlinkKafka ProducerBase 의 하위 클래스 2 가지 상황
public FlinkKafkaProducer09(String topicId, SerializationSchema serializationSchema,
Properties producerConfig) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema),
producerConfig, new FixedPartitioner());
}
public FlinkKafkaProducer09(String topicId, SerializationSchema serializationSchema,
Properties producerConfig, KafkaPartitioner customPartitioner) {
this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema),
producerConfig, customPartitioner);
}
기본 FixedPartitioner
public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
private static final long serialVersionUID = 1627268846962918126L;
private int targetPartition = -1;
@Override
public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
if (parallelInstanceId < 0 || parallelInstances <= 0 ||
partitions.length == 0) {
throw new IllegalArgumentException();
}
this.targetPartition = partitions[parallelInstanceId % partitions.length];
}
@Override
public int partition(T next, byte[] serializedKey, byte[] serializedValue,
int numPartitions) {
if (targetPartition >= 0) {
return targetPartition;
} else {
throw new RuntimeException("The partitioner has not been initialized properly");
}
}
}
FlinkKafka ProducerBase 의 하위 클래스 를 구성 할 때 null 인 Kafka Partitioner 를 전달 할 수 있 습 니 다. 그러면 Kafka Client 의 기본 Partitioner 를 사용 할 수 있 습 니 다. 기본 적 인 Paritioner 는 데 이 터 를 각 partition 에 골 고루 배분 하 는 것 입 니 다.
protected FlinkKafkaProducerBase createSink(String topic, KeyedSerializationSchema
deserializationSchema, Properties properties) {
String classFullName = "";
if (kafkaVersion.startsWith("0.8")) {
classFullName =
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08";
} else if (kafkaVersion.startsWith("0.9")) {
classFullName =
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";
} else if (kafkaVersion.startsWith("0.10")) {
classFullName =
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";
} else {
throw new RuntimeException("not support the "+
"version kafka = " + kafkaVersion);
}
FlinkKafkaProducerBase sink = null;
try {
Class clazz = Class.forName(classFullName);
Constructor constructor = clazz.getConstructor(String.class,
KeyedSerializationSchema.class, Properties.class, KafkaPartitioner.class);
sink = (FlinkKafkaProducerBase) constructor.newInstance(topic,
deserializationSchema, properties,(KafkaPartitioner)null);
} catch (Throwable e) {
e.printStackTrace();
}
return sink;
}
Kafka Client 의 기본 파 티 션
public class DefaultPartitioner implements Partitioner {
private final ConcurrentMap topicCounterMap =
new ConcurrentHashMap<>();
public void configure(Map configs) {}
public int partition(String topic, Object key, byte[] keyBytes, Object value,
byte[] valueBytes, Cluster cluster) {
List partitions = cluster.partitionsForTopic(topic);
int numPartitions = partitions.size();
if (keyBytes == null) {
int nextValue = nextValue(topic);
List availablePartitions =
cluster.availablePartitionsForTopic(topic);
if (availablePartitions.size() > 0) {
int part = Utils.toPositive(nextValue) % availablePartitions.size();
return availablePartitions.get(part).partition();
} else {
// no partitions are available, give a non-available partition
return Utils.toPositive(nextValue) % numPartitions;
}
} else {
// hash the keyBytes to choose a partition
return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
}
}
private int nextValue(String topic) {
AtomicInteger counter = topicCounterMap.get(topic);
if (null == counter) {
counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
if (currentCounter != null) {
counter = currentCounter;
}
}
return counter.getAndIncrement();
}
public void close() {}
}
호출 프로 세 스
FlinkKafka ProducerBase 의 invoke 방법 을 호출 할 때 파 티 션 이 비어 있 는 지 여 부 를 판단 하고 비어 있 으 면 파 티 션 속성 이 비어 있 는 ProducerRecord 대상 을 구축 합 니 다. 그렇지 않 으 면 파 티 션 을 사용 하여 파 티 션 구조 ProducerRecord 대상 을 얻 습 니 다.
public void invoke(IN next) throws Exception {
// propagate asynchronous errors
checkErroneous();
byte[] serializedKey = schema.serializeKey(next);
byte[] serializedValue = schema.serializeValue(next);
String targetTopic = schema.getTargetTopic(next);
if (targetTopic == null) {
targetTopic = defaultTopicId;
}
ProducerRecord<byte[], byte[]> record;
if (partitioner == null) {
record = new ProducerRecord<>(targetTopic, serializedKey,
serializedValue);
} else {
record = new ProducerRecord<>(targetTopic,
partitioner.partition(next, serializedKey, serializedValue,
partitions.length), serializedKey, serializedValue);
}
if (flushOnCheckpoint) {
synchronized (pendingRecordsLock) {
pendingRecords++;
}
}
producer.send(record, callback);
}
Kafka Producer 의 send 방법 을 호출 할 때 방법 에 서 는 partition 방법 을 사용 하여 데 이 터 를 어느 파 티 션 에 넣 을 지 결정 합 니 다. Producer Record 의 partition 속성 이 존재 하고 합 법 적 이면 이 값 을 사용 합 니 다. 그렇지 않 으 면 Kafka Producer 의 partitioner 를 사용 하여 파 티 션 을 진행 합 니 다.
private int partition(ProducerRecord record, byte[] serializedKey ,
byte[] serializedValue, Cluster cluster) {
Integer partition = record.partition();
if (partition != null) {
List partitions = cluster.partitionsForTopic(record.topic());
int numPartitions = partitions.size();
// they have given us a partition, use it
if (partition < 0 || partition >= numPartitions)
throw new IllegalArgumentException("Invalid partition given with record: " +
partition
+ " is not in the range [0..."
+ numPartitions
+ "].");
return partition;
}
return this.partitioner.partition(record.topic(), record.key(),
serializedKey, record.value(), serializedValue,cluster);
}
KafkaProducer 의 partitioner 는 설정 을 읽 어서 가 져 옵 니 다. 기본 값 은 DefaultPartitioner 입 니 다. properties 에서 put partitioner. class 에서 사용 할 partitioner 를 지정 할 수 있 습 니 다.
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
spark 의 2: 원리 소개Google Map/Reduce 를 바탕 으로 이 루어 진 Hadoop 은 개발 자 에 게 map, reduce 원 어 를 제공 하여 병렬 일괄 처리 프로그램 을 매우 간단 하고 아름 답 게 만 들 었 습 니 다.S...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.