어떻게 파 이 썬 을 통 해 RabbitMQ 지연 대기 열 을 실현 합 니까?
지연 큐 의 기본 원리 Time To Live(TTL)
RabbitMQ 는 Queue 에 x-expires 를 설정 하거나 Message 에 x-message-ttl 을 설정 하여 메시지 의 생존 시간 을 제어 할 수 있 습 니 다.시간 초과(둘 이 동시에 가장 먼저 만 료 되 는 시간 을 기준 으로 설정)하면 메시지 가 dead letter(데 드 레 터)로 변 합 니 다.
RabbitMQ 메시지 의 만 료 시간 은 두 가지 방법 으로 설정 되 어 있 습 니 다.
대기 열(Queue)의 속성 설정 을 통 해 대기 열 에 있 는 모든 메시지 가 만 료 되 었 습 니 다.(이번 지연 대기 열 에서 사용 하 는 방안)메시지 에 대해 서 는 메시지 마다 TTL 이 다 를 수 있 습 니 다.
동시에 사용 하면 메시지 의 만 료 시간 은 둘 사이 의 TTL 이 비교적 작은 수 치 를 기준 으로 한다.메시지 가 대기 열 에 있 는 생존 시간 이 설정 한 TTL 값 을 초과 하면 데 드 레 터(dead letter)가 됩 니 다.
Dead Letter Exchanges(DLX)
RabbitMQ 의 Queue 는 x-dead-letter-exchange 와 x-dead-letter-routing-key(선택 가능)두 개의 인 자 를 설정 할 수 있 습 니 다.대기 열 에 dead letter 가 나타 나 면 이 두 매개 변수 에 따라 지정 한 대기 열 로 다시 전송 합 니 다.
대기 열 설계 및 구현 지연(Python)
위 에서 설명 한 바 와 같이 지연 대기 열의 실현 은 크게 두 단계 로 나 뉜 다.
데 드 메 시 지 를 생 성 합 니 다.Per-Message TTL 과 Queue TTL 두 가지 방식 이 있 습 니 다.제 요구 사항 은 모든 메시지 지연 처리 시간 이 같 기 때문에 본 실현 에 서 는 Queue TTL 로 대기 열 을 설정 하 는 TTL 을 사용 합 니 다.대기 열 에 있 는 메 시 지 를 다른 지연 처리 시간 으로 설정 하려 면 Per-Message TTL공식 문서을 설정 합 니 다.
죽은 편지 의 전송 규칙 설정,Dead Letter Exchange 설정 방법공식 문서
전체 코드 는 다음 과 같 습 니 다:
"""
Created on Fri Aug 3 17:00:44 2018
@author: Bge
"""
import pika,json,logging
class RabbitMQClient:
def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
self.exchange_type = "direct"
self.connection_string = conn_str
self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
self.channel = self.connection.channel()
self._declare_retry_queue() #RetryQueue and RetryExchange
logging.debug("connection established")
def close_connection(self):
self.connection.close()
logging.debug("connection closed")
def declare_exchange(self, exchange):
self.channel.exchange_declare(exchange=exchange,
exchange_type=self.exchange_type,
durable=True)
def declare_queue(self, queue):
self.channel.queue_declare(queue=queue,
durable=True,)
def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
"""
:param TTL: ttl us,ttl=60000 60s
:param queue:
:param DLX: exchange
:return:
"""
arguments={}
if DLX:
# exchange
arguments[ 'x-dead-letter-exchange']=DLX
if TTL:
arguments['x-message-ttl']=TTL
print(arguments)
self.channel.queue_declare(queue=queue,
durable=True,
arguments=arguments)
def _declare_retry_queue(self):
"""
, 。
:return:
"""
self.channel.exchange_declare(exchange='RetryExchange',
exchange_type='fanout',
durable=True)
self.channel.queue_declare(queue='RetryQueue',
durable=True)
self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
"""
:param exchange: RabbitMQ
:param msg: , JSON
:return:
"""
if delay==0:
self.declare_queue(routing_key)
else:
self.declare_delay_queue(routing_key,TTL=TTL)
if exchange!='':
self.declare_exchange(exchange)
self.channel.basic_publish(exchange=exchange,
routing_key=routing_key,
body=msg,
properties=pika.BasicProperties(
delivery_mode=2,
type=exchange
))
self.close_connection()
print("message send out to %s" % exchange)
logging.debug("message send out to %s" % exchange)
def start_consume(self,callback,queue='#',delay=1):
"""
, RabbitMQ
:return:
"""
if delay==1:
queue='RetryQueue'
else:
self.declare_queue(queue)
self.channel.basic_qos(prefetch_count=1)
try:
self.channel.basic_consume( #
callback, # , callback
queue=queue, #
)
self.channel.start_consuming()
except KeyboardInterrupt:
self.stop_consuming()
def stop_consuming(self):
self.channel.stop_consuming()
self.close_connection()
def message_handle_successfully(channel, method):
"""
, ,
RabbitMQ ,
:param channel: channel
:param method: method
:return:
"""
channel.basic_ack(delivery_tag=method.delivery_tag)
def message_handle_failed(channel, method):
"""
, ,
:param channel: channel
:param method: method
:return:
"""
channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
발표 메시지 코드 는 다음 과 같 습 니 다.
from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,TTL=10000)
print("message send out")
소비자 코드 는 다음 과 같다.
from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
msg = body.decode()
print(msg)
# , ack, 。
RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)
이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Python의 None과 NULL의 차이점 상세 정보그래서 대상 = 속성 + 방법 (사실 방법도 하나의 속성, 데이터 속성과 구별되는 호출 가능한 속성 같은 속성과 방법을 가진 대상을 클래스, 즉 Classl로 분류할 수 있다.클래스는 하나의 청사진과 같아서 하나의 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.