Flink 소스 코드 읽 기: Flink Kafka Producer 를 사용 하여 Kafka 의 여러 partition 에 데 이 터 를 고 르 게 분포 하 는 방법

13581 단어 빅 데이터
Flink 가 출력 한 데 이 터 를 여러 파 티 션 에 고 르 게 분포 합 니 다.
FlinkKafka ProducerBase 의 하위 클래스 는 기본 KafkaPartitioner Fixed Partitioner (partition 0 에 만 데 이 터 를 쓸 수 있 음) 를 사용 할 수 있 고 자신 이 정의 한 Partitioner (KafkaPartitioner 계승) 를 사용 할 수 있 습 니 다. 실현 이 복잡 하 다 고 생각 합 니 다.
구조 FlinkKafka ProducerBase 의 하위 클래스 2 가지 상황
    public FlinkKafkaProducer09(String topicId, SerializationSchema serializationSchema, 
Properties producerConfig) {
    this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), 
                producerConfig, new FixedPartitioner());
    }

    public FlinkKafkaProducer09(String topicId, SerializationSchema serializationSchema, 
Properties producerConfig, KafkaPartitioner customPartitioner) {
    this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), 
            producerConfig, customPartitioner);

    }

기본 FixedPartitioner
public class FixedPartitioner<T> extends KafkaPartitioner<T> implements Serializable {
    private static final long serialVersionUID = 1627268846962918126L;

    private int targetPartition = -1;

    @Override
    public void open(int parallelInstanceId, int parallelInstances, int[] partitions) {
        if (parallelInstanceId < 0 || parallelInstances <= 0 || 
                    partitions.length == 0) {
            throw new IllegalArgumentException();
        }

        this.targetPartition = partitions[parallelInstanceId % partitions.length];
    }

    @Override
    public int partition(T next, byte[] serializedKey, byte[] serializedValue, 
            int numPartitions) {
        if (targetPartition >= 0) {
            return targetPartition;
        } else {
            throw new RuntimeException("The partitioner has not been initialized properly");
        }
    }
}

FlinkKafka ProducerBase 의 하위 클래스 를 구성 할 때 null 인 Kafka Partitioner 를 전달 할 수 있 습 니 다. 그러면 Kafka Client 의 기본 Partitioner 를 사용 할 수 있 습 니 다. 기본 적 인 Paritioner 는 데 이 터 를 각 partition 에 골 고루 배분 하 는 것 입 니 다.
protected FlinkKafkaProducerBase createSink(String topic, KeyedSerializationSchema
 deserializationSchema, Properties properties) {
        String classFullName = "";
        if (kafkaVersion.startsWith("0.8")) {
            classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08";
        } else if (kafkaVersion.startsWith("0.9")) {
            classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";
        } else if (kafkaVersion.startsWith("0.10")) {
            classFullName = 
"org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09";
        } else {
            throw new RuntimeException("not support the "+
                        "version kafka = " + kafkaVersion);
        }
        FlinkKafkaProducerBase sink = null;
        try {
            Class clazz = Class.forName(classFullName);
            Constructor constructor = clazz.getConstructor(String.class, 
KeyedSerializationSchema.class, Properties.class, KafkaPartitioner.class);
            sink = (FlinkKafkaProducerBase) constructor.newInstance(topic, 
deserializationSchema, properties,(KafkaPartitioner)null);
        } catch (Throwable e) {
            e.printStackTrace();
        }
        return sink;
    }

Kafka Client 의 기본 파 티 션
public class DefaultPartitioner implements Partitioner {

    private final ConcurrentMap topicCounterMap = 
            new ConcurrentHashMap<>();

    public void configure(Map configs) {}

    public int partition(String topic, Object key, byte[] keyBytes, Object value, 
        byte[] valueBytes, Cluster cluster) {
        List partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List availablePartitions = 
                cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

    private int nextValue(String topic) {
        AtomicInteger counter = topicCounterMap.get(topic);
        if (null == counter) {
            counter = new AtomicInteger(ThreadLocalRandom.current().nextInt());
            AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter);
            if (currentCounter != null) {
                counter = currentCounter;
            }
        }
        return counter.getAndIncrement();
    }

    public void close() {}
}

호출 프로 세 스
FlinkKafka ProducerBase 의 invoke 방법 을 호출 할 때 파 티 션 이 비어 있 는 지 여 부 를 판단 하고 비어 있 으 면 파 티 션 속성 이 비어 있 는 ProducerRecord 대상 을 구축 합 니 다. 그렇지 않 으 면 파 티 션 을 사용 하여 파 티 션 구조 ProducerRecord 대상 을 얻 습 니 다.
    public void invoke(IN next) throws Exception {
        // propagate asynchronous errors
        checkErroneous();

        byte[] serializedKey = schema.serializeKey(next);
        byte[] serializedValue = schema.serializeValue(next);
        String targetTopic = schema.getTargetTopic(next);
        if (targetTopic == null) {
            targetTopic = defaultTopicId;
        }

        ProducerRecord<byte[], byte[]> record;
        if (partitioner == null) {
            record = new ProducerRecord<>(targetTopic, serializedKey, 
                            serializedValue);
        } else {
            record = new ProducerRecord<>(targetTopic, 
                        partitioner.partition(next, serializedKey, serializedValue, 
                        partitions.length), serializedKey, serializedValue);
        }
        if (flushOnCheckpoint) {
            synchronized (pendingRecordsLock) {
                pendingRecords++;
            }
        }
        producer.send(record, callback);
    }

Kafka Producer 의 send 방법 을 호출 할 때 방법 에 서 는 partition 방법 을 사용 하여 데 이 터 를 어느 파 티 션 에 넣 을 지 결정 합 니 다. Producer Record 의 partition 속성 이 존재 하고 합 법 적 이면 이 값 을 사용 합 니 다. 그렇지 않 으 면 Kafka Producer 의 partitioner 를 사용 하여 파 티 션 을 진행 합 니 다.
private int partition(ProducerRecord record, byte[] serializedKey , 
        byte[] serializedValue, Cluster cluster) {
        Integer partition = record.partition();
        if (partition != null) {
            List partitions = cluster.partitionsForTopic(record.topic());
            int numPartitions = partitions.size();
            // they have given us a partition, use it
            if (partition < 0 || partition >= numPartitions)
                throw new IllegalArgumentException("Invalid partition given with record: " + 
                                                    partition
                                                   + " is not in the range [0..."
                                                   + numPartitions
                                                   + "].");
            return partition;
        }
        return this.partitioner.partition(record.topic(), record.key(), 
            serializedKey, record.value(), serializedValue,cluster);
    }

KafkaProducer 의 partitioner 는 설정 을 읽 어서 가 져 옵 니 다. 기본 값 은 DefaultPartitioner 입 니 다. properties 에서 put partitioner. class 에서 사용 할 partitioner 를 지정 할 수 있 습 니 다.
this.partitioner = config.getConfiguredInstance(
ProducerConfig.PARTITIONER_CLASS_CONFIG, Partitioner.class);

좋은 웹페이지 즐겨찾기