자바 kafka 는 사용자 정의 파 티 션 클래스 와 차단 기 를 어떻게 실현 합 니까?

생산자 가 대응 하 는 구역 으로 보 내 는 방법 은 다음 과 같은 몇 가지 가 있다.
(1)patition 을 지정 하면 직접 사용 합 니 다.(대응 하 는 자바 api 를 찾 아 볼 수 있 습 니 다.다양한 인자 가 있 습 니 다)
(2)patition 은 지정 되 지 않 았 으 나 key 를 지정 하여 key 의 value 를 hash 로 patition 을 만 듭 니 다.
(3)patition 과 key 가 지정 되 지 않 았 습 니 다.폴 링 을 통 해 patition 을 선택 하 십시오.
그러나 kafka 는 파 티 션 알고리즘 을 사용자 정의 하 는 기능 을 제공 하여 업무 수 동 으로 분 포 를 실현 합 니 다.
1.사용자 정의 파 티 션 클래스 를 실현 하고 사용자 정의 파 티 션 은 파 티 션 을 실현 합 니 다.

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class CustomPartitioner implements Partitioner {

  /**
   *
   * @param topic       topic
   * @param key     key 
   * @param keyBytes    key     
   * @param value    value 
   * @param valueBytes    value     
   * @param cluster
   * @return
   */
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
    //            ,              
    return 3;
  }

  @Override
  public void close() {

  }
  @Override
  public void configure(Map<String, ?> configs) {

  }

}
2.producer 설정 파일 지정,구체 적 인 파 티 션 클래스
//구체 적 인 구분 류
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
기술:ProducerConfig 에서 제공 하 는 설정 을 사용 할 수 있 습 니 다.ProducerConfig
kafka producer 차단기
차단기(interceptor)는 Kafka 0.10 버 전에 도입 됐다.
interceptor 는 사용자 로 하여 금 메 시 지 를 보 내기 전과 producer 리 셋 논리 전에 메시지 에 대해 맞 춤 형 수 요 를 할 수 있 게 한다.예 를 들 어 메 시 지 를 수정 하 는 등 이다.
사용자 가 여러 개의 interceptor 를 지정 하여 같은 메시지 에 순서대로 작용 하여 하나의 차단 체인(interceptor chain)을 형성 할 수 있 도록 합 니 다.
사용 하 는 클래스 는:
org.apache.kafka.clients.producer.ProducerInterceptor
우 리 는 인 코딩 테스트 를 할 수 있다.
1.메시지 차단 기 를 정의 하고 메시지 처 리 를 실현 합 니 다.(시간 스탬프 등,유 니 드 등 이 있 을 수 있 습 니 다.)

import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Map;
import java.util.UUID;

public class MessageInterceptor implements ProducerInterceptor<String, String> {

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("  MessageInterceptor configure  ");
  }

  /**
   *              
   *
   * @param record
   * @return
   */
  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    //       record, uuid        
    System.out.println("     uuid");
    return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
        UUID.randomUUID().toString().replace("-", "") + "," + record.value());
  }

  /**
   *                 
   * @param metadata
   * @param exception
   */
  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    System.out.println("MessageInterceptor    onAcknowledgement  ");
  }

  @Override
  public void close() {
    System.out.println("MessageInterceptor close   ");
  }
}
2.정의 계수 차단기

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class CounterInterceptor implements ProducerInterceptor<String, String>{
  private int errorCounter = 0;
  private int successCounter = 0;

  @Override
  public void configure(Map<String, ?> configs) {
    System.out.println("  CounterInterceptor configure  ");
  }

  @Override
  public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
    System.out.println("CounterInterceptor              ");
    return record;
  }

  @Override
  public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
    //           
    System.out.println("CounterInterceptor              ");
    if (exception == null) {
      successCounter++;
    } else {
      errorCounter++;
    }
  }

  @Override
  public void close() {
    //     
    System.out.println("Successful sent: " + successCounter);
    System.out.println("Failed sent: " + errorCounter);
  }
}
3.producer 클 라 이언 트:

import org.apache.kafka.clients.producer.*;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Producer1 {
  public static void main(String[] args) throws Exception {
    Properties props = new Properties();
    // Kafka           
    props.put("bootstrap.servers", "localhost:9092");
    //            
    props.put("acks", "all");
    //           
    props.put("retries", 0);
    //         
    props.put("batch.size", 16384);
    //     ,         
    props.put("linger.ms", 1);
    //          ,             
    props.put("buffer.memory", 33554432);
    // key   
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    // value   
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    //       
    props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
    //     
    List<String> interceptors = new ArrayList<>();
    interceptors.add("kafka.MessageInterceptor");
    interceptors.add("kafka.CounterInterceptor");
    props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);

    Producer<String, String> producer = new KafkaProducer<>(props);
    for (int i = 0; i < 1; i++) {
      producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() {
        public void onCompletion(RecordMetadata recordMetadata, Exception e) {
          System.out.println("  producer    ");
        }
      });
    }
    /*System.out.println("      producer");
    producer.close();*/
    producer.close();
  }
}
요약 하면 우 리 는 차단기 체인 의 각 방법의 집행 순 서 를 알 수 있다.만약 에 A,B 차단기 가 있다 면 하나의 차단기 체인 에서:
(1)A 를 실행 하 는 configure 방법,B 를 실행 하 는 configure 방법
(2)A 의 onSend 방법,B 의 onSend 방법 을 집행 한다.
(3)생산자 발송 완료 후 A 의 onAcknowledgement 방법,B 의 onAcknowledgement 방법 을 집행 한다.
(4)프로듀서 자체 의 콜백 리 셋 함 수 를 실행 합 니 다.
(5)A 를 실행 하 는 close 방법,B 의 close 방법.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

좋은 웹페이지 즐겨찾기