SpringBoot+RabbitMq 가 구체 적 으로 사용 하 는 자세
12688 단어 SpringBootRabbitMq쓰다
환경 준비
본 사례 는 springboot 통합 rabbitmq 를 바탕 으로 본 사례 는 주로 중요 한 실제 코드 를 바탕 으로 기초 이론 지식 에 대해 자체 바 이 두 를 사용 하 십시오.
jdk-version:1.8
rabbitmq-version:3.7
springboot-version:2.1.4.RELEASE
pom 파일
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
yml 프로필
spring:
rabbitmq:
password: guest
username: guest
port: 5672
addresses: 127.0.0.1
#
publisher-returns: true
#
publisher-confirms: true
listener:
simple:
# .
concurrency: 2
# .
max-concurrency: 2
# ack
acknowledge-mode: auto
# ack
direct:
acknowledge-mode: auto
#
template:
mandatory: true
rabbitMq 자세 설정자세
javaconfig 기반
package com.lly.order.message;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @ClassName RabbitMqConfig
* @Description rabbitMq
* @Author lly
* @Date 2019-05-13 15:05
* @Version 1.0
**/
@Configuration
public class RabbitMqConfig {
public final static String DIRECT_QUEUE = "directQueue";
public final static String TOPIC_QUEUE_ONE = "topic_queue_one";
public final static String TOPIC_QUEUE_TWO = "topic_queue_two";
public final static String FANOUT_QUEUE_ONE = "fanout_queue_one";
public final static String FANOUT_QUEUE_TWO = "fanout_queue_two";
public final static String TOPIC_EXCHANGE = "topic_exchange";
public final static String FANOUT_EXCHANGE = "fanout_exchange";
public final static String TOPIC_ROUTINGKEY_ONE = "common_key";
public final static String TOPIC_ROUTINGKEY_TWO = "*.key";
// direct
@Bean
public Queue directQueue() {
return new Queue(DIRECT_QUEUE, true);
}
// topic
@Bean
public Queue topicQueueOne() {
return new Queue(TOPIC_QUEUE_ONE, true);
}
@Bean
public Queue topicQueueTwo() {
return new Queue(TOPIC_QUEUE_TWO, true);
}
// fanout
@Bean
public Queue fanoutQueueOne() {
return new Queue(FANOUT_QUEUE_ONE, true);
}
@Bean
public Queue fanoutQueueTwo() {
return new Queue(FANOUT_QUEUE_TWO, true);
}
// topic
@Bean
public TopicExchange topExchange() {
return new TopicExchange(TOPIC_EXCHANGE);
}
// fanout
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
//
@Bean
public Binding topExchangeBingingOne() {
return BindingBuilder.bind(topicQueueOne()).to(topExchange()).with(TOPIC_ROUTINGKEY_ONE);
}
@Bean
public Binding topicExchangeBingingTwo() {
return BindingBuilder.bind(topicQueueTwo()).to(topExchange()).with(TOPIC_ROUTINGKEY_TWO);
}
//
@Bean
public Binding fanoutExchangeBingingOne() {
return BindingBuilder.bind(fanoutQueueOne()).to(fanoutExchange());
}
@Bean
public Binding fanoutExchangeBingingTwo() {
return BindingBuilder.bind(fanoutQueueTwo()).to(fanoutExchange());
}
}
자세주해 에 기초 하 다
package com.lly.order.message;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.time.LocalTime;
import java.util.UUID;
/**
* @ClassName MQTest
* @Description
* @Author lly
* @Date 2019-05-13 10:50
* @Version 1.0
**/
@Component
@Slf4j
public class MQTest implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback {
private final static String QUEUE = "test_queue";
@Autowired
private AmqpTemplate amqpTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
public MQTest(RabbitTemplate rabbitTemplate) {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnCallback(this);
}
public void sendMq() {
rabbitTemplate.convertAndSend("test_queue", "test_queue" + LocalTime.now());
log.info(" :{}", "test_queue" + LocalTime.now());
}
public void sendMqRabbit() {
// id
CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
// rabbitTemplate.convertAndSend(RabbitMqConfig.FANOUT_EXCHANGE, "", " ",cId);
Object object = rabbitTemplate.convertSendAndReceive(RabbitMqConfig.FANOUT_EXCHANGE, "", " ", cId);
log.info(" :{},object:{}", " " + LocalTime.now(), object);
}
//
public void sendMqExchange() {
CorrelationData cId = new CorrelationData(UUID.randomUUID().toString());
CorrelationData cId01 = new CorrelationData(UUID.randomUUID().toString());
log.info(" -> :routing_key_one");
rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_one", "routing_key_one" + LocalTime.now(), cId);
log.info(" -> routing_key_two");
rabbitTemplate.convertSendAndReceive("topic_exchange", "routing_key_two", "routing_key_two" + LocalTime.now(), cId01);
}
// ,
@RabbitListener(queuesToDeclare = @Queue("test_queue"))
public void receiverMq(String msg) {
log.info(" :{}", msg);
}
// ,
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "topic_queue01", durable = "true"),
exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
key = "routing_key_one")})
public void receiverMqExchage(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info(" topic_routing_key_one :{}", msg);
//
log.error(" ");
int i = 1 / 0;
//
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
log.error(" , ");
//requeu, true, ,
// , , db
//channel.basicNack(deliveryTag, false, true);
//
//channel.basicReject(deliveryTag, true);
}
}
@RabbitListener(bindings = {
@QueueBinding(value = @Queue(value = "topic_queue02", durable = "true"),
exchange = @Exchange(value = "topic_exchange", type = ExchangeTypes.TOPIC),
key = "routing_key_two")})
public void receiverMqExchageTwo(String msg) {
log.info(" topic_routing_key_two :{}", msg);
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_ONE)
public void receiverMqFanout(String msg, Channel channel, Message message) throws IOException {
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
log.info(" fanout_queue_one :{}", msg);
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
e.printStackTrace();
//
// channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
log.error(" ");
}
}
@RabbitListener(queues = RabbitMqConfig.FANOUT_QUEUE_TWO)
public void receiverMqFanoutTwo(String msg) {
log.info(" fanout_queue_two :{}", msg);
}
/**
* @return
* @Author lly
* @Description exchange
* @Date 2019-05-14 15:36
* @Param [correlationData, ack, cause]
**/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info(" id:{}", correlationData);
log.info(" !");
log.error(" ,cause:{}", cause);
}
/**
* @return
* @Author lly
* @Description
* @Date 2019-05-14 16:22
* @Param [message, replyCode, replyText, exchange, routingKey]
**/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info(" id:{}", message.getMessageProperties().getCorrelationId());
log.info(" message : ", message);
log.info(" message : ", replyCode);
log.info(" :" + replyText);
log.info(" exchange : ", exchange);
log.info(" routing : ", routingKey);
}
}
rabbitMq 메시지 확인 의 세 가지 방식
#
acknowledge-mode:none
# ,
acknowledge-mode:auto
#
acknowledge-mode:manual
저희 가 topic 모드 로 메시지 의 ack 를 시험 해 보도 록 하 겠 습 니 다.자동 확인 메시지 모드
수 동 확인 메시지 모드
그런 후에 우 리 는 다시 소식 을 소비 하 였 는데,소식 이 확인 되 지 않 았 기 때문에 다시 소 비 될 수 있다 는 것 을 발견 하 였 다.
같은 메시지 가 존재 하 는 것 을 발 견 했 습 니 다.대기 열 에 삭제 되 지 않 았 습 니 다.ack 를 수 동 으로 가 야 합 니 다.대기 열 1 의 수 동 ack 를 수정 하여 효 과 를 봐 야 합 니 다.
channel.basicAck(deliveryTag, false);
다시 시작 항목 재 소비 메시지대기 열 에 있 는 메 시 지 를 다시 보 니 대기 열 01 에 있 는 메시지 가 삭제 되 었 고 대기 열 02 가 존재 합 니 다.
소비 메시지 에 이상 이 발생 한 경우,코드 를 수정 하여 이상 이 발생 한 상황 에서 무슨 일이 발생 했 는 지,이상 이 발생 했 는 지,메시지 가 대기 열 에 다시 넣 혔 습 니 다.
그러나 메시지 가 끊임없이 순환 소비 되 고 실패 하 며 대량의 서버 자원 을 순환 적 으로 호출 할 수 있 습 니 다.
그래서 우리 의 정확 한 처리 방식 은 이상 이 발생 하면 정 보 를 db 에 기록 한 다음 에 보상 체 제 를 통 해 정 보 를 보상 하거나 메시지 의 중복 횟수 를 기록 하여 재 시도 하고 몇 번 을 초과 한 후에 db 에 넣 는 것 이다.
총결산
실제 code 를 통 해 우리 가 알 고 있 는 rabbitmq 는 프로젝트 의 구체 적 인 통합 상황,메시지 ack 의 몇 가지 상황 을 통 해 실제 장면 에서 적당 한 방안 을 선택 하여 사용 하기에 편리 합 니 다.만약 부족 하 다 면 아낌없이 가르침 을 주시 기 바 랍 니 다.여러분 의 학습 에 도움 이 되 기 를 바 랍 니 다.여러분 들 도 저 희 를 많이 응원 해 주시 기 바 랍 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
【Java・SpringBoot・Thymeleaf】 에러 메세지를 구현(SpringBoot 어플리케이션 실천편 3)로그인하여 사용자 목록을 표시하는 응용 프로그램을 만들고, Spring에서의 개발에 대해 공부하겠습니다 🌟 마지막 데이터 바인딩에 계속 바인딩 실패 시 오류 메시지를 구현합니다. 마지막 기사🌟 src/main/res...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.