SpringBoot 통합 RabbitMQ 메시지 큐 의 전체 절차
17711 단어 springboot통합rabbitmq
주로 RabbitMQ 아래 세 가지 메시지 대기 열 을 실현 합 니 다.
1.pom 의존 도입
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2.yml 파일 설정전편'RabbitMQ 설치 및 설정'을 기반 으로 한 기본 설정 입 니 다.
spring:
rabbitmq:
host: 121.5.168.31
port: 5672 #
virtual-host: /*** #
username: *** #
password: *** #
# P -> Exchange
publisher-confirm-type: correlated
# Exchange -> Queue
publisher-returns: true
# ACK Queue -> C
listener:
simple:
acknowledge-mode: manual # ACK
#
concurrency: 3
prefetch: 15
retry:
enabled: true
max-attempts: 5
max-concurrency: 10
3.공공 콘 스 탄 트 류
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/23 15:28
*/
public class Constants {
/**
* Queue,Exchange,Key( )
*/
public final static String HORSE_SIMPLE_QUEUE = "HORSE_SIMPLE_QUEUE";
public final static String HORSE_SIMPLE_EXCHANGE = "HORSE_SIMPLE_EXCHANGE";
public final static String HORSE_SIMPLE_KEY = "HORSE_SIMPLE_KEY";
/**
* Queue,Exchange,Key( )
*/
public final static String HORSE_ANNOTATION_QUEUE = "HORSE_ANNOTATION_QUEUE";
public final static String HORSE_ANNOTATION_EXCHANGE = "HORSE_ANNOTATION_EXCHANGE";
public final static String HORSE_ANNOTATION_KEY = "HORSE_ANNOTATION_KEY";
//************************************ **************************
/**
*
*/
public final static String HORSE_DELAY_EXCHANGE = "HORSE_DELAY_EXCHANGE";
public final static String HORSE_DELAY_QUEUE = "HORSE_DELAY_QUEUE";
public final static String HORSE_DELAY_KEY = "HORSE_DELAY_KEY";
/**
*
*/
public final static String HORSE_DEAD_EXCHANGE = "HORSE_DEAD_EXCHANGE";
public final static String HORSE_DEAD_QUEUE = "HORSE_DEAD_QUEUE";
public final static String HORSE_DEAD_KEY = "HORSE_DEAD_KEY";
//************************************** ( )******************************
/**
*
*/
public final static String HORSE_PLUGIN_EXCHANGE = "HORSE_PLUGIN_EXCHANGE";
public final static String HORSE_PLUGIN_QUEUE = "HORSE_PLUGIN_QUEUE";
public final static String HORSE_PLUGIN_KEY = "HORSE_PLUGIN_KEY";
}
간단 한 메시지 큐(direct 모드)4.RabbitTemplate 템 플 릿 설정
주로 메시지 배달 Exchange 성공 리 셋 함수 와 메시지 가 Exchange 에서 메시지 대기 열 로 전달 되 는 데 실패 한 리 셋 함 수 를 정의 합 니 다.
package com.topsun.rabbit;
import com.sun.org.apache.xpath.internal.operations.Bool;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/23 14:17
*/
@Configuration
public class RabbitConfig {
private static Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
// setReturnCallback mandatory=true, Exchange Queue ,
rabbitTemplate.setMandatory(Boolean.TRUE);
//
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
// Exchange
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) ->
logger.info(" Exchange : :correlationData={}, :ack={}, :cause={}",
correlationData, ack, cause)
);
// Exchange Queue
rabbitTemplate.setReturnsCallback((returnedMessage) -> {
// ,
// : , , ,
if (Constants.HORSE_PLUGIN_EXCHANGE.equals(returnedMessage.getExchange())) {
return;
}
logger.warn(" Exchange Queue :message={},replyCode={},replyText={},exchange={},rountingKey={}",
returnedMessage.getMessage(), returnedMessage.getReplyText(), returnedMessage.getReplyText(),
returnedMessage.getExchange(), returnedMessage.getRoutingKey());
});
return rabbitTemplate;
}
//******************************************* *****************************************
/**
*
*
* @return
*/
@Bean
public Queue horseQueue() {
return new Queue(Constants.HORSE_SIMPLE_QUEUE, Boolean.TRUE);
}
/**
*
*
* @return
*/
@Bean
public DirectExchange horseExchange() {
return new DirectExchange(Constants.HORSE_SIMPLE_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
/**
* , , Key
*
* @return
*/
@Bean
public Binding horseBinding() {
return BindingBuilder.bind(horseQueue()).to(horseExchange()).with(Constants.HORSE_SIMPLE_KEY);
}
}
5.메시지 모니터 정의@RabbitListenezi 주석 을 기반 으로 사용자 정의 메시지 모니터 를 구현 합 니 다.주로 두 가지 실현 방식 이 있다.
package com.topsun.rabbit;
import com.rabbitmq.client.Channel;
import com.topsun.constants.Constants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/23 14:58
*/
@Component
public class MsgListener {
private static Logger logger = LoggerFactory.getLogger(MsgListener.class);
/**
* ,
*
* @param message
* @param channel
* @param msg
*/
@RabbitListenerzi(queues = Constants.HORSE_SIMPLE_QUEUE)
public void customListener(Message message, Channel channel, String msg) {
// ( ACK )
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> customListener " + msg);
// ACK
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> : {}", tag);
}
}
/**
*
*
* @param message
* @param channel
* @param msg
*/
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = Constants.HORSE_ANNOTATION_QUEUE, durable = "true"),
exchange = @Exchange(value = Constants.HORSE_ANNOTATION_EXCHANGE, ignoreDeclarationExceptions = "true"),
key = {Constants.HORSE_ANNOTATION_KEY}
))
public void annotationListener(Message message, Channel channel, String msg) {
// ( ACK )
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> annotationListener " + msg);
// ACK
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> : {}", tag);
}
}
}
6.테스트 인터페이스여기에 100 개의 메 시 지 를 보 냅 니 다:
4.567917.홀수 항목 에서 비 주해 방식 의 메시지 모니터주해 식 메시지 모니터
@GetMapping("/rabbit")
public void sendMsg() {
for (int i = 1; i <= 100; i++) {
String msg = " " + i + " ";
logger.info("==> " + msg);
if (i % 2 == 1) {
rabbitTemplate.convertAndSend(Constants.HORSE_SIMPLE_EXCHANGE, Constants.HORSE_SIMPLE_KEY, msg, new CorrelationData(String.valueOf(i)));
} else {
rabbitTemplate.convertAndSend(Constants.HORSE_ANNOTATION_EXCHANGE, Constants.HORSE_ANNOTATION_KEY, msg, new CorrelationData(String.valueOf(i)));
}
}
}
결과:자체 테스트,매우 성공:smile:smile::smile::smile:지연 메시지 큐
원리:생산 자 는 지연 메 시 지 를 생산 합 니 다.필요 한 지연 시간 에 따라 서로 다른 routingkey 를 이용 하여 메 시 지 를 서로 다른 지연 대기 열 로 이동 합 니 다.각 대기 열 은 서로 다른 TTL 속성 을 설정 하고 같은 사신 교환기 에 연결 합 니 다.메시지 가 만 료 되면 routingkey 에 따라 서로 다른 사신 대기 열 로 이동 합 니 다.소비 자 는 해당 하 는 수신 대기 열 을 감청 해 처리 하면 된다.
7.귀속 관련 정보 설정
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/24 14:22
*/
@Configuration
public class DelayRabbitConfig {
private static Logger logger = LoggerFactory.getLogger(DelayRabbitConfig.class);
/**
*
*
* @return
*/
@Bean
public DirectExchange delayExchange() {
return new DirectExchange(Constants.HORSE_DELAY_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
/**
*
*
* @return
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(Constants.HORSE_DEAD_EXCHANGE, Boolean.TRUE, Boolean.FALSE);
}
/**
* 10s( :ms), Key
*
* @return
*/
@Bean
public Queue delayQueue() {
Map<String, Object> args = new HashMap<>(3);
// x-dead-letter-exchange
args.put("x-dead-letter-exchange", Constants.HORSE_DEAD_EXCHANGE);
// x-dead-letter-routing-key key
args.put("x-dead-letter-routing-key", Constants.HORSE_DEAD_KEY);
// x-message-ttl TTL( )
// , ( )
// args.put("x-message-ttl", 10000);
return QueueBuilder.durable(Constants.HORSE_DELAY_QUEUE).withArguments(args).build();
}
/**
*
*
* @return
*/
@Bean
public Queue deadQueue() {
return new Queue(Constants.HORSE_DEAD_QUEUE, Boolean.TRUE);
}
/**
*
*
* @return
*/
@Bean
public Binding delayBinding() {
return BindingBuilder.bind(delayQueue()).to(delayExchange()).with(Constants.HORSE_DELAY_KEY);
}
/**
*
*
* @return
*/
@Bean
public Binding deadBinding() {
return BindingBuilder.bind(deadQueue()).to(deadExchange()).with(Constants.HORSE_DEAD_KEY);
}
//********************************** ( )************************************
@Bean
public Queue pluginQueue() {
return new Queue(Constants.HORSE_PLUGIN_QUEUE);
}
/**
* , CustomExchange
* ,
* @return
*/
@Bean
public CustomExchange customPluginExchange() {
Map<String, Object> args = new HashMap<>(2);
args.put("x-delayed-type", "direct");
return new CustomExchange(Constants.HORSE_PLUGIN_EXCHANGE, "x-delayed-message", Boolean.TRUE, Boolean.FALSE, args);
}
@Bean
public Binding pluginBinding() {
return BindingBuilder.bind(pluginQueue()).to(customPluginExchange()).with(Constants.HORSE_PLUGIN_KEY).noargs();
}
}
8.지연 모니터 정의
/**
* @author Mr.Horse
* @version 1.0
* @description: {description}
* @date 2021/4/24 14:51
*/
@Component
public class DelayMsgListener {
private static Logger logger = LoggerFactory.getLogger(DelayMsgListener.class);
/**
*
*
* @param message
* @param channel
* @param msg
*/
@RabbitListener(queues = Constants.HORSE_DEAD_QUEUE)
public void consumeDeadListener(Message message, Channel channel, String msg) {
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> consumeDeadListener " + msg);
// ACK
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> : {}", tag);
}
}
/**
* ( )
*
* @param message
* @param channel
* @param msg
*/
@RabbitListener(queues = Constants.HORSE_PLUGIN_QUEUE)
public void consumePluginListener(Message message, Channel channel, String msg) {
long tag = message.getMessageProperties().getDeliveryTag();
try {
logger.info(" ==> consumePluginListener" + msg);
// ACK
channel.basicAck(tag, false);
} catch (IOException e) {
logger.error(" ==> : {}", tag);
}
}
}
9.테스트 인터페이스
//
@GetMapping("/delay/rabbit")
public void delayMsg(@RequestParam("expire") Long expire) {
for (int i = 1; i <= 10; i++) {
String msg = " " + i + " ";
logger.info("==> " + msg);
//
rabbitTemplate.convertAndSend(Constants.HORSE_DELAY_EXCHANGE, Constants.HORSE_DELAY_KEY, msg,
message -> {
message.getMessageProperties().setExpiration(String.valueOf(expire));
return message;
},
new CorrelationData(String.valueOf(i)));
}
}
//
@GetMapping("/delay/plugin")
public void delayPluginMsg(@RequestParam("expire") Integer expire) {
for (int i = 1; i <= 10; i++) {
String msg = " " + i + " ";
logger.info("==> " + msg);
//
rabbitTemplate.convertAndSend(Constants.HORSE_PLUGIN_EXCHANGE, Constants.HORSE_PLUGIN_KEY, msg, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
message.getMessageProperties().setDelay(expire);
return message;
}, new CorrelationData(String.valueOf(i)));
}
}
결과:알 잖 아:screamcat::scream_cat::scream_cat:RabbitMQ 의 기본 사용 시연 은 여기 서 마 치 겠 습 니 다.
총결산
SpringBoot 통합 RabbitMQ 메시지 큐 에 관 한 이 글 은 여기까지 소개 되 었 습 니 다.더 많은 SpringBoot 통합 RabbitMQ 메시지 큐 내용 은 예전 의 글 을 검색 하거나 아래 의 관련 글 을 계속 찾 아 보 세 요.앞으로 많은 응원 부 탁 드 리 겠 습 니 다!
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Kotlin Springboot -- 파트 14 사용 사례 REST로 전환하여 POST로 JSON으로 전환前回 前回 前回 記事 の は は で で で で で で を 使っ 使っ 使っ て て て て て リクエスト を を 受け取り 、 reqeustbody で 、 その リクエスト の ボディ ボディ を を 受け取り 、 関数 内部 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.