SpringBoot+RabbitMq 가 구체 적 으로 사용 하 는 자세

현재 주류 의 정보 중간 부품 은 activemq,rabbitmq,rocketmq,kafka 가 있 습 니 다.우 리 는 실제 업무 장면 에 따라 적당 한 정보 중간 부품 을 선택해 야 합 니 다.주목 하 는 주요 지 표 는 정보 배달 의 신뢰성,유지 가능성,스루풋 과 중간 부품 의 특색 등 중요 한 기준 으로 선택 해 야 합 니 다.빅 데이터 분 야 는 반드시 kafka 입 니 다.그러면 전통 적 인 업무 장면 은 바로 디 결합,비동기,삭 봉 이다.그러면 나머지 3 가지 제품 중 하 나 를 선택 하 세 요.스루풋,지역사회 의 활약 도,메시지 의 신뢰성 에서 출발 하여 일반 중 소기 업 들 이 rabbitmq 를 선택 하 는 것 이 더 적합 할 수 있 습 니 다.그럼 어떻게 사용 하 는 지 보 자.
환경 준비
본 사례 는 springboot 통합 rabbitmq 를 바탕 으로 본 사례 는 주로 중요 한 실제 코드 를 바탕 으로 기초 이론 지식 에 대해 자체 바 이 두 를 사용 하 십시오.
jdk-version:1.8
rabbitmq-version:3.7
springboot-version:2.1.4.RELEASE
pom 파일

 <dependency>
 <groupId>org.springframework.boot</groupId>
 <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml 프로필

spring:
 rabbitmq:
 password: guest
 username: guest
 port: 5672
 addresses: 127.0.0.1
 #        
 publisher-returns: true
 #      
 publisher-confirms: true
 listener:
  simple:
  #          .
  concurrency: 2
  #          .
  max-concurrency: 2
  #  ack
  acknowledge-mode: auto
  #  ack
  direct:
  acknowledge-mode: auto
 #          
 template:
  mandatory: true
rabbitMq 자세 설정
자세
javaconfig 기반

package com.lly.order.message;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * @ClassName RabbitMqConfig
 * @Description rabbitMq   
 * @Author lly
 * @Date 2019-05-13 15:05
 * @Version 1.0
 **/
@Configuration
public class RabbitMqConfig {

 public final static String DIRECT_QUEUE = "directQueue";
 public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
 public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
 public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
 public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";

 public final static String TOPIC_EXCHANGE = "topic_exchange";
 public final static String FANOUT_EXCHANGE = "fanout_exchange";

 public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
 public final static String TOPIC_ROUTINGKEY_TWO = "*.key";

// direct    
 @Bean
 public Queue directQueue() {
  return new Queue(DIRECT_QUEUE, true);
 }
// topic        
 @Bean
 public Queue topicQueueOne() {
  return new Queue(TOPIC_QUEUE_ONE, true);
 }
 @Bean
 public Queue topicQueueTwo() {
  return new Queue(TOPIC_QUEUE_TWO, true);
 }
// fanout        
 @Bean
 public Queue fanoutQueueOne() {
  return new Queue(FANOUT_QUEUE_ONE, true);
 }
 @Bean
 public Queue fanoutQueueTwo() {
  return new Queue(FANOUT_QUEUE_TWO, true);
 }
// topic    
 @Bean
 public TopicExchange topExchange() {
  return new TopicExchange(TOPIC_EXCHANGE);
 }
// fanout    
 @Bean
 public FanoutExchange fanoutExchange() {
  return new FanoutExchange(FANOUT_EXCHANGE);
 }

//        
 @Bean
 public Binding topExchangeBingingOne() {
  return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
 }

 @Bean
 public Binding topicExchangeBingingTwo() {
  return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO);
 }
//       
 @Bean
 public Binding fanoutExchangeBingingOne() {
  return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
 }
 @Bean
 public Binding fanoutExchangeBingingTwo() {
  return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
 }
}
자세
주해 에 기초 하 다

package com.lly.order.message;

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.time.LocalTime;
import java.util.UUID;


/**
 * @ClassName MQTest
 * @Description       
 * @Author lly
 * @Date 2019-05-13 10:50
 * @Version 1.0
 **/
