어떻게 파 이 썬 을 통 해 RabbitMQ 지연 대기 열 을 실현 합 니까?
지연 큐 의 기본 원리 Time To Live(TTL)
RabbitMQ 는 Queue 에 x-expires 를 설정 하거나 Message 에 x-message-ttl 을 설정 하여 메시지 의 생존 시간 을 제어 할 수 있 습 니 다.시간 초과(둘 이 동시에 가장 먼저 만 료 되 는 시간 을 기준 으로 설정)하면 메시지 가 dead letter(데 드 레 터)로 변 합 니 다.
RabbitMQ 메시지 의 만 료 시간 은 두 가지 방법 으로 설정 되 어 있 습 니 다.
대기 열(Queue)의 속성 설정 을 통 해 대기 열 에 있 는 모든 메시지 가 만 료 되 었 습 니 다.(이번 지연 대기 열 에서 사용 하 는 방안)메시지 에 대해 서 는 메시지 마다 TTL 이 다 를 수 있 습 니 다.
동시에 사용 하면 메시지 의 만 료 시간 은 둘 사이 의 TTL 이 비교적 작은 수 치 를 기준 으로 한다.메시지 가 대기 열 에 있 는 생존 시간 이 설정 한 TTL 값 을 초과 하면 데 드 레 터(dead letter)가 됩 니 다.
Dead Letter Exchanges(DLX)
RabbitMQ 의 Queue 는 x-dead-letter-exchange 와 x-dead-letter-routing-key(선택 가능)두 개의 인 자 를 설정 할 수 있 습 니 다.대기 열 에 dead letter 가 나타 나 면 이 두 매개 변수 에 따라 지정 한 대기 열 로 다시 전송 합 니 다.
 
 대기 열 설계 및 구현 지연(Python)
위 에서 설명 한 바 와 같이 지연 대기 열의 실현 은 크게 두 단계 로 나 뉜 다.
데 드 메 시 지 를 생 성 합 니 다.Per-Message TTL 과 Queue TTL 두 가지 방식 이 있 습 니 다.제 요구 사항 은 모든 메시지 지연 처리 시간 이 같 기 때문에 본 실현 에 서 는 Queue TTL 로 대기 열 을 설정 하 는 TTL 을 사용 합 니 다.대기 열 에 있 는 메 시 지 를 다른 지연 처리 시간 으로 설정 하려 면 Per-Message TTL공식 문서을 설정 합 니 다.
죽은 편지 의 전송 규칙 설정,Dead Letter Exchange 설정 방법공식 문서
전체 코드 는 다음 과 같 습 니 다:
"""
Created on Fri Aug 3 17:00:44 2018
@author: Bge
"""
import pika,json,logging
class RabbitMQClient:
  def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
    self.exchange_type = "direct"
    self.connection_string = conn_str
    self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
    self.channel = self.connection.channel()
    self._declare_retry_queue() #RetryQueue and RetryExchange
    logging.debug("connection established")
  def close_connection(self):
    self.connection.close()
    logging.debug("connection closed")
  def declare_exchange(self, exchange):
    self.channel.exchange_declare(exchange=exchange,
                   exchange_type=self.exchange_type,
                   durable=True)
  def declare_queue(self, queue):
    self.channel.queue_declare(queue=queue,
                  durable=True,)
  def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
    """
          
    :param TTL: ttl    us,ttl=60000    60s
    :param queue:
    :param DLX:     exchange
    :return:
    """
    arguments={}
    if DLX:
      #       exchange
      arguments[ 'x-dead-letter-exchange']=DLX
    if TTL:
      arguments['x-message-ttl']=TTL
    print(arguments)
    self.channel.queue_declare(queue=queue,
                  durable=True,
                  arguments=arguments)
  def _declare_retry_queue(self):
    """
              ,             。
    :return:
    """
    self.channel.exchange_declare(exchange='RetryExchange',
                   exchange_type='fanout',
                   durable=True)
    self.channel.queue_declare(queue='RetryQueue',
                  durable=True)
    self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
  def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
    """
               
    :param exchange: RabbitMQ   
    :param msg:     ,       JSON   
    :return:
    """
    if delay==0:
      self.declare_queue(routing_key)
    else:
      self.declare_delay_queue(routing_key,TTL=TTL)
    if exchange!='':
      self.declare_exchange(exchange)
    self.channel.basic_publish(exchange=exchange,
                  routing_key=routing_key,
                  body=msg,
                  properties=pika.BasicProperties(
                    delivery_mode=2,
                    type=exchange
                  ))
    self.close_connection()
    print("message send out to %s" % exchange)
    logging.debug("message send out to %s" % exchange)
  def start_consume(self,callback,queue='#',delay=1):
    """
         ,    RabbitMQ    
    :return:
    """
    if delay==1:
      queue='RetryQueue'
    else:
      self.declare_queue(queue)
    self.channel.basic_qos(prefetch_count=1)
    try:
      self.channel.basic_consume( #     
        callback, #       ,   callback       
        queue=queue, #            
      )
      self.channel.start_consuming()
    except KeyboardInterrupt:
      self.stop_consuming()
  def stop_consuming(self):
    self.channel.stop_consuming()
    self.close_connection()
  def message_handle_successfully(channel, method):
    """
              ,       ,
      RabbitMQ          ,             
    :param channel:      channel  
    :param method:      method  
    :return:
    """
    channel.basic_ack(delivery_tag=method.delivery_tag)
  def message_handle_failed(channel, method):
    """
            ,       ,            
    :param channel:      channel  
    :param method:      method  
    :return:
    """
    channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
from MQ.RabbitMQ import RabbitMQClient
print("start program")
client = RabbitMQClient()
msg1 = '{"key":"value"}'
client.publish_message('test-delay',msg1,delay=1,TTL=10000)
print("message send out")
from MQ.RabbitMQ import RabbitMQClient
import json
print("start program")
client = RabbitMQClient()
def callback(ch, method, properties, body):
    msg = body.decode()
    print(msg)
    #       ,        ack,          。
    RabbitMQClient.message_handle_successfully(ch, method)
queue_name = "RetryQueue"
client.start_consume(callback,queue_name,delay=0)이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Python의 None과 NULL의 차이점 상세 정보그래서 대상 = 속성 + 방법 (사실 방법도 하나의 속성, 데이터 속성과 구별되는 호출 가능한 속성 같은 속성과 방법을 가진 대상을 클래스, 즉 Classl로 분류할 수 있다.클래스는 하나의 청사진과 같아서 하나의 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.