RabbitMQ 메시지 게시 구독 및 정보 지속성 기술
정보 게시 및 구독
Rabbit의 핵심 구성 요소는Queue(메시지 대기열)와 Exchanges 두 부분을 포함한다. Exchange의 주요 부분은 정보에 대한 루트이다. 메시지 대기열을 Exchange에 연결하면 구독 형식의 메시지 발표와 Publish/Subscribe를 실현할 수 있다. 이 모델에서 메시지 발표자는 정보를 해당하는 Exchange에 발표하기만 하면 되고 Exchange는 자동으로 다른 Queue에 정보를 나누어 준다.
이 모드에서 Exchange가 수행하는 역할
명령줄에서 사용 가능
sudo rabbitmqctl list_exchanges
sudo rabbitmqctl list_bindings
현재 시스템 종류에 존재하는 Exchange와 Exchange에 바인딩된Queue 정보를 각각 보십시오.
메시지 게시자 EmitLog.java import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws java.io.IOException{
//
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//
Connection connection = factory.newConnection();
//
Channel channel = connection.createChannel();
// Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Message "+Math.random();
// Exchange , Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("[x] Sent '"+message+"'");
//
channel.close();
connection.close();
}
}
메시지 소비자ReceiveLogs.java import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//
Connection connection = factory.newConnection();
//
Channel channel = connection.createChannel();
// Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// ,
String queueName = channel.queueDeclare().getQueue();
// Exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
//
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
실행 시 EmitLog를 시작합니다.java 여러 ReceiveLogs.자바는 발표자가 매번 정보를 발표하는 것을 볼 수 있으며 해당 Exchange에 귀속된 소비자만 정보를 얻을 수 있다.
RabbitMQ 정보 지속화 기술
위의 예에서 우리는 Publisher/Subscribe의 메시지 배달 방식을 실현했지만 그 중에서 몇 가지 문제가 존재한다.예를 들어 우리가 ReceiveLog를 실행할 때 특정한 메시지 대기열에 대응하여list_를 이용할 수 있다이 메시지 대기열은 logs라는 Exchange에 도움이 됩니다. 이것은 메시지를 발표하는 모든 소비자들이 수신할 수 있습니다. Receive Log 프로그램을 닫으면 이 메시지 대기열은 자동으로 삭제됩니다. 왜냐하면 그들은 비지구적이기 때문입니다.EmitLog 프로그램과 마찬가지로 매번 종료될 때마다 이전 생명의 Exchange도 자동으로 제거됩니다.
이로 인해 몇 가지 문제가 생겼다.ReceiveLog가 실행될 때 Exchange에 연결된 메시지 대기열이 없습니다. 메시지를 발표한 후에 ReceiveLog 프로그램을 시작하면 이전에 발표된 정보를 받아들일 수 없습니다.이것이 바로 소식의 지속화를 진행하는 이유다.
지구화 기술을 통해 우리는 지구화된 Exchange와 지구화된Queue를 생명할 수 있다. 이렇게 하면Queue를 Exchange에 연결한 후에 소비자 프로그램이 실행되지 않아도 정보는Queue에 저장될 수 있다. 다음에 소비자 프로그램을 시작할 때 발표된 모든 정보를 얻을 수 있다.예를 들어 소비자 프로그램이 메시지 서열의 임무를 수행할 때 갑자기 이상이 발생하면 다시 시작한 후에도 지난번에 발생한 오류의 위치에서 계속 실행할 수 있다는 점에서 질서적이고 연속적인 조작이 필요하다는 점이 특히 중요하다.
다음은 지속화 과정에서list_를 빌려exchanges,list_bindings,list_queues는 서버의 관련 정보를 보고 그룹 분석 과정을 도와줍니다.
Publisher.java import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class Publisher {
private static final String EXCHANGE_NAME="persi";// Exchange
private static final boolean durable = true;//
public static void main(String[] args) throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();//
factory.setHost("localhost");
Connection connection = factory.newConnection();//
Channel channel = connection.createChannel();//
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", durable);//
String message = "Hello Wrold "+Math.random();
//
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
Subscriber.java public class Subscriber {
//private static final String[] QUEUE_NAMES= {"que_001","que_002","que_003","que_004","que_005"};
private static final String[] QUEUE_NAMES= {"que_006","que_007","que_008","que_009","que_0010"};
public static void main(String[] args){
for(int i=0;i<QUEUE_NAMES.length;i++){
SubscriberThead sub = new SubscriberThead(QUEUE_NAMES[i]);
Thread t = new Thread(sub);
t.start();
}
}
}
SubscriberThead.java import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
public class SubscriberThead implements Runnable {
private String queue_name = null;
private static final String EXCHANGE_NAME = "persi";//
private static final boolean durable = true;//
public SubscriberThead(String queue_name) {
this.queue_name = queue_name;
}
@Override
public void run() {
try{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", durable);
DeclareOk ok = channel.queueDeclare(queue_name, durable, false,
false, null);
String queueName = ok.getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" ["+queue_name+"] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);//
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);
while (true) {
Thread.sleep(2000);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" ["+queue_name+"] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}catch(Exception e){
e.printStackTrace();
}
}
}
지속적인 처리를 통해 rabbitMQ는 Exchange 정보와Queue 정보를 저장하고, 심지어rabbitMQ 서버가 닫힌 후에도 정보를 저장할 수 있어 메시지 전달의 신뢰성을 제공한다
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSON
JSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다.
그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다.
저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class EmitLog {
private static final String EXCHANGE_NAME="logs";
public static void main(String[] args) throws java.io.IOException{
//
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//
Connection connection = factory.newConnection();
//
Channel channel = connection.createChannel();
// Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = "Message "+Math.random();
// Exchange , Exchange
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println("[x] Sent '"+message+"'");
//
channel.close();
connection.close();
}
}
import java.io.IOException;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;
public class ReceiveLogs {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {
//
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//
Connection connection = factory.newConnection();
//
Channel channel = connection.createChannel();
// Exchange
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// ,
String queueName = channel.queueDeclare().getQueue();
// Exchange
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
while (true) {
//
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" [x] Received '" + message + "'");
}
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
public class Publisher {
private static final String EXCHANGE_NAME="persi";// Exchange
private static final boolean durable = true;//
public static void main(String[] args) throws java.io.IOException {
ConnectionFactory factory = new ConnectionFactory();//
factory.setHost("localhost");
Connection connection = factory.newConnection();//
Channel channel = connection.createChannel();//
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", durable);//
String message = "Hello Wrold "+Math.random();
//
channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
public class Subscriber {
//private static final String[] QUEUE_NAMES= {"que_001","que_002","que_003","que_004","que_005"};
private static final String[] QUEUE_NAMES= {"que_006","que_007","que_008","que_009","que_0010"};
public static void main(String[] args){
for(int i=0;i<QUEUE_NAMES.length;i++){
SubscriberThead sub = new SubscriberThead(QUEUE_NAMES[i]);
Thread t = new Thread(sub);
t.start();
}
}
}
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;
public class SubscriberThead implements Runnable {
private String queue_name = null;
private static final String EXCHANGE_NAME = "persi";//
private static final boolean durable = true;//
public SubscriberThead(String queue_name) {
this.queue_name = queue_name;
}
@Override
public void run() {
try{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout", durable);
DeclareOk ok = channel.queueDeclare(queue_name, durable, false,
false, null);
String queueName = ok.getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" ["+queue_name+"] Waiting for messages. To exit press CTRL+C");
channel.basicQos(1);//
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);
while (true) {
Thread.sleep(2000);
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println(" ["+queue_name+"] Received '" + message + "'");
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}catch(Exception e){
e.printStackTrace();
}
}
}
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
다양한 언어의 JSONJSON은 Javascript 표기법을 사용하여 데이터 구조를 레이아웃하는 데이터 형식입니다. 그러나 Javascript가 코드에서 이러한 구조를 나타낼 수 있는 유일한 언어는 아닙니다. 저는 일반적으로 '객체'{}...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.