메시지 큐-kafka 소비 이상 문제

개술
카 프 카 에 서 는 어떤 메시지 대기 열 에서 도 소비 순서 에 문제 가 있다.한 대열 의 순서 소 비 를 확보 하기 위해 서,한 메시지 의 소비 가 이상 할 때,반드시 후속 대열 메시지 의 소비 에 영향 을 줄 것 이다.이런 업 무 는 막 히 지 않 겠 는가?예 를 들 어 필 자 는 가장 간단 한 예 를 들 었 다.나 는 1-100 의 메 시 지 를 보 냈 다.나의 처리 논리 에서 msg%5==0 나 는 int i=1/0 작업 을 할 것 이다.이것 은 반드시 이상 을 버 리 고 msg=5 에 계속 막 혔 고 뒤의 6-100 은 소비 할 수 없다.다음은 필자 가 해결 방안 을 제시한다.
일정 횟수 재 시도(메시지 분실)

@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test6(String msg){
              businessProcess(msg);
            }
           private void businessProcess(String msg){
        System.out.println("     :" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
       if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }
설명:독자 가 자바 클 라 이언 트,즉 spring 을 사용 하여 이 루어 진다 면 어떠한 처리 도 하지 않 은 상태 에서 자동 으로 10 번 재 시도 한 다음 에 메시지 가 직접 처 리 됩 니 다.즉,만약 당신 의 업무 가 소식 을 잃 어 버 리 는 것 을 허락 한다 면,당신 은 별도의 인 코딩 처리 가 필요 하지 않다 는 것 이다
사망 메시지 대기 열 에 가입(메 시 지 를 잃 어 버 리 지 않 음)
소비 단 코드:

//1.      offset
//2.  errorHandler,         
//3.                   offset
@KafkaHandler
    @KafkaListener(topics = {"quickstart-events"},groupId = "test-consumer-group-2",
            errorHandler ="kafkaListenerErrorHandler", concurrency = "1")
    public void test6(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
        }finally {
            //    
            ack.acknowledge();
        }
    }
//1.          ,  topicName+.DLT   
//2.     ,          offset,    bug     ,    
    @KafkaHandler
    @KafkaListener(topics = {"quickstart-events.DLT"},groupId = "test-consumer-group-2", concurrency = "1")
    public void test7(String msg,Acknowledgment ack){
        try {
            businessProcess(msg);
            ack.acknowledge();
        }catch (Exception e){
            e.printStackTrace();
        }
    }
//    
    private void businessProcess(String msg){
        System.out.println("     :" + msg + "--" + System.currentTimeMillis() + "---" + Thread.currentThread().hashCode());
        if (Integer.valueOf(msg) % 5 == 0) {
            int i = 1 / 0;
        }
    }
이상 프로세서

//1.       KafkaListenerErrorHandler   bean
//2. bean            ,      .DLT   
@Component("kafkaListenerErrorHandler")
public class KafkaListenerErrorHandlerTest implements KafkaListenerErrorHandler {
   @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    private static final String TOPIC_DLT=".DLT";
    @Override
    public Object handleError(Message<?> message, ListenerExecutionFailedException exception) {
        System.out.println("      :"+message.toString());
        //          
        MessageHeaders headers = message.getHeaders();
        String topic=headers.get("kafka_receivedTopic")+TOPIC_DLT;
        //      
        kafkaTemplate.send(topic,message.getPayload());
        return message;
    }
}
효과 그림:
image.png  
설명:상기 한 것 은 기본적으로 사망 메시지 대기 열 을 사용 하 는 방안 이다.독자 들 은 이런 인 코딩 의 복잡 도가 매우 높다 고 생각 할 수 있 지만 사실은 걱정 할 필요 가 없다.사실은 상기 코드 들 은 기본적으로 사망 메시지 대기 열의 템 플 릿 코드 를 사용 하고 성숙 한 회사 에 서 는 상기 코드 를 사용 하여 간단 한 포장 을 한다.여기 서 필 자 는 방향 을 제시 하고 관심 이 있 는 학생 들 이 실현 할 수 있다.우 리 는 사실 op 사상 을 사용 하여@EnableDLT 와 같은 주 해 를 사용자 정의 하여 실현 할 수 있 습 니 다.그러면 위의 이 방안 을 사용 하면 간단 하고 우아 하지 않 습 니까?이전에 필 자 는 개발 과정 에서 아마 존의 메시지 대기 열 서 비 스 를 사용 한 적 이 있 는데 이것 도 단지 이렇게 실 현 된 것 에 불과 하 다.
총결산
이 글 은 여기까지 입 니 다.당신 에 게 도움 을 줄 수 있 기 를 바 랍 니 다.또한 당신 이 우리 의 더 많은 내용 에 관심 을 가 져 주 기 를 바 랍 니 다!

좋은 웹페이지 즐겨찾기