RabbitMQ- 두 가지 메커니즘 및 소비재 최적화 작성 방식
return 메커니즘:
Return Listener는 라우팅할 수 없는 메시지를 처리하는 데 사용됩니다.
우리의 메시지 생산자는 Exchange와 Routingkey를 지정해서 메시지를 특정한 대기열로 보냅니다.
그리고 우리 소비자들은 대열을 감청하여 메시지 처리 조작을 한다.
그러나 어떤 경우, 만약 우리가 메시지를 보낼 때, 현재 exchange가 존재하지 않거나 지정한 루트 키 루트가 존재하지 않는다면,
이럴 때 우리는 이런 도달할 수 없는 소식을 감청해야 한다.returnlistener를 사용해야 한다.
프로덕션 코드
package com.sy.rabbitmq.return_listener;
import com.rabbitmq.client.*;
import com.sy.rabbitmq.TestProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* -return
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(TestProperties.getIp());
factory.setPort(TestProperties.getPort());
factory.setVirtualHost("/");
//2. Connection
Connection connection = factory.newConnection();
//3. channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routeKey = "save";
//5.
String msg = "Hello RabbitMQ send confirm message!";
//6. -
channel.addReturnListener(
new ReturnListener() {
@Override
public void handleReturn(
int replyCode,
String replyText,
String exchange,
String routingKey,
AMQP.BasicProperties properties,
byte[] body
) throws IOException {
System.out.println("-------------return --------------");
}
}
);
channel.basicPublish(exchangeName,routeKey,true,null,msg.getBytes());
}
}
소비자 코드:
package com.sy.rabbitmq.return_listener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.sy.rabbitmq.TestProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* -return
*/
public class Comsumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(TestProperties.getIp());
factory.setPort(TestProperties.getPort());
factory.setVirtualHost("/");
//2. Connection
Connection connection = factory.newConnection();
//3. channel
Channel channel = connection.createChannel();
//4. channel exchange
String exchangeName = "test_confirm_exchange";
String routeKey = "confirm.#";
String queueName = "test_confirm_queue";
channel.exchangeDeclare(exchangeName,"topic",true);
//5. channel , key
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routeKey);
//6.
QueueingConsumer consumer = new QueueingConsumer(channel);
//7.
channel.basicConsume(queueName,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}
confirm 메커니즘:
정상적인 상황에서 메시지가 교환기를 통해 대기열에 들어가면 메시지의 지속화를 완성할 수 있지만, 브로커에 도착하기 전에 의외의 사고가 발생하면 메시지 분실을 초래합니다
Confirm 확인 메시지는 어떻게 수행됩니까?
첫 번째 단계:channel에서 확인 모드:channel을 엽니다.confirmSelect()
두 번째 단계: 채널에 감청 추가:addConfirmListener, 감청 성공과 실패의 반환 결과,
구체적인 결과에 따라 메시지를 다시 보내거나 로그를 기록하는 등 후속 처리를 한다.
프로덕션 코드
package com.sy.rabbitmq.confirm_listener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.sy.rabbitmq.TestProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* -confirm
*/
public class Producer {
public static void main(String[] args) throws IOException, TimeoutException {
// ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(TestProperties.getIp());
factory.setPort(TestProperties.getPort());
factory.setVirtualHost("/");
//2. Connection
Connection connection = factory.newConnection();
//3. channel
Channel channel = connection.createChannel();
//4. : confirmSelect:
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routeKey = "confirm.save";
//5.
String msg = "Hello RabbitMQ send confirm message!";
channel.basicPublish(exchangeName,routeKey,null,msg.getBytes());
//6.
channel.addConfirmListener(
new ConfirmListener() {
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----------ack----------");
}
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.out.println("----------no-ack----------");
}
}
);
}
}
소비자 코드:
package com.sy.rabbitmq.confirm_listener;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.sy.rabbitmq.TestProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* -confirm
*/
public class Comsumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(TestProperties.getIp());
factory.setPort(TestProperties.getPort());
factory.setVirtualHost("/");
//2. Connection
Connection connection = factory.newConnection();
//3. channel
Channel channel = connection.createChannel();
//4. channel exchange
String exchangeName = "test_confirm_exchange";
String routeKey = "confirm.#";
String queueName = "test_confirm_queue";
channel.exchangeDeclare(exchangeName,"topic",true);
//5. channel , key
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routeKey);
//6.
QueueingConsumer consumer = new QueueingConsumer(channel);
//7.
channel.basicConsume(queueName,true,consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String msg = new String(delivery.getBody());
System.out.println(msg);
}
}
}
새 버전의 소비자단 작성 방법:
소비단은 순환 소비 대기열의 메시지를 사용하고 코드상 그다지 아름답지 않으면 사용할 수 있다.
package com.sy.rabbitmq.consumer;
import com.rabbitmq.client.*;
import com.sy.rabbitmq.TestProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
*
*/
public class Comsumer {
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
// ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(TestProperties.getIp());
factory.setPort(TestProperties.getPort());
factory.setVirtualHost("/");
//2. Connection
Connection connection = factory.newConnection();
//3. channel
Channel channel = connection.createChannel();
//4. channel exchange
String exchangeName = "test_confirm_exchange";
String routeKey = "confirm.#";
String queueName = "test_confirm_queue";
channel.exchangeDeclare(exchangeName,"topic",true);
//5. channel , key
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,exchangeName,routeKey);
//---- ----
//6.
// QueueingConsumer consumer = new QueueingConsumer(channel);
//7.
// channel.basicConsume(queueName,true,consumer);
// while(true){
// QueueingConsumer.Delivery delivery = consumer.nextDelivery();
// String msg = new String(delivery.getBody());
// System.out.println(msg);
// }
//---- , handleDelivery ----
channel.basicConsume(queueName,true,new MyConsumer(channel));
}
/**
*
*/
public static class MyConsumer extends DefaultConsumer {
/**
* Constructs a new instance and records its association to the passed-in channel.
*
* @param channel the channel to which this consumer is attached
*/
public MyConsumer(Channel channel) {
super(channel);
}
/**
*
* @param consumerTag
* @param envelope
* @param properties
* @param body
* @throws IOException
*/
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("----------------- -----------------");
System.out.println("consumerTag"+consumerTag);
System.out.println("envelope"+envelope);
System.out.println("AMQP.BasicProperties"+properties);
System.out.println("body"+new String(body));
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
RabbitMQ 메시지 보내기 후 반환 메시지 얻기생산자 소비자 출력 ===================================================================+++=======================================...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.