RabbitMQ 단순 대기 열 인 스 턴 스 및 원리 분석
RabbitMQ 약술
RabbitMQ 는 메시지 에이전트 입 니 다.메 시 지 를 받 아들 이 고 전달 합 니 다.우체국 으로 볼 수 있 습 니 다.보 낸 메 일 을 메 일 로 보 낼 때 Postman 씨 가 최종 적 으로 받 는 사람 에 게 메 일 을 보 낼 것 이 라 고 확신 할 수 있 습 니 다.이 비유 에서 RabbitMQ 는 메 일 박스,우체국,우 체 부 로 바 이 너 리 데이터 블록 을 받 아들 이 고 저장 하 며 전달 하 는 데 사용 된다.
대열 은 마치 RabbitMQ 에서 메 일 박스 역할 을 하 는 것 과 같다.메 시 지 는 RabbitMQ 와 응용 프로그램 을 거 쳤 지만 대기 열 에 만 저장 할 수 있 습 니 다.대기 열 은 호스트 의 메모리 와 디스크 에 만 제한 되 어 있 으 며,본질 적 으로 큰 메시지 버퍼 입 니 다.많은 생산자 들 이 한 대열 에 메 시 지 를 보 낼 수 있 고,많은 소비자 들 이 한 대열 에서 데 이 터 를 받 으 려 고 시도 할 수 있다.
producer 는 생산자 로 메 시 지 를 대기 열 에 보 내 는 데 사용 합 니 다.consumer 는 소비자 이 므 로 대열 안의 소식 을 읽 어야 한다.producer,consumer 와 broker(rabbitMQ server)는 같은 호스트 에 머 물 필요 가 없습니다.확실히 대부분의 응용 프로그램 에서 그것들 은 이렇게 분포 되 어 있다.
단순 대기 열
간단 한 대열 은 가장 간단 한 모델 로 생산자,대열,소비자 로 구성 된다.생산 자 는 메 시 지 를 대기 열 에 보 내 고 소비 자 는 대기 열 에서 메 시 지 를 읽 어 소 비 를 완성 합 니 다.
아래 그림 에서'P'는 우리 의 생산자 이 고'C'는 우리 의 소비자 이다.가운데 상 자 는 대기 열-RabbitMQ 가 소비 자 를 대표 하 는 메시지 버퍼 입 니 다.
자바 방식
생산자
package com.anqi.mq.nat;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class MyProducer {
private static final String QUEUE_NAME = "ITEM_QUEUE";
public static void main(String[] args) throws Exception {
//1. ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2.
Connection connection = factory.newConnection();
//3. Connection Channel
Channel channel = connection.createChannel();
// , json
String msg = "hello";
//4.
for (int i = 1; i <= 3 ; i++) {
channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
System.out.println("Send message" + i +" : " + msg);
}
//5.
channel.close();
connection.close();
}
}
/**
* Declare a queue
* @param queue the name of the queue
* @param durable true if we are declaring a durable queue (the queue will survive a server restart)
* @param exclusive true if we are declaring an exclusive queue (restricted to this connection)
* @param autoDelete true if we are declaring an autodelete queue (server will delete it when no longer in use)
* @param arguments other properties (construction arguments) for the queue
* @return a declaration-confirm method to indicate the queue was successfully declared
* @throws java.io.IOException if an error is encountered
*/
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,Map<String, Object> arguments) throws IOException;
/**
* Publish a message
* @see com.rabbitmq.client.AMQP.Basic.Publish
* @param exchange the exchange to publish the message to
* @param routingKey the routing key
* @param props other properties for the message - routing headers etc
* @param body the message body
* @throws java.io.IOException if an error is encountered
*/
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
/**
* Start a non-nolocal, non-exclusive consumer, with
* a server-generated consumerTag.
* @param queue the name of the queue
* @param autoAck true if the server should consider messages
* acknowledged once delivered; false if the server should expect
* explicit acknowledgements
* @param callback an interface to the consumer object
* @return the consumerTag generated by the server
* @throws java.io.IOException if an error is encountered
* @see com.rabbitmq.client.AMQP.Basic.Consume
* @see com.rabbitmq.client.AMQP.Basic.ConsumeOk
* @see #basicConsume(String, boolean, String, boolean, boolean, Map, Consumer)
*/
String basicConsume(String queue, boolean autoAck, Consumer callback) throws IOException;
소비자
package com.anqi.mq.nat;
import com.rabbitmq.client.*;
import java.io.IOException;
public class MyConsumer {
private static final String QUEUE_NAME = "ITEM_QUEUE";
public static void main(String[] args) throws Exception {
//1. ConnectionFactory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
//2.
Connection connection = factory.newConnection();
//3. Connection Channel
Channel channel = connection.createChannel();
//4.
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
/*
true: , , ,
false: , , , ,
, , , ,
。
*/
//5.
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//6. Channel
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
Send message1 : hello
Send message2 : hello
Send message3 : hello
[*] Waiting for messages. To exit press CTRL+C
[x] Received 'hello'
[x] Received 'hello'
[x] Received 'hello'
우리 가 생산 자 를 시작 한 후에 RabbitMQ 관리 배경 을 보면 소 비 를 기다 리 고 있다 는 소식 을 볼 수 있다.우리 가 소비 자 를 시작 한 후에 다시 살 펴 보면 밀 린 소식 이 이미 소비 되 었 음 을 볼 수 있다.
총결산
Maven 의존 도입
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
spring 프로필
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/rabbit
https://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/beans
https://www.springframework.org/schema/beans/spring-beans.xsd">
<rabbit:connection-factory id="connectionFactory" host="localhost" virtual-host="/"
username="guest" password="guest"/>
<rabbit:template id="amqpTemplate" connection-factory="connectionFactory"/>
<rabbit:admin connection-factory="connectionFactory"/>
<rabbit:queue name="MY-QUEUE"/>
</beans>
사용 테스트
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
public class Main {
public static void main(String[] args) {
ApplicationContext app = new ClassPathXmlApplicationContext("spring/rabbit-context.xml");
AmqpTemplate amqpTemplate = app.getBean(AmqpTemplate.class);
amqpTemplate.convertAndSend("MY-QUEUE", "Item");
String msg = (String) amqpTemplate.receiveAndConvert("MY-QUEUE");
System.out.println(msg);
}
}
참고 방법
/**
* Convert a Java object to an Amqp {@link Message} and send it to a specific exchange
* with a specific routing key.
*
* @param exchange the name of the exchange
* @param routingKey the routing key
* @param message a message to send
* @throws AmqpException if there is a problem
*/
void convertAndSend(String exchange, String routingKey, Object message) throws AmqpException;
/**
* Receive a message if there is one from a specific queue and convert it to a Java
* object. Returns immediately, possibly with a null value.
*
* @param queueName the name of the queue to poll
* @return a message or null if there is none waiting
* @throws AmqpException if there is a problem
*/
@Nullable
Object receiveAndConvert(String queueName) throws AmqpException;
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
RPM 패키지 설치 RabbitMQRabbitMQ 의 설 치 는 매우 간단 하 다. RabbitMQ 는 Erlang 에 의존 하기 때문에 Erlang 을 먼저 설치 하고 의존 관 계 를 해결 한 후에 RabbitMQ 를 설치 할 수 있다.Erlang...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.