Hyperf連接使用RabbitMQ消息中間件
傳送門
- 使用Docker部署RabbitMQ,->傳送門<
- 使用Docker部署Hyperf,->傳送門-<
部署環(huán)境
安裝amqp擴(kuò)展
composer require hyperf/amqp
安裝command命令行擴(kuò)展
composer require hyperf/command
配置參數(shù)
假設(shè)已經(jīng)在rabbitmq設(shè)置了交換機(jī)exchange_test和隊列queue_test
新建 /config/autoload/amp.php配置文件,修改地址和用戶名密碼
<?php
return [
'default' => [
'host' => '127.0.0.1',//rabbitmq服務(wù)的地址
'port' => 5672,
'user' => 'user',
'password' => '123456',
'vhost' => '/',
'concurrent' => [
'limit' => 1,
],
'pool' => [
'connections' => 1,
],
'params' => [
'insist' => false,
'login_method' => 'AMQPLAIN',
'login_response' => null,
'locale' => 'en_US',
'connection_timeout' => 3.0,
'read_write_timeout' => 6.0,
'context' => null,
'keepalive' => false,
'heartbeat' => 3,
'close_on_destruct' => false,
],
],
'pool2' => [
...
]
];
生產(chǎn)數(shù)據(jù)
創(chuàng)建生產(chǎn)者中間件
php bin/hyperf.php gen:amqp-producer DemoProducer
exchange是交換機(jī),routingKey是隊列名
<?php
declare(strict_types=1);
namespace App\Amqp\Producers;
use Hyperf\Amqp\Annotation\Producer;
use Hyperf\Amqp\Message\ProducerMessage;
/**
* @Producer(exchange="exchange_test", routingKey="queue_test")
*/
class DemoProducer extends ProducerMessage
{
public function __construct($data)
{
//將收到的數(shù)據(jù)加入隊列
$this->plyload = $data;
}
}
創(chuàng)建生產(chǎn)者腳本
php bin/hyperf.php gen:command FooCommand
代碼
<?php
declare(strict_types=1);
namespace App\Command;
use Hyperf\Command\Command as HyperfCommand;
use Hyperf\Command\Annotation\Command;
use Hyperf\Amqp\Producer;
use App\Amqp\Producers\DemoProducer;
use Hyperf\Utils\ApplicationContext;
/**
* @Command
*/
class FooCommand extends HyperfCommand
{
/**
* 執(zhí)行的命令行
*
* @var string
*/
protected $name = 'foo:command';
public function handle()
{
//協(xié)程代碼,創(chuàng)建1000個協(xié)程分別處理
$wg = new \Hyperf\Utils\WaitGroup();
$wg->add(1000);// 計數(shù)器加1000
for($i=0;$i<1000;$i++){
// 創(chuàng)建協(xié)程$i
co(function () use ($wg) {
//amqp代碼,將數(shù)據(jù)加入生產(chǎn)者隊列
$message = new DemoProducer(['id'=>$i]);
$producer = ApplicationContext::getContainer()->get(Producer::class);
$result = $producer->produce($message);
// 計數(shù)器減一
$wg->done();
});
}
// 等待所有協(xié)程運(yùn)行完成
$wg->wait();
}
}
調(diào)用命令行,來生產(chǎn)數(shù)據(jù)
php bin/hyperf.php foo:command
至此,進(jìn)入rabbitmq后臺,對應(yīng)的隊列里就會有數(shù)據(jù)。
消費(fèi)數(shù)據(jù)
創(chuàng)建消費(fèi)者中間件
php bin/hyperf.php gen:amqp-consumer DemoConsumer
代碼解釋如上,多的queue也是隊列名,num是進(jìn)程數(shù)
<?php
declare(strict_types=1);
namespace App\Amqp\Consumers;
use Hyperf\Amqp\Annotation\Consumer;
use Hyperf\Amqp\Message\ConsumerMessage;
use Hyperf\Amqp\Result;
use PhpAmqpLib\Message\AMQPMessage;
#[Consumer(exchange: "hyperf", routingKey: "hyperf", queue: "hyperf", nums: 1)]
class DemoConsumer extends ConsumerMessage
{
public function consumeMessage($data, AMQPMessage $message): string
{
print_r($data);
return Result::ACK;
}
}
重啟框架會自動調(diào)用消費(fèi)者
php bin/hyperf.php start
原創(chuàng)碼字不易,喜歡請收藏關(guān)注文章來源:http://www.zghlxwxcb.cn/news/detail-548506.html
部分參考自:https://www.bilibili.com/video/BV1de4y1E7Ya/?vd_source=36102b089bcd7ff8177499ba833633e0文章來源地址http://www.zghlxwxcb.cn/news/detail-548506.html
到了這里,關(guān)于Hyperf使用RabbitMQ消息隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!