SpringBoot 통합 RabbitMQ 메시지 큐 의 전체 절차

SpringBoot 통합 RabbitMQ
주로 RabbitMQ 아래 세 가지 메시지 대기 열 을 실현 합 니 다.
  • 간단 한 메시지 큐(데모 direct 모드)
  • RabbitMQ 특성 을 바탕 으로 하 는 지연 메시지 대기 열
  • RabbitMQ 관련 플러그 인 기반 지연 메시지 큐
  • 공공 자원
    1.pom 의존 도입
    
     <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    
    2.yml 파일 설정
    전편'RabbitMQ 설치 및 설정'을 기반 으로 한 기본 설정 입 니 다.
    
    spring:
      rabbitmq:
        host: 121.5.168.31
        port: 5672    #      
        virtual-host: /*** #     
        username: *** #    
        password: *** #     
        #          P -> Exchange
        publisher-confirm-type: correlated
        #               Exchange -> Queue
        publisher-returns: true
        #     ACK     Queue -> C
        listener:
          simple:
            acknowledge-mode: manual #     ACK  
            #          
            concurrency: 3
            prefetch: 15
            retry:
              enabled: true
              max-attempts: 5
            max-concurrency: 10
    
    3.공공 콘 스 탄 트 류
    
    /**
     * @author Mr.Horse
     * @version 1.0
     * @description: {description}
     * @date 2021/4/23 15:28
     */
    
    public class Constants {
    
        /**
         *      Queue,Exchange,Key(     )
         */
        public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE";
        public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE";
        public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY";
    
        /**
         *      Queue,Exchange,Key(    )
         */
        public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE";
        public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE";
        public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY";
    
    
        //************************************          **************************
        /**
         *         
         */
        public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE";
        public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE";
        public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY";
    
        /**
         *     
         */
        public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE";
        public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE";
        public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY";
    
        //**************************************          (   )******************************
        /**
         *          
         */
        public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE";
        public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE";
        public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY";
    
    }
    
    간단 한 메시지 큐(direct 모드)
    4.RabbitTemplate 템 플 릿 설정
    주로 메시지 배달 Exchange 성공 리 셋 함수 와 메시지 가 Exchange 에서 메시지 대기 열 로 전달 되 는 데 실패 한 리 셋 함 수 를 정의 합 니 다.
    
    package com.topsun.rabbit;
    
    import com.sun.org.apache.xpath.internal.operations.Bool;
    import com.topsun.constants.Constants;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.*;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    /**
     * @author Mr.Horse
     * @version 1.0
     * @description: {description}
     * @date 2021/4/23 14:17
     */
    @Configuration
    public class RabbitConfig {
    
        private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        /**
         * @return
         */
        @Bean
        public RabbitTemplate rabbitTemplate() {
            RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
            //   setReturnCallback      mandatory=true,  Exchange    Queue       ,        
            rabbitTemplate.setMandatory(Boolean.TRUE);
            //        
            rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
            //       Exchange       
            rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
                    logger.info("     Exchange    :    :correlationData={},    :ack={},  :cause={}",
                            correlationData, ack, cause)
            );
            //    Exchange   Queue        
            rabbitTemplate.setReturnsCallback((returnedMessage) -> {
                //               ,     
                //   :                ,         ,              ,           
                if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) {
                    return;
                }
                logger.warn("   Exchange   Queue   :message={},replyCode={},replyText={},exchange={},rountingKey={}",
                        returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(),
                        returnedMessage.getExchange(), returnedMessage.getRoutingKey());
            });
            return rabbitTemplate;
        }
    
        //*******************************************        *****************************************
        /**
         *     
         *
         * @return
         */
        @Bean
        public Queue horseQueue() {
            return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE);
        }
    
        /**
         *          
         *
         * @return
         */
        @Bean
        public DirectExchange horseExchange() {
            return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
        }
    
        /**
         *      ,  ,  Key
         *
         * @return
         */
        @Bean
        public Binding horseBinding() {
            return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY);
        }
    
    }
    
    
    5.메시지 모니터 정의
    @RabbitListenezi 주석 을 기반 으로 사용자 정의 메시지 모니터 를 구현 합 니 다.주로 두 가지 실현 방식 이 있다.
  • 설정 류 에서 Queue,Excehange 와 그들의 직접적인 바 인 딩 을 밝 히 면 여기 서 직접 대기 열 을 지정 하여 메시지 감청 을 합 니 다
  • 4.567917.만약 에 앞에서 아무것도 하지 않 았 다 면 여 기 는 직접 주해 의 방식 으로 바 인 딩 하여 정보 감청 을 실현 할 수 있 습 니 다
    
    package com.topsun.rabbit;
    
    import com.rabbitmq.client.Channel;
    import com.topsun.constants.Constants;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.stereotype.Component;
    
    import java.io.IOException;
    
    /**
     * @author Mr.Horse
     * @version 1.0
     * @description: {description}
     * @date 2021/4/23 14:58
     */
    
    @Component
    public class MsgListener {
    
        private static Logger logger = LoggerFactory.getLogger(MsgListener.class);
    
        /**
         *           ,           
         *
         * @param message
         * @param channel
         * @param msg
         */
        @RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE)
        public void customListener(Message message, Channel channel, String msg) {
            //           (    ACK  )
            long tag = message.getMessageProperties().getDeliveryTag();
            try {
                logger.info(" ==> customListener  " + msg);
                //   ACK  
                channel.basicAck(tag, false);
            } catch (IOException e) {
                logger.error(" ==>       : {}", tag);
            }
        }
    
        /**
         *              
         *
         * @param message
         * @param channel
         * @param msg
         */
        @RabbitListener(bindings = @QueueBinding(
                value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"),
                exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclarationExceptions = "true"),
                key = {Constants.HORSE_ANNOTATION_KEY}
        ))
        public void annotationListener(Message message, Channel channel, String msg) {
            //           (    ACK  )
            long tag = message.getMessageProperties().getDeliveryTag();
            try {
                logger.info(" ==> annotationListener  " + msg);
                //   ACK  
                channel.basicAck(tag, false);
            } catch (IOException e) {
                logger.error(" ==>       : {}", tag);
            }
        }
    
    }
    
    
    
    6.테스트 인터페이스
    여기에 100 개의 메 시 지 를 보 냅 니 다:
    4.567917.홀수 항목 에서 비 주해 방식 의 메시지 모니터주해 식 메시지 모니터
    
    @GetMapping("/rabbit")
        public void sendMsg() {
            for (int i = 1; i <= 100; i++) {
                String msg = " " + i + "   ";
                logger.info("==>   " + msg);
                if (i % 2 == 1) {
                    rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i)));
                } else {
                    rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i)));
                }
            }
        }
    
    결과:자체 테스트,매우 성공:smile:smile::smile::smile:
    지연 메시지 큐
    원리:생산 자 는 지연 메 시 지 를 생산 합 니 다.필요 한 지연 시간 에 따라 서로 다른 routingkey 를 이용 하여 메 시 지 를 서로 다른 지연 대기 열 로 이동 합 니 다.각 대기 열 은 서로 다른 TTL 속성 을 설정 하고 같은 사신 교환기 에 연결 합 니 다.메시지 가 만 료 되면 routingkey 에 따라 서로 다른 사신 대기 열 로 이동 합 니 다.소비 자 는 해당 하 는 수신 대기 열 을 감청 해 처리 하면 된다.
    7.귀속 관련 정보 설정
    
    /**
     * @author Mr.Horse
     * @version 1.0
     * @description: {description}
     * @date 2021/4/24 14:22
     */
    
    @Configuration
    public class DelayRabbitConfig {
    
        private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class);
    
        /**
         *          
         *
         * @return
         */
        @Bean
        public DirectExchange delayExchange() {
            return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
        }
    
        /**
         *          
         *
         * @return
         */
        @Bean
        public DirectExchange deadExchange() {
            return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
        }
    
        /**
         *          10s(  :ms),                    Key
         *
         * @return
         */
        @Bean
        public Queue delayQueue() {
            Map<String, Object> args = new HashMap<>(3);
            // x-dead-letter-exchange                    
            args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE);
            // x-dead-letter-routing-key               key
            args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY);
            // x-message-ttl       TTL(    )
            //          ,          (      )
            // args.put("x-message-ttl", 10000);
            return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build();
        }
    
        /**
         *       
         *
         * @return
         */
        @Bean
        public Queue deadQueue() {
            return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE);
        }
    
    
        /**
         *         
         *
         * @return
         */
        @Bean
        public Binding delayBinding() {
            return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY);
        }
    
        /**
         *         
         *
         * @return
         */
        @Bean
        public Binding deadBinding() {
            return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY);
        }
    
        //**********************************          (   )************************************
    
        @Bean
        public Queue pluginQueue() {
            return new Queue(Constants.HORSE_PLUGIN_QUEUE);
        }
    
       /**
         *           ,    CustomExchange      
         *     ,    
         * @return
         */
        @Bean
        public CustomExchange customPluginExchange() {
            Map<String, Object> args = new HashMap<>(2);
            args.put("x-delayed-type", "direct");
            return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args);
        }
    
        @Bean
        public Binding pluginBinding() {
            return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs();
        }
    
    }
    
    8.지연 모니터 정의
    
    /**
     * @author Mr.Horse
     * @version 1.0
     * @description: {description}
     * @date 2021/4/24 14:51
     */
    @Component
    public class DelayMsgListener {
    
        private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class);
    
    
        /**
         *       
         *
         * @param message
         * @param channel
         * @param msg
         */
        @RabbitListener(queues = Constants.HORSE_DEAD_QUEUE)
        public void consumeDeadListener(Message message, Channel channel, String msg) {
            long tag = message.getMessageProperties().getDeliveryTag();
            try {
                logger.info(" ==> consumeDeadListener  " + msg);
                //   ACK  
                channel.basicAck(tag, false);
            } catch (IOException e) {
                logger.error(" ==>       : {}", tag);
            }
        }
    
        /**
         *       (   )
         *
         * @param message
         * @param channel
         * @param msg
         */
        @RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE)
        public void consumePluginListener(Message message, Channel channel, String msg) {
            long tag = message.getMessageProperties().getDeliveryTag();
            try {
                logger.info(" ==> consumePluginListener" + msg);
                //   ACK  
                channel.basicAck(tag, false);
            } catch (IOException e) {
                logger.error(" ==>       : {}", tag);
            }
        }
    
    }
    
    9.테스트 인터페이스
    
       //          
    	@GetMapping("/delay/rabbit")
        public void delayMsg(@RequestParam("expire") Long expire) {
            for (int i = 1; i <= 10; i++) {
                String msg = " " + i + "   ";
                logger.info("==>   " + msg);
                //              
                rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg,
                        message -> {
                            message.getMessageProperties().setExpiration(String.valueOf(expire));
                            return message;
                        },
                        new CorrelationData(String.valueOf(i)));
            }
        }
    
    	//          
        @GetMapping("/delay/plugin")
        public void delayPluginMsg(@RequestParam("expire") Integer expire) {
            for (int i = 1; i <= 10; i++) {
                String msg = " " + i + "   ";
                logger.info("==>   " + msg);
                //         
                rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    message.getMessageProperties().setDelay(expire);
                    return message;
                }, new CorrelationData(String.valueOf(i)));
    
            }
        }
    
    결과:알 잖 아:screamcat::scream_cat::scream_cat:
    RabbitMQ 의 기본 사용 시연 은 여기 서 마 치 겠 습 니 다.
    총결산
    SpringBoot 통합 RabbitMQ 메시지 큐 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.더 많은 SpringBoot 통합 RabbitMQ 메시지 큐 내용 은 예전 의 글 을 검색 하거나 아래 의 관련 글 을 계속 찾 아 보 세 요.앞으로 많은 응원 부 탁 드 리 겠 습 니 다!

    좋은 웹페이지 즐겨찾기