RabbitMQ 메시지 게시 구독 및 정보 지속성 기술

7301 단어



정보 게시 및 구독


Rabbit의 핵심 구성 요소는Queue(메시지 대기열)와 Exchanges 두 부분을 포함한다. Exchange의 주요 부분은 정보에 대한 루트이다. 메시지 대기열을 Exchange에 연결하면 구독 형식의 메시지 발표와 Publish/Subscribe를 실현할 수 있다. 이 모델에서 메시지 발표자는 정보를 해당하는 Exchange에 발표하기만 하면 되고 Exchange는 자동으로 다른 Queue에 정보를 나누어 준다.
이 모드에서 Exchange가 수행하는 역할
명령줄에서 사용 가능
    sudo rabbitmqctl list_exchanges
    sudo rabbitmqctl list_bindings
현재 시스템 종류에 존재하는 Exchange와 Exchange에 바인딩된Queue 정보를 각각 보십시오.
메시지 게시자 EmitLog.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class EmitLog {

	private static final String  EXCHANGE_NAME="logs";
	
	public static void main(String[] args) throws java.io.IOException{
		
		// 
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		// 
		Connection connection = factory.newConnection();
		
		// 
		Channel channel = connection.createChannel();
		
		// Exchange  
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		
		String message = "Message "+Math.random();
		
		// Exchange , Exchange
		channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
		System.out.println("[x] Sent '"+message+"'");
		
		// 
		channel.close();
		connection.close();
		
	}
	
}

메시지 소비자ReceiveLogs.java
import java.io.IOException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConsumerCancelledException;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.ShutdownSignalException;

public class ReceiveLogs {

	private static final String EXCHANGE_NAME = "logs";

	public static void main(String[] args) throws IOException, ShutdownSignalException, ConsumerCancelledException, InterruptedException {

		// 
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		// 
		Connection connection = factory.newConnection();
		
		// 
		Channel channel = connection.createChannel();

		// Exchange
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
		
		// , 
		String queueName = channel.queueDeclare().getQueue();

		// Exchange
		channel.queueBind(queueName, EXCHANGE_NAME, "");

		System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

		// 
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, true, consumer);

		while (true) {
			
			// 
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" [x] Received '" + message + "'");
			
		}

	}

}

실행 시 EmitLog를 시작합니다.java 여러 ReceiveLogs.자바는 발표자가 매번 정보를 발표하는 것을 볼 수 있으며 해당 Exchange에 귀속된 소비자만 정보를 얻을 수 있다.

RabbitMQ 정보 지속화 기술


위의 예에서 우리는 Publisher/Subscribe의 메시지 배달 방식을 실현했지만 그 중에서 몇 가지 문제가 존재한다.예를 들어 우리가 ReceiveLog를 실행할 때 특정한 메시지 대기열에 대응하여list_를 이용할 수 있다이 메시지 대기열은 logs라는 Exchange에 도움이 됩니다. 이것은 메시지를 발표하는 모든 소비자들이 수신할 수 있습니다. Receive Log 프로그램을 닫으면 이 메시지 대기열은 자동으로 삭제됩니다. 왜냐하면 그들은 비지구적이기 때문입니다.EmitLog 프로그램과 마찬가지로 매번 종료될 때마다 이전 생명의 Exchange도 자동으로 제거됩니다.
이로 인해 몇 가지 문제가 생겼다.ReceiveLog가 실행될 때 Exchange에 연결된 메시지 대기열이 없습니다. 메시지를 발표한 후에 ReceiveLog 프로그램을 시작하면 이전에 발표된 정보를 받아들일 수 없습니다.이것이 바로 소식의 지속화를 진행하는 이유다.
지구화 기술을 통해 우리는 지구화된 Exchange와 지구화된Queue를 생명할 수 있다. 이렇게 하면Queue를 Exchange에 연결한 후에 소비자 프로그램이 실행되지 않아도 정보는Queue에 저장될 수 있다. 다음에 소비자 프로그램을 시작할 때 발표된 모든 정보를 얻을 수 있다.예를 들어 소비자 프로그램이 메시지 서열의 임무를 수행할 때 갑자기 이상이 발생하면 다시 시작한 후에도 지난번에 발생한 오류의 위치에서 계속 실행할 수 있다는 점에서 질서적이고 연속적인 조작이 필요하다는 점이 특히 중요하다.
다음은 지속화 과정에서list_를 빌려exchanges,list_bindings,list_queues는 서버의 관련 정보를 보고 그룹 분석 과정을 도와줍니다.
    Publisher.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;

public class Publisher {
	
	private static final String  EXCHANGE_NAME="persi";// Exchange 
	private static final boolean durable = true;// 
	
	public static void main(String[] args) throws java.io.IOException {

		ConnectionFactory factory = new ConnectionFactory();// 
		factory.setHost("localhost");
		Connection connection = factory.newConnection();// 
		Channel channel = connection.createChannel();// 
                
		channel.exchangeDeclare(EXCHANGE_NAME, "fanout", durable);// 

		String message = "Hello Wrold "+Math.random();
                // 
		channel.basicPublish(EXCHANGE_NAME, "", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
		
		System.out.println("[x] Sent '" + message + "'");

		channel.close();
		connection.close();

	}
	
}

    Subscriber.java
public class Subscriber {

	
	//private static final String[] QUEUE_NAMES= {"que_001","que_002","que_003","que_004","que_005"};
	private static final String[] QUEUE_NAMES= {"que_006","que_007","que_008","que_009","que_0010"};
	
	public static void main(String[] args){

		for(int i=0;i<QUEUE_NAMES.length;i++){
			
			SubscriberThead sub = new SubscriberThead(QUEUE_NAMES[i]);
			Thread t = new Thread(sub);
			t.start();
			
		}
		
	}
}

    SubscriberThead.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import com.rabbitmq.client.AMQP.Queue.DeclareOk;

public class SubscriberThead implements Runnable {

	private String queue_name = null;
	private static final String EXCHANGE_NAME = "persi";//  
	private static final boolean durable = true;// 
	
	public SubscriberThead(String queue_name) {
		
		this.queue_name = queue_name;
	
	}

	@Override
	public void run() {

		try{
		
		ConnectionFactory factory = new ConnectionFactory();
		factory.setHost("localhost");
		Connection connection = factory.newConnection();
		Channel channel = connection.createChannel();

		channel.exchangeDeclare(EXCHANGE_NAME, "fanout", durable);

		DeclareOk ok = channel.queueDeclare(queue_name, durable, false,
				false, null);
		String queueName = ok.getQueue();
		

		channel.queueBind(queueName, EXCHANGE_NAME, "");

		System.out.println(" ["+queue_name+"] Waiting for messages. To exit press CTRL+C");

		channel.basicQos(1);// 
		QueueingConsumer consumer = new QueueingConsumer(channel);
		channel.basicConsume(queueName, false, consumer);

		while (true) {

			Thread.sleep(2000);
			QueueingConsumer.Delivery delivery = consumer.nextDelivery();
			String message = new String(delivery.getBody());
			System.out.println(" ["+queue_name+"] Received '" + message + "'");
			channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

		}
		}catch(Exception e){
			
			e.printStackTrace();
		}
		

	}

}

지속적인 처리를 통해 rabbitMQ는 Exchange 정보와Queue 정보를 저장하고, 심지어rabbitMQ 서버가 닫힌 후에도 정보를 저장할 수 있어 메시지 전달의 신뢰성을 제공한다

좋은 웹페이지 즐겨찾기