1、composer 安裝 AMQP 擴(kuò)展
composer require php-amqplib/php-amqplib
2、RabbitMQ 配置
?在 config 目錄下創(chuàng)建 rabbitmq.php 文件
<?php
return [
'host'=>'',
'port'=>'5672',
'user'=>'',
'password'=>'',
'vhost'=>'',
'exchange_name' => '',
'queue_name' => '',
'route_key' => '',
'consumer_tag' => '',
];
3、生產(chǎn)者代碼
app目錄下創(chuàng)建Producer.php
<?php
namespace app;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;
class Producer
{
private $connection;
private $channel;
private $mq_config;
public function __construct()
{
$this->mq_config = config('rabbit_mq');
$this->connection = new AMQPStreamConnection(
$this->mq_config['host'],
$this->mq_config['port'],
$this->mq_config['user'],
$this->mq_config['password'],
'itcast'
);
//創(chuàng)建通道
$this->channel = $this->connection->channel();
}
public function send($data)
{
/**
* 創(chuàng)建隊(duì)列(Queue)
* name: hello // 隊(duì)列名稱
* passive: false // 如果設(shè)置true存在則返回OK,否則就報(bào)錯(cuò)。設(shè)置false存在返回OK,不存在則自動(dòng)創(chuàng)建
* durable: true // 是否持久化,設(shè)置false是存放到內(nèi)存中的,RabbitMQ重啟后會(huì)丟失;設(shè)置true,則代表是一個(gè)持久化的隊(duì)列,服務(wù)重啟后也會(huì)存在,因?yàn)榉?wù)會(huì)把持久化的queue存放到磁盤上當(dāng)服務(wù)重啟的時(shí)候,會(huì)重新加載之前被持久化的queue
* exclusive: false // 是否排他,指定該選項(xiàng)為true則隊(duì)列只對(duì)當(dāng)前連接有效,連接斷開后自動(dòng)刪除
* auto_delete: false // 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開連接之后隊(duì)列是否自動(dòng)被刪除
*
*/
$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);
/**
* 創(chuàng)建交換機(jī)(Exchange)
* name: vckai_exchange// 交換機(jī)名稱
* type: direct // 交換機(jī)類型,分別為direct/fanout/topic,參考另外文章的Exchange Type說(shuō)明。
* passive: false // 如果設(shè)置true存在則返回OK,否則就報(bào)錯(cuò)。設(shè)置false存在返回OK,不存在則自動(dòng)創(chuàng)建
* durable: false // 是否持久化,設(shè)置false是存放到內(nèi)存中的,RabbitMQ重啟后會(huì)丟失
* auto_delete: false // 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開連接之后隊(duì)列是否自動(dòng)被刪除
*/
$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);
// 綁定消息交換機(jī)和隊(duì)列
$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'],$this->mq_config['route_key']);
$messageBody = json_encode($data);//將要發(fā)送數(shù)據(jù)變?yōu)閖son字符串
/**
* 創(chuàng)建AMQP消息類型
* delivery_mode 消息是否持久化
* AMQPMessage::DELIVERY_MODE_NON_PERSISTENT 不持久化
* AMQPMessage::DELIVERY_MODE_PERSISTENT 持久化
*/
$message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));
/**
* 發(fā)送消息
* msg: $message // AMQP消息內(nèi)容
* exchange: vckai_exchange // 交換機(jī)名稱
* routing_key: hello // 路由key
*/
$this->channel->basic_publish($message, $this->mq_config['exchange_name'], $this->mq_config['route_key']);
//關(guān)閉連接
$this->stop();
}
//關(guān)閉進(jìn)程
public function stop()
{
$this->channel->close();
$this->connection->close();
}
}
4、消費(fèi)者代碼
app目錄下創(chuàng)建Consumer.php
<?php
namespace app;
use app\index\controller\ApiCommunity;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use think\db\exception\PDOException;
use think\facade\Log;
class Consumer
{
private $connection;
private $channel;
private $mq_config;
public function __construct()
{
$this->mq_config = config('rabbit_mq');
$this->connection = new AMQPStreamConnection(
$this->mq_config['host'],
$this->mq_config['port'],
$this->mq_config['user'],
$this->mq_config['password'],
$this->mq_config['vhost']
);
//創(chuàng)建通道
$this->channel = $this->connection->channel();
}
/**
* @param $channel
* @param $connection
* 關(guān)閉進(jìn)程
*/
function shutdown($channel, $connection)
{
$channel->close();
$connection->close();
}
/**
* @param $message
* 消息處理
*/
function process_message($message)
{
//消息處理邏輯
echo $message->body . "\n";
if ($message->body !== 'quit') {
$obj = json_decode($message->body);
if (!isset($obj->id)) {
Log::write("error data:" . $message->body, 2);
} else {
try {
Log::write("data:" . json_encode($message));
//消息處理
} catch (\Think\Exception $e) {
Log::write($e->getMessage(), 2);
Log::write(json_encode($message), 2);
} catch (PDOException $pe) {
Log::write($pe->getMessage(), 2);
Log::write(json_encode($message), 2);
}
}
}
// 手動(dòng)確認(rèn)ack,確保消息已經(jīng)處理
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
// Send a message with the string "quit" to cancel the consumer.
if ($message->body === 'quit') {
$message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
}
}
/**
* @throws \ErrorException
* 啟動(dòng)
*
* nohup php index.php index/Message_Consume/start &
*/
public function start()
{
// 設(shè)置消費(fèi)者(Consumer)客戶端同時(shí)只處理一條隊(duì)列
// 這樣是告訴RabbitMQ,再同一時(shí)刻,不要發(fā)送超過(guò)1條消息給一個(gè)消費(fèi)者(Consumer),直到它已經(jīng)處理了上一條消息并且作出了響應(yīng)。這樣,RabbitMQ就會(huì)把消息分發(fā)給下一個(gè)空閑的消費(fèi)者(Consumer)。
//消費(fèi)者端要把自動(dòng)確認(rèn)autoAck設(shè)置為false,basic_qos才有效。
//$this->channel->basic_qos(0, 1, false);
// 同樣是創(chuàng)建路由和隊(duì)列,以及綁定路由隊(duì)列,注意要跟producer(生產(chǎn)者)的一致
// 這里其實(shí)可以不用設(shè)置,但是為了防止隊(duì)列沒有被創(chuàng)建所以做的容錯(cuò)處理
$this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);
$this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);
$this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'], $this->mq_config['route_key']);
/**
*
* queue: queue_name // 被消費(fèi)的隊(duì)列名稱
* consumer_tag: consumer_tag // 消費(fèi)者客戶端身份標(biāo)識(shí),用于區(qū)分多個(gè)客戶端
* no_local: false // 這個(gè)功能屬于AMQP的標(biāo)準(zhǔn),但是RabbitMQ并沒有做實(shí)現(xiàn)
* no_ack: true // 收到消息后,是否不需要回復(fù)確認(rèn)即被認(rèn)為被消費(fèi)
* exclusive: false // 是否排他,即這個(gè)隊(duì)列只能由一個(gè)消費(fèi)者消費(fèi)。適用于任務(wù)不允許進(jìn)行并發(fā)處理的情況下
* nowait: false // 不返回執(zhí)行結(jié)果,但是如果排他開啟的話,則必須需要等待結(jié)果的,如果兩個(gè)一起開就會(huì)報(bào)錯(cuò)
* callback: $callback // 回調(diào)邏輯處理函數(shù)
*
*/
$this->channel->basic_consume($this->mq_config['queue_name'], $this->mq_config['consumer_tag'], false, false, false, false, array($this, 'process_message'));
register_shutdown_function(array($this, 'shutdown'), $this->channel, $this->connection);
while (count($this->channel->callbacks)) {
$this->channel->wait();
}
}
}
5、創(chuàng)建自定義命令
php think make:command Consumer
在項(xiàng)目跟目錄執(zhí)行以下命令,會(huì)自動(dòng)生成 在 command 目錄生成 Consumer 控制器?
<?php
declare (strict_types = 1);
namespace app\command;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;
class Consumer extends Command
{
protected function configure()
{
// 指令配置
$this->setName('consumer')
->setDescription('the consumer command');
}
protected function execute(Input $input, Output $output)
{
// 指令輸出
$output->writeln('consumer');
$consumer = new \app\Consumer();
// $consumer->process_message(11)
$consumer->start();
}
}
config/console.php 代碼增加如下:
// 指令定義
'commands' => [
'consumer' => 'app\command\Consumer',
],
6、命令
消費(fèi)者命令文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-648920.html
php think consumer
?生產(chǎn)者執(zhí)行命令文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-648920.html
$producer = new Producer();
$data = [
'message' => "發(fā)送的消息內(nèi)容"
];
$producer->send($data);
到了這里,關(guān)于tp6 RabbitMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!