PHP+RabbitMQ 메시지 큐 의 전체 코드 구현
왜 ActiveMq 나 RocketMq 가 아 닌 RabbitMq 를 사용 합 니까?
우선,업무 상 으로 볼 때 저 는 메시지 의 100%수 용 률 을 요구 하지 않 습 니 다.그리고 phop 개발 과 결합 해 야 합 니 다.RabbitMq 는 RocketMq 에 비해 지연 이 비교적 낮 습 니 다(미묘 한 등급).ActiveMq 에 대해 서 는 문제 가 많은 것 같 습 니 다.RabbitMq 는 다양한 언어 에 대한 지원 이 좋 기 때문에 RabbitMq 를 선택 하 십시오.
PHP 에 대응 하 는 RabbitMQ 를 먼저 설치 합 니 다.여 기 는 php 를 사용 합 니 다.amqp 의 서로 다른 확장 실현 방식 은 미세한 차이 가 있 을 수 있 습 니 다.
php 확장 주소:http://pecl.php.net/package/amqp
구체 적 으로 는 홈 페이지 를 기준 으로 한다 http://www.rabbitmq.com/getstarted.html
소개 하 다.
<?php
return [
//
'host' => [
'host' => '127.0.0.1',
'port' => '5672',
'login' => 'guest',
'password' => 'guest',
'vhost'=>'/',
],
//
'exchange'=>'word',
//
'routes' => [],
];
BaseMQ.php
<?php
/**
* Created by PhpStorm.
* User: pc
* Date: 2018/12/13
* Time: 14:11
*/
namespace MyObjSummary\rabbitMQ;
/** Member
* AMQPChannel
* AMQPConnection
* AMQPEnvelope
* AMQPExchange
* AMQPQueue
* Class BaseMQ
* @package MyObjSummary\rabbitMQ
*/
class BaseMQ
{
/** MQ Channel
* @var \AMQPChannel
*/
public $AMQPChannel ;
/** MQ Link
* @var \AMQPConnection
*/
public $AMQPConnection ;
/** MQ Envelope
* @var \AMQPEnvelope
*/
public $AMQPEnvelope ;
/** MQ Exchange
* @var \AMQPExchange
*/
public $AMQPExchange ;
/** MQ Queue
* @var \AMQPQueue
*/
public $AMQPQueue ;
/** conf
* @var
*/
public $conf ;
/** exchange
* @var
*/
public $exchange ;
/** link
* BaseMQ constructor.
* @throws \AMQPConnectionException
*/
public function __construct()
{
$conf = require 'config.php' ;
if(!$conf)
throw new \AMQPConnectionException('config error!');
$this->conf = $conf['host'] ;
$this->exchange = $conf['exchange'] ;
$this->AMQPConnection = new \AMQPConnection($this->conf);
if (!$this->AMQPConnection->connect())
throw new \AMQPConnectionException("Cannot connect to the broker!
");
}
/**
* close link
*/
public function close()
{
$this->AMQPConnection->disconnect();
}
/** Channel
* @return \AMQPChannel
* @throws \AMQPConnectionException
*/
public function channel()
{
if(!$this->AMQPChannel) {
$this->AMQPChannel = new \AMQPChannel($this->AMQPConnection);
}
return $this->AMQPChannel;
}
/** Exchange
* @return \AMQPExchange
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
*/
public function exchange()
{
if(!$this->AMQPExchange) {
$this->AMQPExchange = new \AMQPExchange($this->channel());
$this->AMQPExchange->setName($this->exchange);
}
return $this->AMQPExchange ;
}
/** queue
* @return \AMQPQueue
* @throws \AMQPConnectionException
* @throws \AMQPQueueException
*/
public function queue()
{
if(!$this->AMQPQueue) {
$this->AMQPQueue = new \AMQPQueue($this->channel());
}
return $this->AMQPQueue ;
}
/** Envelope
* @return \AMQPEnvelope
*/
public function envelope()
{
if(!$this->AMQPEnvelope) {
$this->AMQPEnvelope = new \AMQPEnvelope();
}
return $this->AMQPEnvelope;
}
}
ProductMQ.php
<?php
// P
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ProductMQ extends BaseMQ
{
private $routes = ['hello','word']; // key
/**
* ProductMQ constructor.
* @throws \AMQPConnectionException
*/
public function __construct()
{
parent::__construct();
}
/**
* @throws \AMQPChannelException
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
*/
public function run()
{
//
$channel = $this->channel();
//
$ex = $this->exchange();
//
$message = 'product message '.rand(1,99999);
//
$channel->startTransaction();
$sendEd = true ;
foreach ($this->routes as $route) {
$sendEd = $ex->publish($message, $route) ;
echo "Send Message:".$sendEd."
";
}
if(!$sendEd) {
$channel->rollbackTransaction();
}
$channel->commitTransaction(); //
$this->close();
die ;
}
}
try{
(new ProductMQ())->run();
}catch (\Exception $exception){
var_dump($exception->getMessage()) ;
}
ConsumerMQ.php
<?php
// C
namespace MyObjSummary\rabbitMQ;
require 'BaseMQ.php';
class ConsumerMQ extends BaseMQ
{
private $q_name = 'hello'; //
private $route = 'hello'; // key
/**
* ConsumerMQ constructor.
* @throws \AMQPConnectionException
*/
public function __construct()
{
parent::__construct();
}
/**
* @throws \AMQPChannelException
* @throws \AMQPConnectionException
* @throws \AMQPExchangeException
* @throws \AMQPQueueException
*/
public function run()
{
//
$ex = $this->exchange();
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct
$ex->setFlags(AMQP_DURABLE); //
//echo "Exchange Status:".$ex->declare()."
";
//
$q = $this->queue();
//var_dump($q->declare());exit();
$q->setName($this->q_name);
$q->setFlags(AMQP_DURABLE); //
//echo "Message Total:".$q->declareQueue()."
";
// ,
echo 'Queue Bind: '.$q->bind($this->exchange, $this->route)."
";
//
echo "Message:
";
while(True){
$q->consume(function ($envelope,$queue){
$msg = $envelope->getBody();
echo $msg."
"; //
$queue->ack($envelope->getDeliveryTag()); // ACK
});
//$q->consume('processMessage', AMQP_AUTOACK); // ACK
}
$this->close();
}
}
try{
(new ConsumerMQ)->run();
}catch (\Exception $exception){
var_dump($exception->getMessage()) ;
}
총결산이상 은 이 글 의 모든 내용 입 니 다.본 고의 내용 이 여러분 의 학습 이나 업무 에 어느 정도 참고 학습 가 치 를 가지 기 를 바 랍 니 다.여러분 의 저희 에 대한 지지 에 감 사 드 립 니 다.
이 내용에 흥미가 있습니까?
현재 기사가 여러분의 문제를 해결하지 못하는 경우 AI 엔진은 머신러닝 분석(스마트 모델이 방금 만들어져 부정확한 경우가 있을 수 있음)을 통해 가장 유사한 기사를 추천합니다:
Laravel - 변환된 유효성 검사 규칙으로 API 요청 제공동적 콘텐츠를 위해 API를 통해 Laravel CMS에 연결하는 모바일 앱(또는 웹사이트) 구축을 고려하십시오. 이제 앱은 CMS에서 번역된 콘텐츠를 받을 것으로 예상되는 다국어 앱이 될 수 있습니다. 일반적으로 ...
텍스트를 자유롭게 공유하거나 복사할 수 있습니다.하지만 이 문서의 URL은 참조 URL로 남겨 두십시오.
CC BY-SA 2.5, CC BY-SA 3.0 및 CC BY-SA 4.0에 따라 라이센스가 부여됩니다.