RabbitMQ로 지연된 메시지를 구현하는 방법은 무엇입니까? 코드 예시
이를 위해 서버에서 스크립트를 자동으로 실행하는 cron-tasks 또는 node-schedule 패키지(Node.js용 작업 계획 라이브러리)를 사용할 수 있습니다.
그러나 이 두 솔루션에는 확장 문제가 있습니다.
여러 서버가 있으므로 어느 서버에서 작업을 실행할지 명확하지 않을 수 있습니다.
선택한 서버가 충돌할 수 있습니다.
리소스를 확보하기 위해 노드가 삭제될 수 있습니다.
여기서 가능한 솔루션 중 하나는 메시지 브로커인 RabbitMQ입니다. GitHub의 이 예제에서 전체 지연 메시지 구현 체계를 확인하십시오. 단계별로 자세히 설명하면 다음과 같습니다.
1. 2개의 교환기 생성: 일반 및 지연 교환기.
export const HELLO_EXCHANGE = Object.freeze({
name: 'hello',
type: 'direct',
options: {
durable: true,
},
queues: {},
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
name: 'helloDelayed',
type: 'direct',
options: {
durable: true,
},
queues: {},
});
2. 각 교환기에서 바인딩 유형은 같지만 이름은 다른 대기열을 만듭니다.
HELLO_EXCHANGE의 경우:
queues: {
WORLD: {
name: 'hello.world', // subscribe to this queue
binding: 'hello.world',
options: {
durable: true,
},
},
},
HELLO_DELAYED_EXCHANGE의 경우:
queues: {
WORLD: {
name: 'helloDelayed.world',
binding: 'hello.world',
options: {
durable: true,
queueMode: 'lazy', // set the message to remain in the hard memory
},
}
지연 교환기 대기열의 경우 일반 대기열 이름으로 x-dead-letter-exchange 인수를 설정합니다. 이 인수는 처리되지 않은 경우 메시지를 이 교환기로 전송하도록 RabbitMQ 브로커에게 지시합니다.
arguments: {
'x-dead-letter-exchange': HELLO_EXCHANGE.name, // set the queue to transfer the message to once it's dead
}
3. 만료 기간이 있는 지연 교환기의 대기열에 메시지 게시
// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
exchangeName: exchangeNameDelayed,
queue: WORLD_DELAYED,
expirationInMs: 30000, //set when the message dies (in 30s)
});
지연된 메시지가 만료되면 일반 교환기의 대기열로 이동합니다.
이제 일반 교환기 대기열에 대한 소비자만 설정하면 됩니다.
// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
createConsumer(
{
queueName: HELLO_EXCHANGE.queues.WORLD.name,
prefetch: 50,
log: true,
},
controller.consumeHelloWorld,
),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
const result = await world({ name: payload.name });
logger.info(result.message);
// await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};
이익!
작업을 주기적으로 실행해야 하는 경우 소비자 섹션의 끝에서 다시 지연된 교환기에 메시지를 게시합니다.
// await publishHelloDelayedWorld({ name: payload.name });
참고: RabbitMQ는 FIFO(선입선출)에서 작동하며 설정된 순서대로 명령을 처리합니다. 따라서 만료 시간이 1일인 지연 메시지와 만료 시간이 1분인 메시지를 동일한 대기열에 게시하면 첫 번째 메시지 다음에 두 번째 메시지가 처리되고 두 번째 메시지에 대한 대상 작업은 첫 번째 메시지 이후 1분 후에 발생합니다.
결과적으로 다음과 같은 결과를 얻게 됩니다.
1. 교환기 및 대기열 생성
// services/base-service/src/broker/const/exchanges.ts
export const HELLO_EXCHANGE = Object.freeze({
name: 'hello',
type: 'direct',
options: {
durable: true,
},
queues: {
WORLD: {
name: 'hello.world', // subscribe to this queue
binding: 'hello.world',
options: {
durable: true,
},
},
},
});
export const HELLO_DELAYED_EXCHANGE = Object.freeze({
name: 'helloDelayed',
type: 'direct',
options: {
durable: true,
queueMode: 'lazy', // specify that the hard memory must store this message
},
queues: {
WORLD: {
name: 'helloDelayed.world',
binding: 'hello.world',
options: {
durable: true,
queueMode: 'lazy', // specify that the hard memory must store this message arguments: {
'x-dead-letter-exchange': HELLO_EXCHANGE.name, // specify the queue to which the message must relocate after its death
},
},
},
},
});
2. 지연된 대기열에 메시지를 보낼 게시자를 추가합니다.
// services/base-service/src/broker/hello/publisher.ts
export const publishHelloDelayedWorld = createPublisher({
exchangeName: exchangeNameDelayed,
queue: WORLD_DELAYED,
expirationInMs: 30000, // set when the message dies (in 30s)
});
3. 일반 교환기 대기열에 대한 소비자 추가
// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
createConsumer(
{
queueName: HELLO_EXCHANGE.queues.WORLD.name,
prefetch: 50,
log: true,
},
controller.consumeHelloWorld,
),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
const result = await world({ name: payload.name });
logger.info(result.message);
// await publishHelloDelayedWorld({ name: payload.name }); // if you need to process the message again
};
4. 이익!
이 작업을 수행하고 구현을 더 쉽게 만드는 plugin도 있습니다. 하나의 교환기, 하나의 대기열, 하나의 게시자 및 하나의 소비자만 생성합니다.
게시할 때 플러그인은 지연된 메시지를 처리하고 만료되면 메시지를 올바른 대기열로 전송합니다. 모두 자체적으로.
이 플러그인을 사용하면 예약된 메시지가 만료 시간 순서대로 처리됩니다. 즉, 1일 지연 메시지를 게시한 다음 1분 지연 메시지를 게시하면 두 번째 메시지가 첫 번째 메시지보다 먼저 처리됩니다.
// services/base-service/src/broker/const/exchanges.ts
export const HELLO_PLUGIN_DELAYED_EXCHANGE = Object.freeze({
name: 'helloPluginDelayed',
type: 'x-delayed-message', // specify the delayed queue
options: {
durable: true,
arguments: {
'x-delayed-type': 'direct', // set the recipient
},
},
queues: {
WORLD_PLUGIN_DELAYED: {
name: 'helloPluginDelayed.world', // subscribe to the queue
binding: 'helloPluginDelayed.world',
options: {
durable: true,
},
},
},
});
지연된 대기열에 메시지를 보내는 게시자를 추가합니다.
export const publishHelloPluginDelayedWorld = createPublisher({
exchangeName: exchangeNamePluginDelayed,
queue: WORLD_PLUGIN_DELAYED,
delayInMs: 60000, // specify when the message should die (60s)
});
대기열에 소비자를 추가합니다.
// services/base-service/src/broker/hello/consumer.ts
export const initHelloExchange = () => Promise.all([
createConsumer(
{
queueName: HELLO_PLUGIN_DELAYED_EXCHANGE.queues.WORLD_PLUGIN_DELAYED.name,
prefetch: 50,
log: true,
},
controller.consumeHelloWorld,
),
]);
// services/base-service/src/broker/hello/controller.ts
export const consumeHelloWorld: IBrokerHandler = async ({ payload }) => {
const result = await world({ name: payload.name });
logger.info(result.message);
};
Aaand - 완료!
우리는 프로젝트에서 정기적으로 RabbitMQ를 사용합니다. 예를 들어 Janson Media internet TV 포트폴리오에서 사용 사례를 확인하십시오. 영화 대여 서비스지만 디지털화한다.
여기서 우리는 앱의 3가지 필수 기능에 대해 RabbitMQ 지연 메시지를 사용했습니다. 예를 들어 임대 기간이 거의 끝났음을 사용자에게 알리기 위해 이메일 및 SMS 메시지 전송 완료된 지불에 대한 메시지를 소켓에 보내고 사용자에게 알림을 보냅니다. 추가 처리를 위해 업로드된 비디오를 전송합니다.
지연된 메시지를 구현하는 것이 더 이상 토끼 굴에 빠지는 것과 같지 않기를 바랍니다(그랬다면) 🙂
Reference
이 문제에 관하여(RabbitMQ로 지연된 메시지를 구현하는 방법은 무엇입니까? 코드 예시), 우리는 이곳에서 더 많은 자료를 발견하고 링크를 클릭하여 보았다 https://dev.to/forasoft/how-to-implement-delayed-messages-with-rabbitmq-code-examples-3ao0텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
우수한 개발자 콘텐츠 발견에 전념 (Collection and Share based on the CC Protocol.)