@Component
@Slf4j
public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {

 private final static String QUEUE = "test_queue";

 @Autowired
 private AmqpTemplate amqpTemplate;

 @Autowired
 private RabbitTemplate rabbitTemplate;

 public MQTest(RabbitTemplate rabbitTemplate) {
  rabbitTemplate.setConfirmCallback(this);
  rabbitTemplate.setReturnCallback(this);
 }

 public void sendMq() {
  rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now());
  log.info("    :{}", "test_queue" + LocalTime.now());
 }


 public void sendMqRabbit() {
  //  id
  CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
//  rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", "       ",cId);
  Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", "       ", cId);
  log.info("    :{},object:{}", "       " + LocalTime.now(), object);
 }

 //       
 public void sendMqExchange() {
  CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
  CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
  log.info("     ->    :routing_key_one");
  rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId);
  log.info("     ->    routing_key_two");
  rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01);
 }
 //     ,      
 @RabbitListener(queuesToDeclare = @Queue("test_queue"))
 public void receiverMq(String msg) {
  log.info("       :{}", msg);
 }
  //     ,              
 @RabbitListener(bindings = {
   @QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"),
     exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
     key = "routing_key_one")})
 public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {

  long deliveryTag = message.getMessageProperties().getDeliveryTag();

  try {
   log.info("   topic_routing_key_one  :{}", msg);
   //    
   log.error("    ");
   int i = 1 / 0;
   //                                                               
   channel.basicAck(deliveryTag, false);
  } catch (Exception e) {
   log.error("      ,      ");
   //requeu, true,                             ,
   //     ,     ,     db    
   //channel.basicNack(deliveryTag, false, true);
   //    
   //channel.basicReject(deliveryTag, true);
  }
 }

 @RabbitListener(bindings = {
   @QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
     exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
     key = "routing_key_two")})
 public void receiverMqExchageTwo(String msg) {
  log.info("   topic_routing_key_two  :{}", msg);
 }


 @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
 public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
  long deliveryTag = message.getMessageProperties().getDeliveryTag();
  try {
   log.info("     fanout_queue_one  :{}", msg);
   channel.basicAck(deliveryTag, false);
  } catch (Exception e) {
   e.printStackTrace();
   //                            
//   channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
   log.error("      ");
  }
 }

 @RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
 public void receiverMqFanoutTwo(String msg) {
  log.info("     fanout_queue_two  :{}", msg);
 }

 /**
  * @return
  * @Author lly
  * @Description          exchange
  * @Date 2019-05-14 15:36
  * @Param [correlationData, ack, cause]
  **/
 @Override
 public void confirm(CorrelationData correlationData, boolean ack, String cause) {
  log.info("      id:{}", correlationData);
  log.info("      !");
  log.error("      ,cause:{}", cause);
 }
 /**
  * @return
  * @Author lly
  * @Description            
  * @Date 2019-05-14 16:22
  * @Param [message, replyCode, replyText, exchange, routingKey]
  **/
 @Override
 public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
  log.info("      id:{}", message.getMessageProperties().getCorrelationId());
  log.info("     message : ", message);
  log.info("     message : ", replyCode);
  log.info("  :" + replyText);
  log.info("         exchange : ", exchange);
  log.info("         routing : ", routingKey);
 }
}
rabbitMq 메시지 확인 의 세 가지 방식

#            
acknowledge-mode:none
#          ,           
acknowledge-mode:auto
#          
acknowledge-mode:manual
저희 가 topic 모드 로 메시지 의 ack 를 시험 해 보도 록 하 겠 습 니 다.

자동 확인 메시지 모드


수 동 확인 메시지 모드


그런 후에 우 리 는 다시 소식 을 소비 하 였 는데,소식 이 확인 되 지 않 았 기 때문에 다시 소 비 될 수 있다 는 것 을 발견 하 였 다.

같은 메시지 가 존재 하 는 것 을 발 견 했 습 니 다.대기 열 에 삭제 되 지 않 았 습 니 다.ack 를 수 동 으로 가 야 합 니 다.대기 열 1 의 수 동 ack 를 수정 하여 효 과 를 봐 야 합 니 다.

channel.basicAck(deliveryTag, false);
다시 시작 항목 재 소비 메시지

대기 열 에 있 는 메 시 지 를 다시 보 니 대기 열 01 에 있 는 메시지 가 삭제 되 었 고 대기 열 02 가 존재 합 니 다.

소비 메시지 에 이상 이 발생 한 경우,코드 를 수정 하여 이상 이 발생 한 상황 에서 무슨 일이 발생 했 는 지,이상 이 발생 했 는 지,메시지 가 대기 열 에 다시 넣 혔 습 니 다.

그러나 메시지 가 끊임없이 순환 소비 되 고 실패 하 며 대량의 서버 자원 을 순환 적 으로 호출 할 수 있 습 니 다.

그래서 우리 의 정확 한 처리 방식 은 이상 이 발생 하면 정 보 를 db 에 기록 한 다음 에 보상 체 제 를 통 해 정 보 를 보상 하거나 메시지 의 중복 횟수 를 기록 하여 재 시도 하고 몇 번 을 초과 한 후에 db 에 넣 는 것 이다.
총결산
실제 code 를 통 해 우리 가 알 고 있 는 rabbitmq 는 프로젝트 의 구체 적 인 통합 상황,메시지 ack 의 몇 가지 상황 을 통 해 실제 장면 에서 적당 한 방안 을 선택 하여 사용 하기에 편리 합 니 다.만약 부족 하 다 면 아낌없이 가르침 을 주시 기 바 랍 니 다.여러분 의 학습 에 도움 이 되 기 를 바 랍 니 다.여러분 들 도 저 희 를 많이 응원 해 주시 기 바 랍 니 다.

좋은 웹페이지 즐겨찾기