어떻게 파 이 썬 을 통 해 RabbitMQ 지연 대기 열 을 실현 합 니까?

최근 에 작업 을 할 때 지연 처리 가 필요 한 데 이 터 를 만 났 습 니 다.처음에 데 이 터 를 데이터베이스 에 저장 한 다음 에 스 크 립 트 를 작성 하고 5 분 간격 으로 데이터 시트 를 스 캔 한 다음 에 데 이 터 를 처리 하 는 것 이 었 습 니 다.실제 효 과 는 좋 지 않 았 습 니 다.시스템 자체 가 RabbitMQ 로 비동기 처리 작업 의 미들웨어 를 해 왔 기 때문에 RabbitMQ 를 이용 하여 지연 대기 열 을 실현 할 수 있 을 지 생각 합 니 다.쿵 푸 는 마음 이 있 는 사람 을 저 버 리 지 않 습 니 다.RabbitMQ 는 이미 사용 가능 한 지연 대기 열 이 없 지만 그 두 가지 중요 한 특성 을 이용 하여 이 를 실현 할 수 있 습 니 다.1.Time To Live(TTL)메시지 시간 초과 체제 입 니 다.2.Dead Letter Exchange(DLX)데 드 메시지 대기 열.다음은 실현 원리 와 실현 대 를 구체 적 으로 묘사 할 것 이다.
지연 큐 의 기본 원리 Time To Live(TTL)
RabbitMQ 는 Queue 에 x-expires 를 설정 하거나 Message 에 x-message-ttl 을 설정 하여 메시지 의 생존 시간 을 제어 할 수 있 습 니 다.시간 초과(둘 이 동시에 가장 먼저 만 료 되 는 시간 을 기준 으로 설정)하면 메시지 가 dead letter(데 드 레 터)로 변 합 니 다.
RabbitMQ 메시지 의 만 료 시간 은 두 가지 방법 으로 설정 되 어 있 습 니 다.
대기 열(Queue)의 속성 설정 을 통 해 대기 열 에 있 는 모든 메시지 가 만 료 되 었 습 니 다.(이번 지연 대기 열 에서 사용 하 는 방안)메시지 에 대해 서 는 메시지 마다 TTL 이 다 를 수 있 습 니 다.
동시에 사용 하면 메시지 의 만 료 시간 은 둘 사이 의 TTL 이 비교적 작은 수 치 를 기준 으로 한다.메시지 가 대기 열 에 있 는 생존 시간 이 설정 한 TTL 값 을 초과 하면 데 드 레 터(dead letter)가 됩 니 다.
Dead Letter Exchanges(DLX)
RabbitMQ 의 Queue 는 x-dead-letter-exchange 와 x-dead-letter-routing-key(선택 가능)두 개의 인 자 를 설정 할 수 있 습 니 다.대기 열 에 dead letter 가 나타 나 면 이 두 매개 변수 에 따라 지정 한 대기 열 로 다시 전송 합 니 다.
  • x-dead-letter-exchange:죽은 편지(dead letter)가 나타 나 면 dead letter 를 지정 한 exchange
  • 에 다시 보 냅 니 다.
  • x-dead-letter-routing-key:죽은 편지(dead letter)가 나타 나 면 dead letter 를 지정 한 routing-key 에 따라 다시 보 냅 니 다
  • 대기 열 에 죽은 편지(dead letter)가 나타 나 는 경우:
  • 메시지 나 대기 열의 TTL 이 만 료 되 었 습 니 다.(대기 열 이용 지연 특성)
  • 대열 의 최대 길이
  • 메시지 가 소비 자 에 의 해 거부 되 었 습 니 다(basic.reject or basic.nack).그리고 requeue=false
  • 위의 두 가지 특성 을 종합해 볼 때,대기 열 을 TTL 규칙 으로 설정 하면,대기 열 TTL 이 만 료 되면 메시지 가 다운 되 고,DLX 특성 을 이용 하여 다른 교환기 와 대기 열 로 전송 하면 다시 소비 되 어 소비 지연 효 과 를 얻 을 수 있 습 니 다.

    대기 열 설계 및 구현 지연(Python)
    위 에서 설명 한 바 와 같이 지연 대기 열의 실현 은 크게 두 단계 로 나 뉜 다.
    데 드 메 시 지 를 생 성 합 니 다.Per-Message TTL 과 Queue TTL 두 가지 방식 이 있 습 니 다.제 요구 사항 은 모든 메시지 지연 처리 시간 이 같 기 때문에 본 실현 에 서 는 Queue TTL 로 대기 열 을 설정 하 는 TTL 을 사용 합 니 다.대기 열 에 있 는 메 시 지 를 다른 지연 처리 시간 으로 설정 하려 면 Per-Message TTL공식 문서을 설정 합 니 다.
    죽은 편지 의 전송 규칙 설정,Dead Letter Exchange 설정 방법공식 문서
    전체 코드 는 다음 과 같 습 니 다:
    
    """
    Created on Fri Aug 3 17:00:44 2018
    
    @author: Bge
    """
    import pika,json,logging
    class RabbitMQClient:
      def __init__(self, conn_str='amqp://user:pwd@host:port/%2F'):
        self.exchange_type = "direct"
        self.connection_string = conn_str
        self.connection = pika.BlockingConnection(pika.URLParameters(self.connection_string))
        self.channel = self.connection.channel()
        self._declare_retry_queue() #RetryQueue and RetryExchange
        logging.debug("connection established")
      def close_connection(self):
        self.connection.close()
        logging.debug("connection closed")
      def declare_exchange(self, exchange):
        self.channel.exchange_declare(exchange=exchange,
                       exchange_type=self.exchange_type,
                       durable=True)
      def declare_queue(self, queue):
        self.channel.queue_declare(queue=queue,
                      durable=True,)
      def declare_delay_queue(self, queue,DLX='RetryExchange',TTL=60000):
        """
              
        :param TTL: ttl    us,ttl=60000    60s
        :param queue:
        :param DLX:     exchange
        :return:
        """
        arguments={}
        if DLX:
          #       exchange
          arguments[ 'x-dead-letter-exchange']=DLX
        if TTL:
          arguments['x-message-ttl']=TTL
        print(arguments)
        self.channel.queue_declare(queue=queue,
                      durable=True,
                      arguments=arguments)
      def _declare_retry_queue(self):
        """
                  ,             。
        :return:
        """
        self.channel.exchange_declare(exchange='RetryExchange',
                       exchange_type='fanout',
                       durable=True)
        self.channel.queue_declare(queue='RetryQueue',
                      durable=True)
        self.channel.queue_bind('RetryQueue', 'RetryExchange','RetryQueue')
      def publish_message(self,routing_key, msg,exchange='',delay=0,TTL=None):
        """
                   
        :param exchange: RabbitMQ   
        :param msg:     ,       JSON   
        :return:
        """
        if delay==0:
          self.declare_queue(routing_key)
        else:
          self.declare_delay_queue(routing_key,TTL=TTL)
        if exchange!='':
          self.declare_exchange(exchange)
        self.channel.basic_publish(exchange=exchange,
                      routing_key=routing_key,
                      body=msg,
                      properties=pika.BasicProperties(
                        delivery_mode=2,
                        type=exchange
                      ))
        self.close_connection()
        print("message send out to %s" % exchange)
        logging.debug("message send out to %s" % exchange)
      def start_consume(self,callback,queue='#',delay=1):
        """
             ,    RabbitMQ    
        :return:
        """
        if delay==1:
          queue='RetryQueue'
        else:
          self.declare_queue(queue)
        self.channel.basic_qos(prefetch_count=1)
        try:
          self.channel.basic_consume( #     
            callback, #       ,   callback       
            queue=queue, #            
          )
          self.channel.start_consuming()
        except KeyboardInterrupt:
          self.stop_consuming()
      def stop_consuming(self):
        self.channel.stop_consuming()
        self.close_connection()
      def message_handle_successfully(channel, method):
        """
                  ,       ,
          RabbitMQ          ,             
        :param channel:      channel  
        :param method:      method  
        :return:
        """
        channel.basic_ack(delivery_tag=method.delivery_tag)
      def message_handle_failed(channel, method):
        """
                ,       ,            
        :param channel:      channel  
        :param method:      method  
        :return:
        """
        channel.basic_reject(delivery_tag=method.delivery_tag, requeue=False)
    발표 메시지 코드 는 다음 과 같 습 니 다.
    
    from MQ.RabbitMQ import RabbitMQClient
    print("start program")
    client = RabbitMQClient()
    msg1 = '{"key":"value"}'
    client.publish_message('test-delay',msg1,delay=1,TTL=10000)
    print("message send out")
    소비자 코드 는 다음 과 같다.
    
    from MQ.RabbitMQ import RabbitMQClient
    import json
    print("start program")
    client = RabbitMQClient()
    def callback(ch, method, properties, body):
        msg = body.decode()
        print(msg)
        #       ,        ack,          。
        RabbitMQClient.message_handle_successfully(ch, method)
    queue_name = "RetryQueue"
    client.start_consume(callback,queue_name,delay=0)
    이상 이 바로 본 고의 모든 내용 입 니 다.여러분 의 학습 에 도움 이 되 고 저 희 를 많이 응원 해 주 셨 으 면 좋 겠 습 니 다.

    좋은 웹페이지 즐겨찾기