RabbitMQ - 두 가지 메커니즘(return, confirm) 및 소비자 코드 작성 방식 개선

10419 단어 MQJava

return 메커니즘:


Return Listener는 라우팅할 수 없는 메시지를 처리하는 데 사용됩니다.
메시지 생산자는 Exchange와 Routing Key를 지정해서 메시지를 특정 큐로 보냅니다.
그리고 소비자는 큐를 리스닝(구독)하여 메시지를 처리합니다.
그러나 메시지를 보낼 때 해당 exchange가 존재하지 않거나, 지정한 라우팅 키로 라우팅할 수 있는 큐가 없는 경우가 있습니다.
이럴 때 이렇게 도달할 수 없는 메시지를 감지하려면 ReturnListener를 사용해야 합니다.
이때 basicPublish의 mandatory 인자를 true로 지정해야 라우팅되지 않은 메시지가 생산자에게 반환됩니다.

생산자 코드:

package com.sy.rabbitmq.return_listener;

import com.rabbitmq.client.*;
import com.sy.rabbitmq.TestProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Producer for the return-mechanism demo.
 *
 * <p>Registers a {@link ReturnListener} on the channel and then publishes a single
 * message with the {@code mandatory} flag set, so that a message the broker cannot
 * route to any queue is handed back to this producer instead of being dropped.
 */
public class Producer {
    /**
     * Connects to the broker configured in {@link TestProperties}, installs the return
     * listener and publishes one message.
     *
     * @param args unused
     * @throws IOException      if a broker operation fails
     * @throws TimeoutException if opening the connection times out
     */
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Connection settings come from the shared test configuration.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(TestProperties.getIp());
        connectionFactory.setPort(TestProperties.getPort());
        connectionFactory.setVirtualHost("/");

        // 2-3. Open a connection and create a channel on it.
        Channel ch = connectionFactory.newConnection().createChannel();

        // 4. Install the listener before publishing so a returned message is not missed.
        ReturnListener onReturn = new ReturnListener() {
            @Override
            public void handleReturn(
                    int replyCode,
                    String replyText,
                    String exchange,
                    String routingKey,
                    AMQP.BasicProperties properties,
                    byte[] body) throws IOException {
                System.out.println("-------------return --------------");
            }
        };
        ch.addReturnListener(onReturn);

        // 5. Publish with mandatory=true. The consumer of this demo binds its queue with
        //    "confirm.#", so the key "save" is expected to be unroutable and returned.
        byte[] payload = "Hello RabbitMQ send confirm message!".getBytes();
        ch.basicPublish("test_confirm_exchange", "save", true, null, payload);
    }
}

소비자 코드:

package com.sy.rabbitmq.return_listener;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.sy.rabbitmq.TestProperties;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 * Consumer for the return-mechanism demo.
 *
 * <p>Declares the topic exchange and the queue, binds them with {@code confirm.#},
 * and then prints the body of every message delivered to the queue.
 */
public class Comsumer {
    /**
     * Sets up the exchange/queue topology and polls the queue forever.
     *
     * @param args unused
     * @throws IOException          if a broker operation fails
     * @throws TimeoutException     if opening the connection times out
     * @throws InterruptedException if the thread is interrupted while waiting for a delivery
     */
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Connection settings come from the shared test configuration.
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost(TestProperties.getIp());
        connectionFactory.setPort(TestProperties.getPort());
        connectionFactory.setVirtualHost("/");

        // 2-3. Open a connection and create a channel on it.
        Connection conn = connectionFactory.newConnection();
        Channel ch = conn.createChannel();

        // 4. Declare a durable topic exchange.
        final String exchange = "test_confirm_exchange";
        final String bindingKey = "confirm.#";
        final String queue = "test_confirm_queue";
        ch.exchangeDeclare(exchange, "topic", true);

        // 5. Declare a durable, non-exclusive, non-auto-delete queue and bind it.
        ch.queueDeclare(queue, true, false, false, null);
        ch.queueBind(queue, exchange, bindingKey);

        // 6-7. Consume with auto-ack and block on each delivery.
        QueueingConsumer poller = new QueueingConsumer(ch);
        ch.basicConsume(queue, true, poller);
        for (;;) {
            byte[] payload = poller.nextDelivery().getBody();
            System.out.println(new String(payload));
        }
    }
}

