rabbitmq 생산자 정보 확인 메커니즘

20984 단어 rabbitmq

rabbitmq 생산자 정보 확인 메커니즘


문제 설명


생산자가rabbitmq 서버에 메시지를 보낼 때 메시지가 서버에 도착했습니까?생산자가 보낸 메시지가 서버에 신뢰할 수 있도록 하기 위해 rabbitmq는 두 가지 방식을 제공했다.
  • 트랜잭션을 통한 실현
  • 발송자 확인 메커니즘(publisher confirm을 통해 실현
  • 사무 메커니즘


    rabbitmq와 사무 관련 방법:
  • channel.txSelect(): 현재 채널을 사무 모드로 설정
  • channel.txCommit(): 트랜잭션 제출 용도
  • channel.txRollback(): 롤백 트랜잭션에 사용
  • 사무 실현 메커니즘을 통해 메시지가 성공적으로 rabbitmq 서버에 수신되어야만 사무가 제출될 수 있다. 그렇지 않으면 이상을 포착한 후에 스크롤을 하고 메시지를 재발급할 수 있지만 사무는 rabbitmq의 성능에 매우 영향을 미친다.그리고 사무 메커니즘은 막히는 과정으로 서버의 응답을 기다려야만 다음 메시지를 처리할 수 있다
    예:
    public class TransactionSender {
    
    
        private static final String ex_name = "ex_tx";
        private static final String q_name = "q_tx";
        private static final String rt_name = "rt_tx";
    
        public static void main(String[] args) throws IOException, TimeoutException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(ex_name, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(q_name, false, false, false, null);
            channel.queueBind(q_name, ex_name, rt_name);
    
            channel.txSelect();
            try {
                channel.basicPublish(ex_name, rt_name, null, "hello".getBytes());
                // RuntimeException
                int a = 1/0;
                channel.txCommit();
            } catch (IOException e) {
                e.printStackTrace();
                channel.txRollback();
            }
        }
    }
    
    

    웹 관리자를 통해 알 수 있듯이 메시지가 대응하는 대기열에 보내지지 않았습니다

    발송자 확인 메커니즘


    송신자 확인 메커니즘은 동기화와 비동기로 나뉜다

    원리


    생산자는 채널을 confirm 모드로 설정하고 채널이 confirm 모드에 들어가면 이 채널에 발표된 모든 메시지는 유일한 id(1부터), 메시지가 일치하는 대기열에 배달되면rabbitmq는 확인Basic.AckdeliverTag(메시지 id)을 생산자에게 보냅니다.만약 메시지와 대기열이 지구화된다면 메시지는 지구화 후에 발송될 것이다.rabbitmq는 리셋deliverTag 외에도 multiple 파라미터가 있어 이 번호 이전의 모든 메시지가 처리되었다는 것을 나타낸다.모든 소식은 Ack 또는 Nack 한 번, 피Ack 및 피Nack

    동기화 확인


    동기화 확인:
    public class SyncConfirmSender {
    
        private static final String ex_name = "ex_confirm";
        private static final String q_name = "q_confirm";
        private static final String rt_name = "rt_confirm";
    
    
        public static void main(String[] args) throws IOException, TimeoutException,
        						 InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(ex_name, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(q_name, false, false, false, null);
            channel.queueBind(q_name, ex_name, rt_name);
    
            // 
            channel.confirmSelect();
            channel.basicPublish(ex_name, rt_name, null, "hello".getBytes());
            // 
           if (!channel.waitForConfirms()){
               System.out.println(" ...");
           }else {
               System.out.println(" ...");
           }
    
        }
    }
    

    비동기식 확인


    비동기식 확인:
    public class SyncConfirmSender {
    
        private static final String ex_name = "ex_confirm";
        private static final String q_name = "q_confirm";
        private static final String rt_name = "rt_confirm";
    
    
        public static void main(String[] args) throws IOException, TimeoutException, 
        							InterruptedException {
    
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(ex_name, BuiltinExchangeType.DIRECT);
            channel.queueDeclare(q_name, false, false, false, null);
            channel.queueBind(q_name, ex_name, rt_name);
    
            channel.confirmSelect();
            // 
            channel.addConfirmListener(new ConfirmListener() {
                @Override
                public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Ack: tag no: "+ deliveryTag+ " multiple: "+ multiple);
                }
    
                @Override
                public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                    System.out.println("Nack: tag no: "+ deliveryTag+ " multiple: "+ multiple);
    
                }
            });
            channel.basicPublish(ex_name, rt_name, null, "hello".getBytes());
        }
    
    }
    

    좋은 웹페이지 즐겨찾기