자바 kafka 는 사용자 정의 파 티 션 클래스 와 차단 기 를 어떻게 실현 합 니까?
(1)patition 을 지정 하면 직접 사용 합 니 다.(대응 하 는 자바 api 를 찾 아 볼 수 있 습 니 다.다양한 인자 가 있 습 니 다)
(2)patition 은 지정 되 지 않 았 으 나 key 를 지정 하여 key 의 value 를 hash 로 patition 을 만 듭 니 다.
(3)patition 과 key 가 지정 되 지 않 았 습 니 다.폴 링 을 통 해 patition 을 선택 하 십시오.
그러나 kafka 는 파 티 션 알고리즘 을 사용자 정의 하 는 기능 을 제공 하여 업무 수 동 으로 분 포 를 실현 합 니 다.
1.사용자 정의 파 티 션 클래스 를 실현 하고 사용자 정의 파 티 션 은 파 티 션 을 실현 합 니 다.
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import java.util.Map;
public class CustomPartitioner implements Partitioner {
/**
*
* @param topic topic
* @param key key
* @param keyBytes key
* @param value value
* @param valueBytes value
* @param cluster
* @return
*/
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
// ,
return 3;
}
@Override
public void close() {
}
@Override
public void configure(Map<String, ?> configs) {
}
}
2.producer 설정 파일 지정,구체 적 인 파 티 션 클래스//구체 적 인 구분 류
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
기술:ProducerConfig 에서 제공 하 는 설정 을 사용 할 수 있 습 니 다.ProducerConfig
kafka producer 차단기
차단기(interceptor)는 Kafka 0.10 버 전에 도입 됐다.
interceptor 는 사용자 로 하여 금 메 시 지 를 보 내기 전과 producer 리 셋 논리 전에 메시지 에 대해 맞 춤 형 수 요 를 할 수 있 게 한다.예 를 들 어 메 시 지 를 수정 하 는 등 이다.
사용자 가 여러 개의 interceptor 를 지정 하여 같은 메시지 에 순서대로 작용 하여 하나의 차단 체인(interceptor chain)을 형성 할 수 있 도록 합 니 다.
사용 하 는 클래스 는:
org.apache.kafka.clients.producer.ProducerInterceptor
우 리 는 인 코딩 테스트 를 할 수 있다.
1.메시지 차단 기 를 정의 하고 메시지 처 리 를 실현 합 니 다.(시간 스탬프 등,유 니 드 등 이 있 을 수 있 습 니 다.)
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import java.util.Map;
import java.util.UUID;
public class MessageInterceptor implements ProducerInterceptor<String, String> {
@Override
public void configure(Map<String, ?> configs) {
System.out.println(" MessageInterceptor configure ");
}
/**
*
*
* @param record
* @return
*/
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
// record, uuid
System.out.println(" uuid");
return new ProducerRecord(record.topic(), record.partition(), record.timestamp(), record.key(),
UUID.randomUUID().toString().replace("-", "") + "," + record.value());
}
/**
*
* @param metadata
* @param exception
*/
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
System.out.println("MessageInterceptor onAcknowledgement ");
}
@Override
public void close() {
System.out.println("MessageInterceptor close ");
}
}
2.정의 계수 차단기
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class CounterInterceptor implements ProducerInterceptor<String, String>{
private int errorCounter = 0;
private int successCounter = 0;
@Override
public void configure(Map<String, ?> configs) {
System.out.println(" CounterInterceptor configure ");
}
@Override
public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
System.out.println("CounterInterceptor ");
return record;
}
@Override
public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
//
System.out.println("CounterInterceptor ");
if (exception == null) {
successCounter++;
} else {
errorCounter++;
}
}
@Override
public void close() {
//
System.out.println("Successful sent: " + successCounter);
System.out.println("Failed sent: " + errorCounter);
}
}
3.producer 클 라 이언 트:
import org.apache.kafka.clients.producer.*;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
public class Producer1 {
public static void main(String[] args) throws Exception {
Properties props = new Properties();
// Kafka
props.put("bootstrap.servers", "localhost:9092");
//
props.put("acks", "all");
//
props.put("retries", 0);
//
props.put("batch.size", 16384);
// ,
props.put("linger.ms", 1);
// ,
props.put("buffer.memory", 33554432);
// key
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// value
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
//
props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "kafka.CustomPartitioner");
//
List<String> interceptors = new ArrayList<>();
interceptors.add("kafka.MessageInterceptor");
interceptors.add("kafka.CounterInterceptor");
props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 1; i++) {
producer.send(new ProducerRecord<String, String>("test_0515", i + "", "xxx-" + i), new Callback() {
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
System.out.println(" producer ");
}
});
}
/*System.out.println(" producer");
producer.close();*/
producer.close();
}
}
요약 하면 우 리 는 차단기 체인 의 각 방법의 집행 순 서 를 알 수 있다.만약 에 A,B 차단기 가 있다 면 하나의 차단기 체인 에서:(1)A 를 실행 하 는 configure 방법,B 를 실행 하 는 configure 방법
(2)A 의 onSend 방법,B 의 onSend 방법 을 집행 한다.
(3)생산자 발송 완료 후 A 의 onAcknowledgement 방법,B 의 onAcknowledgement 방법 을 집행 한다.
(4)프로듀서 자체 의 콜백 리 셋 함 수 를 실행 합 니 다.
(5)A 를 실행 하 는 close 방법,B 의 close 방법.
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
JPA + QueryDSL 계층형 댓글, 대댓글 구현(2)이번엔 전편에 이어서 계층형 댓글, 대댓글을 다시 리팩토링해볼 예정이다. 이전 게시글에서는 계층형 댓글, 대댓글을 구현은 되었지만 N+1 문제가 있었다. 이번에는 그 N+1 문제를 해결해 볼 것이다. 위의 로직은 이...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.