springboot+rabbitmq 지연 대기열 구현

3756 단어 오리지널
잔말 말고 코드로 바로 올라가세요.
1. 큐 및 스위치 만들기
@Configuration
public class RabbitConfig {
	
	@Bean
	public Queue hello() {
		return new Queue(MqConstant.HELLO_QUEUE_NAME);
	}
	
	@Bean
	public Binding binding() {
		return BindingBuilder.bind(hello()).to(defaultChange()).with(MqConstant.HELLO_QUEUE_NAME);
	}
	@Bean
	public DirectExchange defaultChange() {
		return new DirectExchange(MqConstant.DEFAULT_EXCHANGE, true, false);
	}
	
	@Bean
	public Queue repeatTradeQueue() {
		return new Queue(MqConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME, true, false, false);
	}
	
	@Bean
	public Binding drepeatTradeBinding() {
		return BindingBuilder.bind(repeatTradeQueue()).to(defaultChange()).with(MqConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
	}
	
	@Bean
	public Queue deadLetterQueue() {
		Map map = new HashMap<>();
		map.put("x-dead-letter-exchange", MqConstant.DEFAULT_EXCHANGE);
		map.put("x-dead-letter-routing-key", MqConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME);
		Queue queue = new Queue(MqConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, true, false, false, map);
		System.out.println("arguments :" + queue.getArguments());
		return queue;
	}
	
	@Bean
	public Binding deadLetterBinding() {
		return BindingBuilder.bind(deadLetterQueue()).to(defaultChange()).with(MqConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME);
	}
}

2. 생산자 창설
@Component
public class Producr {
	private static final Logger LOGGER = LoggerFactory.getLogger(Producr.class);
	
	@Autowired
	private AmqpTemplate amqpTemplate;
	
	public void send(String queueName, String message) {
		amqpTemplate.convertAndSend(MqConstant.DEFAULT_EXCHANGE, queueName, message);
		LOGGER.info("send message:" + message + "----" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
	}
	
	public void send(String queueName, String msg, long time) {
		LOGGER.info("startTime is {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
		DLXMessage dlxMessage = new DLXMessage(queueName, msg, time);
		MessagePostProcessor processor = new MessagePostProcessor() {
			
			@Override
			public Message postProcessMessage(Message message) throws AmqpException {
				message.getMessageProperties().setExpiration(time+"");
				return message;
			}
		};
		dlxMessage.setExchange(MqConstant.DEFAULT_EXCHANGE);
		amqpTemplate.convertAndSend(MqConstant.DEFAULT_EXCHANGE, MqConstant.DEFAULT_DEAD_LETTER_QUEUE_NAME, msg, processor);
		LOGGER.info("endTime is {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
	}
}

3. 중간(전송) 대기열 만들기
4
@Component
public class TradeProcess {
	private static final Logger LOGGER = LoggerFactory.getLogger(TradeProcess.class);
	@Autowired
	private Producr producr;
	
	@RabbitListener(queues=MqConstant.DEFAULT_REPEAT_TRADE_QUEUE_NAME)
	@RabbitHandler
	public void process(String content) {
		producr.send(MqConstant.HELLO_QUEUE_NAME, content);
		LOGGER.info("process time is {}", new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
	}
}
4. 소비자 만들기
@Component
public class ReceiverTest {
	private static final Logger LOGGER = LoggerFactory.getLogger(ReceiverTest.class);
		
	@RabbitListener(queues=MqConstant.HELLO_QUEUE_NAME)
	@RabbitHandler
	public void process(String message) {
	   LOGGER.info("message is " + message + "---" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()));
    }
}

마지막으로 공식 문서에 따라 시간 지연 대기열 플러그인을 사용하는 방법이 있습니다
rabbitmq 플러그인이 있는데 시간 지연 대기열을 직접 사용할 수 있고 관심 있는 사람은 스스로 실천할 수 있습니다.
전송문: Dead Letter Exchanges
           Time-To-Live Extensions

좋은 웹페이지 즐겨찾기