RocketMq(3) - 트랜잭션이 포함된 메시지 보내기

1748 단어 MQ

트랜잭션 지원 메시지


RocketMq는 트랜잭션이 있는 메시지를 전송하여 데이터의 최종 정합성을 유지할 수 있도록 지원합니다.다음은 제가 간단하게 사무가 있는 소식을 실현하겠습니다.

생산자단 코드

  • 트랜잭션 검사를 작성해야 하는 Listener, 하나의 클래스를 사용자 정의하여 TransactionCheckListener(rocketMq 3.0.8 버전 이전)
    public LocalTransactionState checkLocalTransactionState(MessageExt messageExt) {
    //    messageExt   body    ,        
    return LocalTransactionState.COMMIT_MESSAGE;
    }
    
  • 구현
  • 그리고 로컬 작업의 업무를 쓰려면 TransactionExuctor의 인터페이스,public Localtransaction State execute Local Transaction Branch(Message message, Object o) {System.out.println('로컬 업무');//여기에서 수거 라이브러리 작업,return Local Transaction State.COMEMIT MESSAGE;
  • 생산자의 코드인 TransactionCheckListener transactionCheckListener = new MyTranscationCheckListener();TransactionMQProducer producer=new TransactionMQProducer(“transaction”); producer.setNamesrvAddr(“47.106.132.60:9876”); producer.setTransactionCheckListener(transactionCheckListener); producer.start(); MyLocalTranscationExuctor transcationExuctor=new MyLocalTranscationExuctor(); 메시지 메시지 = new Message("Topic Trans", "MyTags", "트랜잭션 메시지".getBytes();producer.sendMessageInTransaction(message,transcationExuctor,null); producer.shutdown();

  • RocketMq3.버전 0.8 이후 메시지 확인


    우리는 스스로 메시지를 해결하는 check listener 리셋 메커니즘을 필요로 한다. 우리는 생산자가 로컬 업무를 수행할 때 현재 메시지를 다른 표에 저장할 수 있다. 가장 좋은 것은 모든 메시지가 하나의 Id를 필요로 하고, 이 쪽에서 다른 작업이나 라인을 만들어서 메시지의 실패를 확인하는 메시지를 소비자에게 보내는 것이다.그리고 소비자는 성공적으로 해결된 메시지 목록을 저장하기 위해 표를 작성한 다음에 일정 시간 동안 성공적으로 소비된 정보를 꺼내고 소비자는 표의 메시지 상태 코드를 수정해야 한다.이렇게 하면 확인 소식이 실패한 원인을 해결할 수 있다.사실 이것도 사무 정보를 지원하지 않는 메시지 대기열에서 분포식 사무의 최종 일치성을 실현하는 데 사용할 수 있다.

    좋은 웹페이지 즐겨찾기