PHP에서 RabbitMQ 메시지 사용 지연

9354 단어 rabbitmqtutorialphp
이러한 상황을 가정해 보겠습니다. 어떤 작업을 수행한 후 대기열에 메시지를 보냈습니다. 처리 중에 소비자는 외부 API에 요청합니다. 놀랍게도 API는 5xx 코드로 응답합니다. 그런 상황에서 API 제공자 측의 오류가 더 이상 발생하지 않고 요청을 완료할 수 있도록 잠시 후 요청을 반복하는 것이 좋습니다. 그것을하는 방법?

RabbitMQ에서는 메시지 소비를 지연시킬 수 있습니다. 두 가지 방법이 있습니다. 첫 번째는 RabbitMQ 버전 3.5.3부터 사용할 수 있는 추가 플러그인을 설치해야 합니다. 두 번째는 일종의 해결 방법이며 추가 도구가 필요하지 않습니다.

RabbitMQ 지연 메시지 플러그인



플러그인은 새로운 교환 유형을 추가합니다: x-delayed-message . 밀리초 단위의 값이 있는 x-delay 헤더와 함께 메시지를 보냅니다. 이 시간이 지나면 메시지가 대기열에 나타납니다.

교환을 선언할 때 원래 교환의 유형과 함께 x-delayed-type 헤더를 전달합니다. 직접 또는 주제. 이 인수는 필수입니다.

전송된 메시지는 Mnesia라는 특수 데이터베이스에 저장됩니다. 현재 노드의 단일 디스크 복제본에 저장됩니다. 이 경우 RAM 노드는 지원되지 않습니다.

장점:
  • 메시지 지연을 지원하는 전용 교환기
  • 사용자가 제한 사항을 알고 있는 한 프로덕션 사용에 잠재적으로 적합함

  • 단점:
  • 플러그인이 아직 실험용으로 간주됨
  • 설치에 필요합니다
  • 은 최신 RabbitMQ 버전
  • 에서만 작동합니다.
  • 네이티브 솔루션
  • 보다 약간 느림

    예시:

    $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
    $channel = $connection->channel();
    
    $channel->queue_declare(
        queue: 'example_queue',
        durable: true,
        auto_delete: false
    );
    
    $channel->exchange_declare(
        exchange: 'example_exchange',
        type: 'x-delayed-message',
        durable: true,
        auto_delete: false,
        arguments: new AMQPTable(
            [
                'x-delayed-type' => 'direct'
            ]
        )
    );
    
    $channel->queue_bind('example_queue', 'example_exchange');
    
    $message = new AMQPMessage('test', [
            'application_headers' => new AMQPTable(['x-delay' => 10000])
        ]);
    
    $channel->basic_publish($message, 'example_exchange');
    


    배달 못한 편지 대기열



    이 솔루션은 모든 버전의 RabbitMQ에서 사용할 수 있습니다. 두 개의 대기열을 만들려고 합니다.

    첫 번째는 세 가지 매개변수로 생성됩니다.
  • x-dead-letter-exchange
  • x-dead-letter-routing-key
  • x-메시지-ttl

  • 우리는 이 대기열에 메시지를 게시하지만 메시지를 소비하지는 않습니다.

    매개변수x-message-ttl는 메시지가 해당 대기열에 보관되는 시간(밀리초)을 정의합니다. 그 시간이 지나면 메시지는 x-dead-letter-exchange 인수에 선언된 교환으로 이동되고 라우팅은 x-dead-letter-routing-key에 선언됩니다.

    결과적으로 첫 번째 대기열에서 종료된 모든 메시지는 두 번째 대기열로 전달됩니다.

    장점:
  • 추가 플러그인 설치가 필요하지 않음
  • 네이티브 솔루션
  • 은 이전 RabbitMQ 버전
  • 에서도 작동합니다.

    단점:
  • 중복 대기열 생성 필요
  • ttl을 변경하려면 대기열을 다시 만들어야 함

  • 예시:

    $connection = new AMQPStreamConnection(HOST, PORT, USER, PASS, VHOST);
    $channel = $connection->channel();
    
    $channel->queue_declare(
        queue: 'destination_queue',
        durable: true,
        auto_delete: false
    );
    
    $channel->queue_declare(
        queue: 'delay_queue',
        durable: true,
        auto_delete: false,
        arguments: new AMQPTable(
            [
                'x-dead-letter-exchange' => 'delayed_exchange',
                'x-dead-letter-routing-key' => 'destination_queue',
                'x-message-ttl' => 10000
            ]
        )
    );
    
    $channel->exchange_declare(
        exchange: 'delayed_exchange',
        type: AMQPExchangeType::DIRECT,
        durable: true,
        auto_delete: false
    );
    
    $channel->queue_bind('delay_queue', 'delayed_exchange', 'delay_queue');
    
    $channel->queue_bind('destination_queue', 'delayed_exchange', 'destination_queue');
    
    $message = new AMQPMessage('test');
    
    $channel->basic_publish($message, 'delayed_exchange', 'delay_queue');
    


    요약



    두 솔루션 모두 사용하기 쉽습니다. 장점과 단점이 있습니다. 첫 번째 솔루션을 선택하면 전용 교환이 이루어집니다. 그러나 실제로 프로덕션 용도로 상당히 안정적인 추가 플러그인을 설치해야 합니다.

    두 번째 솔루션은 네이티브입니다. 그러나 추가 대기열을 생성해야 하므로 첫 번째 대기열만큼 투명하지 않습니다. 그러나 더 빠릅니다.

    좋은 웹페이지 즐겨찾기