PHP+RabbitMQ 메시지 큐 의 전체 코드 구현

머리말
왜 ActiveMq 나 RocketMq 가 아 닌 RabbitMq 를 사용 합 니까?
우선,업무 상 으로 볼 때 저 는 메시지 의 100%수 용 률 을 요구 하지 않 습 니 다.그리고 phop 개발 과 결합 해 야 합 니 다.RabbitMq 는 RocketMq 에 비해 지연 이 비교적 낮 습 니 다(미묘 한 등급).ActiveMq 에 대해 서 는 문제 가 많은 것 같 습 니 다.RabbitMq 는 다양한 언어 에 대한 지원 이 좋 기 때문에 RabbitMq 를 선택 하 십시오.
PHP 에 대응 하 는 RabbitMQ 를 먼저 설치 합 니 다.여 기 는 php 를 사용 합 니 다.amqp 의 서로 다른 확장 실현 방식 은 미세한 차이 가 있 을 수 있 습 니 다.
php 확장 주소:http://pecl.php.net/package/amqp
구체 적 으로 는 홈 페이지 를 기준 으로 한다  http://www.rabbitmq.com/getstarted.html
소개 하 다.
  • config.php 설정 정보
  • BaseMQ.php MQ 기본 클래스
  • ProductMQ.php 생산자 류
  • ConsumerMQ.php 소비자 류
  • Consumer2MQ.php 소비자 2(여러 개 가능)
  • config.php
    
     <?php
     return [
      //  
      'host' => [
       'host' => '127.0.0.1',
       'port' => '5672',
       'login' => 'guest',
       'password' => 'guest',
       'vhost'=>'/',
      ],
      //   
      'exchange'=>'word',
      //  
      'routes' => [],
     ];
    BaseMQ.php
    
     <?php
     /**
      * Created by PhpStorm.
      * User: pc
      * Date: 2018/12/13
      * Time: 14:11
      */
     
     namespace MyObjSummary\rabbitMQ;
     
     /** Member
      *  AMQPChannel
      *  AMQPConnection
      *  AMQPEnvelope
      *  AMQPExchange
      *  AMQPQueue
      * Class BaseMQ
      * @package MyObjSummary\rabbitMQ
      */
     class BaseMQ
     {
      /** MQ Channel
       * @var \AMQPChannel
       */
      public $AMQPChannel ;
     
      /** MQ Link
       * @var \AMQPConnection
       */
      public $AMQPConnection ;
     
      /** MQ Envelope
       * @var \AMQPEnvelope
       */
      public $AMQPEnvelope ;
     
      /** MQ Exchange
       * @var \AMQPExchange
       */
      public $AMQPExchange ;
     
      /** MQ Queue
       * @var \AMQPQueue
       */
      public $AMQPQueue ;
     
      /** conf
       * @var
       */
      public $conf ;
     
      /** exchange
       * @var
       */
      public $exchange ;
     
      /** link
       * BaseMQ constructor.
       * @throws \AMQPConnectionException
       */
      public function __construct()
      {
       $conf = require 'config.php' ;
       if(!$conf)
        throw new \AMQPConnectionException('config error!');
       $this->conf  = $conf['host'] ;
       $this->exchange = $conf['exchange'] ;
       $this->AMQPConnection = new \AMQPConnection($this->conf);
       if (!$this->AMQPConnection->connect())
        throw new \AMQPConnectionException("Cannot connect to the broker!
    "); } /** * close link */ public function close() { $this->AMQPConnection->disconnect(); } /** Channel * @return \AMQPChannel * @throws \AMQPConnectionException */ public function channel() { if(!$this->AMQPChannel) { $this->AMQPChannel = new \AMQPChannel($this->AMQPConnection); } return $this->AMQPChannel; } /** Exchange * @return \AMQPExchange * @throws \AMQPConnectionException * @throws \AMQPExchangeException */ public function exchange() { if(!$this->AMQPExchange) { $this->AMQPExchange = new \AMQPExchange($this->channel()); $this->AMQPExchange->setName($this->exchange); } return $this->AMQPExchange ; } /** queue * @return \AMQPQueue * @throws \AMQPConnectionException * @throws \AMQPQueueException */ public function queue() { if(!$this->AMQPQueue) { $this->AMQPQueue = new \AMQPQueue($this->channel()); } return $this->AMQPQueue ; } /** Envelope * @return \AMQPEnvelope */ public function envelope() { if(!$this->AMQPEnvelope) { $this->AMQPEnvelope = new \AMQPEnvelope(); } return $this->AMQPEnvelope; } }
    ProductMQ.php
    
     <?php
     //    P
     namespace MyObjSummary\rabbitMQ;
     require 'BaseMQ.php';
     class ProductMQ extends BaseMQ
     {
      private $routes = ['hello','word']; //  key
     
      /**
       * ProductMQ constructor.
       * @throws \AMQPConnectionException
       */
      public function __construct()
      {
       parent::__construct();
      }
     
      /**                   
       * @throws \AMQPChannelException
       * @throws \AMQPConnectionException
       * @throws \AMQPExchangeException
       */
      public function run()
      {
       //  
       $channel = $this->channel();
       //       
       $ex = $this->exchange();
       //    
       $message = 'product message '.rand(1,99999);
       //    
       $channel->startTransaction();
       $sendEd = true ;
       foreach ($this->routes as $route) {
        $sendEd = $ex->publish($message, $route) ;
        echo "Send Message:".$sendEd."
    "; } if(!$sendEd) { $channel->rollbackTransaction(); } $channel->commitTransaction(); // $this->close(); die ; } } try{ (new ProductMQ())->run(); }catch (\Exception $exception){ var_dump($exception->getMessage()) ; }
    ConsumerMQ.php
    
     <?php
     //    C
     namespace MyObjSummary\rabbitMQ;
     require 'BaseMQ.php';
     class ConsumerMQ extends BaseMQ
     {
      private $q_name = 'hello'; //   
      private $route = 'hello'; //  key
     
      /**
       * ConsumerMQ constructor.
       * @throws \AMQPConnectionException
       */
      public function __construct()
      {
       parent::__construct();
      }
     
      /**                  
       * @throws \AMQPChannelException
       * @throws \AMQPConnectionException
       * @throws \AMQPExchangeException
       * @throws \AMQPQueueException
       */
      public function run()
      {
     
       //     
       $ex = $this->exchange();
       $ex->setType(AMQP_EX_TYPE_DIRECT); //direct  
       $ex->setFlags(AMQP_DURABLE); //   
       //echo "Exchange Status:".$ex->declare()."
    "; // $q = $this->queue(); //var_dump($q->declare());exit(); $q->setName($this->q_name); $q->setFlags(AMQP_DURABLE); // //echo "Message Total:".$q->declareQueue()."
    "; // , echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."
    "; // echo "Message:
    "; while(True){ $q->consume(function ($envelope,$queue){ $msg = $envelope->getBody(); echo $msg."
    "; // $queue->ack($envelope->getDeliveryTag()); // ACK }); //$q->consume('processMessage', AMQP_AUTOACK); // ACK } $this->close(); } } try{ (new ConsumerMQ)->run(); }catch (\Exception $exception){ var_dump($exception->getMessage()) ; }
    총결산
    이상 은 이 글 의 모든 내용 입 니 다.본 고의 내용 이 여러분 의 학습 이나 업무 에 어느 정도 참고 학습 가 치 를 가지 기 를 바 랍 니 다.여러분 의 저희 에 대한 지지 에 감 사 드 립 니 다.

    좋은 웹페이지 즐겨찾기