3. RabbitMQ – 메시지 확인 메커니즘(AMQP 트랜잭션)

4531 단어 rabbitmq
우리는 서버 붕괴 시 서버 메시지 데이터를 잃어버리지 않도록 지속화(교환기, 대기열, 메시지 지속화)를 통해 보장할 수 있다는 것을 알고 있지만, 메시지를 보내는 발송자가 메시지를 보낸 후에 메시지가 브로커 프록시 서버에 정확하게 도착했는지 확인할 수 없습니다. 특별한 설정을 하지 않으면 기본적으로 발표 작업은 생산자에게 어떤 정보도 되돌려 주지 않습니다.즉, 기본적으로 우리 생산자는 메시지가 브로커에 정확하게 도착했는지 모른다. 만약에 메시지가 브로커에 도착하기 전에 잃어버리면 지속화 작업도 이 문제를 해결할 수 없다. 왜냐하면 메시지가 프록시 서버에 도착하지 않았기 때문에 이것은 지속화할 방법이 없다.이 문서에서는 AMQP 프로토콜 차원에서 제공되는 솔루션을 설명합니다.

1. 자바 원생 사무 사용


RabbitMQ의 트랜잭션 메커니즘은 트랜잭션과 관련된 세 가지 방법으로 구성됩니다.
  • txSelect()
  • txCommit()
  • txRollback()

  • txSelect는 현재 채널을transaction 모드로 설정하는 데 사용되며, txCommit는 사무를 제출하는 데 사용되며, txRollback은 사무를 롤백하는 데 사용됩니다.
    우리가 txSelect 제출을 사용하여 업무를 시작한 후에 우리는 Broke 프록시 서버에 메시지를 발표할 수 있습니다. 만약에 txCommit 제출이 성공하면 메시지가 Broke에 도착할 것입니다. 만약에 txCommit가 실행되기 전에 Broker가 이상 붕괴되거나 다른 원인으로 이상을 던지면 우리는 이상을 포착하여 txRollback 방법을 통해 스크롤을 할 수 있습니다.이 트랜잭션의 주요 코드는 다음과 같습니다.
        channel.txSelect();
        channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
        channel.txCommit();
    

    즉, 메시지의 배포 프로세스는 다음과 같습니다.
  • Client에서 Tx.Select를 전송합니다
  • Broker는 Tx. Select-Ok을 보냅니다. 그 다음에 메시지를 보냅니다
  • Client에서 Tx.Commit를 보냅니다
  • Broker가 Tx.Commit-Ok을 보냅니다

  • 2. SpringBoot 사용 트랜잭션 결합


    SpringBoot에서는 주로 봉인된 RabbitTemplate 템플릿을 통해 메시지 발송을 실현하는데 여기서 주로 두 가지 상황으로 나뉘는데 RabbitTemplate를 사용하여 동시 발송하거나 비동기적으로 발송한다.
    주의: 발표 확인과 사무.(둘 다 동시에 사용할 수 없음) 채널이 업무일 때 확인 모드를 도입할 수 없습니다.같은 채널은 확인 모드에서 사무를 사용할 수 없습니다.
    그래서 사무를 사용할 때, 응용 프로그램에서.properties에서 확인 모드를false로 변경해야 합니다.
    #  
    spring.rabbitmq.publisher-confirms=false

    A, 동기화


    RabbitTemplate의 channelTransacted를true로 설정하여 사무 환경을 설정하여 RabbitMQ 사무를 사용할 수 있도록 합니다.다음과 같습니다.
        @Bean
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public RabbitTemplate rabbitTemplateNew() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            template.setChannelTransacted(true);
            return template;
        }

    이것은 앞에서 설명한 원생 사무와 일치하며, 메시지를 보내는 데 이상이 발생하면 실행 사무가 굴러가는 것에 응답합니다.

    B, 비동기식


    방금 우리가 설명한 것은 동기화된 상황인데, 지금 우리는 비동기적인 형식을 설명한다.비동기에서는 주로 MessageListener 인터페이스를 사용하는데, 이것은 Spring AMQP가 비동기적으로 메시지를 전송하는 감청기 인터페이스이다.한편, MessageListener의 실현 클래스인 Simple MessageListenerContainer는 전체 비동기 메시지 배달의 핵심 클래스로 존재한다.
    다음은 사용자가 용기를 설정할 때 PlatformTransactionManager를 지정하는 비동기적인 방법을 소개합니다.코드는 다음과 같습니다.
        @Bean
        public SimpleMessageListenerContainer messageListenerContainer() {
            SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
            container.setConnectionFactory(connectionFactory());
            container.setTransactionManager(rabbitTransactionManager());
            container.setChannelTransacted(true);
            //  
            container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            container.setQueues(transitionQueue());
            container.setMessageListener(new TransitionConsumer());
            return container;
        }
    

    이 코드는 config에 추가된 RabbitConfig입니다.java에서 사무 관리자를 설정하여channelTransacted 속성을true로 설정합니다.
    용기에 사무를 설정할 때transactionManager를 제공하면channelTransaction은true이어야 합니다. 만약에 감청기 처리가 실패하고 이상을 던지면 사무는 굴러가고 메시지는 메시지 에이전트에 되돌아옵니다.만약false라면 외부의 업무는 감청 용기에 제공할 수 있으며, 이로 인해 발생하는 영향은 스크롤된 업무 작업에서도 메시지 전송을 제출하는 작업이다.
    RabbitTransactionManager를 사용하면 이 사무 관리자는 PlatformTransactionManager 인터페이스의 구현으로 하나의 Rabbit ConnectionFactory에서만 사용할 수 있습니다.
    참고: 이 정책에서는 메시지와 데이터베이스 간에 공유된 트랜잭션 같은 XA 트랜잭션을 제공할 수 없습니다.
    위의 코드 외에도 RabbitTransactionManager와 TransitionConsumer가 추가되어야 합니다. 코드는 다음과 같습니다.
        /**
         *  transition2 
         * 
         * @return
         */
        @Bean
        public Queue transitionQueue() {
            return new Queue("transition2");
        }
        
        /**
         *  
         * 
         * @return
         */
        @Bean
        public RabbitTransactionManager rabbitTransactionManager() {
            return new RabbitTransactionManager(connectionFactory());
        }
    
        /**
         *  
         */
        public class TransitionConsumer implements ChannelAwareMessageListener {
    
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                byte[] body = message.getBody();
                System.out.println("TransitionConsumer: " + new String(body));
                //  
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                //  0, , 
                // int t = 1 / 0;
            }
        }

    좋은 웹페이지 즐겨찾기