confirm 메커니즘:


정상적인 상황에서는 메시지가 exchange를 거쳐 큐에 들어가면 메시지를 영속화할 수 있지만, 브로커에 도착하기 전에 예기치 못한 문제가 발생하면 메시지가 유실됩니다. 이를 방지하는 방법은 다음 두 가지입니다.
  • AMQP가 제공하는 트랜잭션 메커니즘을 사용한다
  • 발행자 확인 모드(confirm 메커니즘)를 사용한다

  • Confirm 확인 메시지는 어떻게 구현합니까?
    첫 번째 단계: channel에서 확인 모드를 켭니다: channel.confirmSelect()
    두 번째 단계: channel에 리스너를 추가합니다: addConfirmListener로 성공과 실패의 반환 결과를 수신하고,
    구체적인 결과에 따라 메시지를 다시 보내거나 로그를 기록하는 등 후속 처리를 합니다.

    생산자 코드:

    package com.sy.rabbitmq.confirm_listener;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.ConfirmListener;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.sy.rabbitmq.TestProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Producer for the publisher-confirm demo.
     *
     * <p>Puts the channel into confirm mode, registers a {@link ConfirmListener}, and then
     * publishes a single message. The listener prints whether the broker acknowledged
     * (ack) or rejected (nack) the published message.
     */
    public class Producer {
        /**
         * Connects to the broker configured in {@link TestProperties}, enables confirms,
         * installs the confirm listener and publishes one message.
         *
         * @param args unused
         * @throws IOException      if a broker operation fails
         * @throws TimeoutException if opening the connection times out
         */
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1. Connection settings come from the shared test configuration.
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(TestProperties.getIp());
            factory.setPort(TestProperties.getPort());
            factory.setVirtualHost("/");

            // 2. Open the connection.
            Connection connection = factory.newConnection();

            // 3. Create a channel on the connection.
            Channel channel = connection.createChannel();

            // 4. Switch the channel into publisher-confirm mode.
            channel.confirmSelect();

            // 5. Register the confirm listener BEFORE publishing. Confirms are delivered
            //    asynchronously, so a listener added after basicPublish can miss the
            //    ack/nack for a message that the broker has already handled.
            channel.addConfirmListener(
                    new ConfirmListener() {
                        @Override
                        public void handleAck(long deliveryTag, boolean multiple) throws IOException {
                            System.out.println("----------ack----------");
                        }

                        @Override
                        public void handleNack(long deliveryTag, boolean multiple) throws IOException {
                            System.out.println("----------no-ack----------");
                        }
                    }
            );

            String exchangeName = "test_confirm_exchange";
            String routeKey = "confirm.save";

            // 6. Publish the message; the listener above reports the broker's verdict.
            String msg = "Hello RabbitMQ send confirm message!";
            channel.basicPublish(exchangeName, routeKey, null, msg.getBytes());
        }
    }
    

    소비자 코드:

    package com.sy.rabbitmq.confirm_listener;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.QueueingConsumer;
    import com.sy.rabbitmq.TestProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Consumer for the publisher-confirm demo.
     *
     * <p>Declares the topic exchange and the queue, binds them with {@code confirm.#},
     * and then prints the body of every message delivered to the queue.
     */
    public class Comsumer {
        /**
         * Sets up the exchange/queue topology and polls the queue forever.
         *
         * @param args unused
         * @throws IOException          if a broker operation fails
         * @throws TimeoutException     if opening the connection times out
         * @throws InterruptedException if the thread is interrupted while waiting for a delivery
         */
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // 1. Connection settings come from the shared test configuration.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(TestProperties.getIp());
            connectionFactory.setPort(TestProperties.getPort());
            connectionFactory.setVirtualHost("/");

            // 2-3. Open a connection and create a channel on it.
            Connection conn = connectionFactory.newConnection();
            Channel ch = conn.createChannel();

            // 4. Declare a durable topic exchange.
            final String topicExchange = "test_confirm_exchange";
            final String bindingPattern = "confirm.#";
            final String targetQueue = "test_confirm_queue";
            ch.exchangeDeclare(topicExchange, "topic", true);

            // 5. Declare a durable, non-exclusive, non-auto-delete queue and bind it.
            ch.queueDeclare(targetQueue, true, false, false, null);
            ch.queueBind(targetQueue, topicExchange, bindingPattern);

            // 6-7. Consume with auto-ack; nextDelivery() blocks until a message arrives.
            QueueingConsumer poller = new QueueingConsumer(ch);
            ch.basicConsume(targetQueue, true, poller);
            for (;;) {
                QueueingConsumer.Delivery next = poller.nextDelivery();
                System.out.println(new String(next.getBody()));
            }
        }
    }
    

    새 버전의 소비자단 작성 방법:


    소비자 측에서 while 루프로 큐의 메시지를 소비하는 방식은 코드가 그다지 깔끔하지 않으므로, 대신 DefaultConsumer를 상속하여 handleDelivery를 재정의하는 방식을 사용할 수 있습니다.
    package com.sy.rabbitmq.consumer;
    
    import com.rabbitmq.client.*;
    import com.sy.rabbitmq.TestProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * Callback-style consumer.
     *
     * <p>Declares the same exchange/queue topology as the polling consumers, but instead
     * of looping over {@code QueueingConsumer.nextDelivery()} it registers a
     * {@link DefaultConsumer} subclass whose {@code handleDelivery} is called for each
     * message.
     */
    public class Comsumer {
        /**
         * Sets up the exchange/queue topology and subscribes {@link MyConsumer} to the queue.
         *
         * @param args unused
         * @throws IOException          if a broker operation fails
         * @throws TimeoutException     if opening the connection times out
         * @throws InterruptedException declared for signature compatibility with the polling variant
         */
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            // 1. Connection settings come from the shared test configuration.
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(TestProperties.getIp());
            connectionFactory.setPort(TestProperties.getPort());
            connectionFactory.setVirtualHost("/");

            // 2-3. Open a connection and create a channel on it.
            Connection conn = connectionFactory.newConnection();
            Channel ch = conn.createChannel();

            // 4. Declare a durable topic exchange.
            final String topicExchange = "test_confirm_exchange";
            final String bindingPattern = "confirm.#";
            final String targetQueue = "test_confirm_queue";
            ch.exchangeDeclare(topicExchange, "topic", true);

            // 5. Declare a durable, non-exclusive, non-auto-delete queue and bind it.
            ch.queueDeclare(targetQueue, true, false, false, null);
            ch.queueBind(targetQueue, topicExchange, bindingPattern);

            // 6. Subscribe with auto-ack. This replaces the older pattern of creating a
            //    QueueingConsumer and blocking on nextDelivery() inside a while(true) loop:
            //    deliveries are pushed to MyConsumer.handleDelivery instead.
            MyConsumer handler = new MyConsumer(ch);
            ch.basicConsume(targetQueue, true, handler);
        }


        /**
         * Consumer that prints the details of every delivered message.
         */
        public static class MyConsumer extends DefaultConsumer {
            /**
             * Creates the consumer and associates it with the given channel.
             *
             * @param channel the channel to which this consumer is attached
             */
            public MyConsumer(Channel channel) {
                super(channel);
            }


            /**
             * Prints the consumer tag, envelope, properties and body of one delivery.
             *
             * @param consumerTag the consumer tag associated with this consumer
             * @param envelope    packaging data for the message
             * @param properties  content header data for the message
             * @param body        the message body, decoded with the platform default charset
             * @throws IOException declared by the overridden method; not thrown here
             */
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String text = new String(body);
                System.out.println("----------------- -----------------");
                System.out.println("consumerTag" + consumerTag);
                System.out.println("envelope" + envelope);
                System.out.println("AMQP.BasicProperties" + properties);
                System.out.println("body" + text);
            }
        }

    }
    

    좋은 웹페이지 즐겨찾기