메시지 큐-kafka 소비 이상 문제
카 프 카 에 서 는 어떤 메시지 대기 열 에서 도 소비 순서 에 문제 가 있다.한 대열 의 순서 소 비 를 확보 하기 위해 서,한 메시지 의 소비 가 이상 할 때,반드시 후속 대열 메시지 의 소비 에 영향 을 줄 것 이다.이런 업 무 는 막 히 지 않 겠 는가?예 를 들 어 필 자 는 가장 간단 한 예 를 들 었 다.나 는 1-100 의 메 시 지 를 보 냈 다.나의 처리 논리 에서 msg%5==0 나 는 int i=1/0 작업 을 할 것 이다.이것 은 반드시 이상 을 버 리 고 msg=5 에 계속 막 혔 고 뒤의 6-100 은 소비 할 수 없다.다음은 필자 가 해결 방안 을 제시한다.
일정 횟수 재 시도(메시지 분실)
@KafkaHandler
@KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1")
public void test6(String msg){
businessProcess(msg);
}
private void businessProcess(String msg){
System.out.println(" :" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
if (Integer.valueOf(msg) % 5 == 0) {
int i = 1 / 0;
}
}
설명:독자 가 자바 클 라 이언 트,즉 spring 을 사용 하여 이 루어 진다 면 어떠한 처리 도 하지 않 은 상태 에서 자동 으로 10 번 재 시도 한 다음 에 메시지 가 직접 처 리 됩 니 다.즉,만약 당신 의 업무 가 소식 을 잃 어 버 리 는 것 을 허락 한다 면,당신 은 별도의 인 코딩 처리 가 필요 하지 않다 는 것 이다사망 메시지 대기 열 에 가입(메 시 지 를 잃 어 버 리 지 않 음)
소비 단 코드:
//1. offset
//2. errorHandler,
//3. offset
@KafkaHandler
@KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2",
errorHandler ="kafkaListenerErrorHandler", concurrency = "1")
public void test6(String msg,Acknowledgment ack){
try {
businessProcess(msg);
}finally {
//
ack.acknowledge();
}
}
//1. , topicName+.DLT
//2. , offset, bug ,
@KafkaHandler
@KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1")
public void test7(String msg,Acknowledgment ack){
try {
businessProcess(msg);
ack.acknowledge();
}catch (Exception e){
e.printStackTrace();
}
}
//
private void businessProcess(String msg){
System.out.println(" :" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
if (Integer.valueOf(msg) % 5 == 0) {
int i = 1 / 0;
}
}
이상 프로세서
//1. KafkaListenerErrorHandler bean
//2. bean , .DLT
@Component("kafkaListenerErrorHandler")
public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;
private static final String TOPIC_DLT=".DLT";
@Override
public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
System.out.println(" :"+message.toString());
//
MessageHeaders headers = message.getHeaders();
String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
//
kafkaTemplate.send(topic,message.getPayload());
return message;
}
}
효과 그림:설명:상기 한 것 은 기본적으로 사망 메시지 대기 열 을 사용 하 는 방안 이다.독자 들 은 이런 인 코딩 의 복잡 도가 매우 높다 고 생각 할 수 있 지만 사실은 걱정 할 필요 가 없다.사실은 상기 코드 들 은 기본적으로 사망 메시지 대기 열의 템 플 릿 코드 를 사용 하고 성숙 한 회사 에 서 는 상기 코드 를 사용 하여 간단 한 포장 을 한다.여기 서 필 자 는 방향 을 제시 하고 관심 이 있 는 학생 들 이 실현 할 수 있다.우 리 는 사실 op 사상 을 사용 하여@EnableDLT 와 같은 주 해 를 사용자 정의 하여 실현 할 수 있 습 니 다.그러면 위의 이 방안 을 사용 하면 간단 하고 우아 하지 않 습 니까?이전에 필 자 는 개발 과정 에서 아마 존의 메시지 대기 열 서 비 스 를 사용 한 적 이 있 는데 이것 도 단지 이렇게 실 현 된 것 에 불과 하 다.
총결산
이 글 은 여기까지 입 니 다.당신 에 게 도움 을 줄 수 있 기 를 바 랍 니 다.또한 당신 이 우리 의 더 많은 내용 에 관심 을 가 져 주 기 를 바 랍 니 다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Spring Cloud를 사용한 기능적 Kafka - 1부지금까지 찾을 수 없었던 Spring Cloud Kafka의 작업 데모를 만들기 위해 이 기사를 정리했습니다. Confluent 스키마 레지스트리 7.1.0 이 기사는 먼저 Spring Cloud Stream을 사용...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.