国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

tp6 RabbitMQ

這篇具有很好參考價(jià)值的文章主要介紹了tp6 RabbitMQ。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1、composer 安裝 AMQP 擴(kuò)展
composer require php-amqplib/php-amqplib
2、RabbitMQ 配置

?在 config 目錄下創(chuàng)建 rabbitmq.php 文件

<?php
return [
    'host'=>'',
    'port'=>'5672',
    'user'=>'',
    'password'=>'',
    'vhost'=>'',
    'exchange_name' => '',
    'queue_name' => '',
    'route_key' => '',
    'consumer_tag' => '',
];
3、生產(chǎn)者代碼

app目錄下創(chuàng)建Producer.php

<?php

namespace app;

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

class Producer
{

    private $connection;
    private $channel;
    private $mq_config;

    public function __construct()
    {
        $this->mq_config = config('rabbit_mq');
        $this->connection = new AMQPStreamConnection(
            $this->mq_config['host'],
            $this->mq_config['port'],
            $this->mq_config['user'],
            $this->mq_config['password'],
            'itcast'
        );
        //創(chuàng)建通道
        $this->channel = $this->connection->channel();
    }

    public function send($data)
    {
        /**
         * 創(chuàng)建隊(duì)列(Queue)
         * name: hello         // 隊(duì)列名稱
         * passive: false      // 如果設(shè)置true存在則返回OK,否則就報(bào)錯(cuò)。設(shè)置false存在返回OK,不存在則自動(dòng)創(chuàng)建
         * durable: true       // 是否持久化,設(shè)置false是存放到內(nèi)存中的,RabbitMQ重啟后會(huì)丟失;設(shè)置true,則代表是一個(gè)持久化的隊(duì)列,服務(wù)重啟后也會(huì)存在,因?yàn)榉?wù)會(huì)把持久化的queue存放到磁盤上當(dāng)服務(wù)重啟的時(shí)候,會(huì)重新加載之前被持久化的queue
         * exclusive: false    // 是否排他,指定該選項(xiàng)為true則隊(duì)列只對(duì)當(dāng)前連接有效,連接斷開后自動(dòng)刪除
         * auto_delete: false // 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開連接之后隊(duì)列是否自動(dòng)被刪除
         *
         */
        $this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);

        /**
         * 創(chuàng)建交換機(jī)(Exchange)
         * name: vckai_exchange// 交換機(jī)名稱
         * type: direct        // 交換機(jī)類型,分別為direct/fanout/topic,參考另外文章的Exchange Type說(shuō)明。
         * passive: false      // 如果設(shè)置true存在則返回OK,否則就報(bào)錯(cuò)。設(shè)置false存在返回OK,不存在則自動(dòng)創(chuàng)建
         * durable: false      // 是否持久化,設(shè)置false是存放到內(nèi)存中的,RabbitMQ重啟后會(huì)丟失
         * auto_delete: false  // 是否自動(dòng)刪除,當(dāng)最后一個(gè)消費(fèi)者斷開連接之后隊(duì)列是否自動(dòng)被刪除
         */
        $this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);

        // 綁定消息交換機(jī)和隊(duì)列
        $this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'],$this->mq_config['route_key']);

        $messageBody = json_encode($data);//將要發(fā)送數(shù)據(jù)變?yōu)閖son字符串

        /**
         * 創(chuàng)建AMQP消息類型
         * delivery_mode 消息是否持久化
         * AMQPMessage::DELIVERY_MODE_NON_PERSISTENT  不持久化
         * AMQPMessage::DELIVERY_MODE_PERSISTENT      持久化
         */
        $message = new AMQPMessage($messageBody, array('content_type' => 'text/plain', 'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT));

        /**
         * 發(fā)送消息
         * msg: $message            // AMQP消息內(nèi)容
         * exchange: vckai_exchange // 交換機(jī)名稱
         * routing_key: hello       // 路由key
         */
        $this->channel->basic_publish($message, $this->mq_config['exchange_name'], $this->mq_config['route_key']);

        //關(guān)閉連接
        $this->stop();

    }

    //關(guān)閉進(jìn)程
    public function stop()
    {
        $this->channel->close();
        $this->connection->close();
    }

}
4、消費(fèi)者代碼

app目錄下創(chuàng)建Consumer.php

<?php

namespace app;

use app\index\controller\ApiCommunity;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use think\db\exception\PDOException;
use think\facade\Log;

class Consumer
{
    private $connection;
    private $channel;
    private $mq_config;

    public function __construct()
    {
        $this->mq_config = config('rabbit_mq');
        $this->connection = new AMQPStreamConnection(
            $this->mq_config['host'],
            $this->mq_config['port'],
            $this->mq_config['user'],
            $this->mq_config['password'],
            $this->mq_config['vhost']
        );
        //創(chuàng)建通道
        $this->channel = $this->connection->channel();
    }

    /**
     * @param $channel
     * @param $connection
     * 關(guān)閉進(jìn)程
     */
    function shutdown($channel, $connection)
    {
        $channel->close();
        $connection->close();
    }

    /**
     * @param $message
     * 消息處理
     */
    function process_message($message)
    {
        //消息處理邏輯
        echo $message->body . "\n";

        if ($message->body !== 'quit') {
            $obj = json_decode($message->body);
            if (!isset($obj->id)) {
                Log::write("error data:" . $message->body, 2);
            } else {
                try {
                    Log::write("data:" . json_encode($message));
                    //消息處理
                } catch (\Think\Exception  $e) {
                    Log::write($e->getMessage(), 2);
                    Log::write(json_encode($message), 2);
                } catch (PDOException $pe) {
                    Log::write($pe->getMessage(), 2);
                    Log::write(json_encode($message), 2);
                }
            }
        }

        // 手動(dòng)確認(rèn)ack,確保消息已經(jīng)處理
        $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']);
        // Send a message with the string "quit" to cancel the consumer.
        if ($message->body === 'quit') {
            $message->delivery_info['channel']->basic_cancel($message->delivery_info['consumer_tag']);
        }
    }


    /**
     * @throws \ErrorException
     * 啟動(dòng)
     *
     * nohup php index.php index/Message_Consume/start &
     */
    public function start()
    {
        // 設(shè)置消費(fèi)者(Consumer)客戶端同時(shí)只處理一條隊(duì)列
        // 這樣是告訴RabbitMQ,再同一時(shí)刻,不要發(fā)送超過(guò)1條消息給一個(gè)消費(fèi)者(Consumer),直到它已經(jīng)處理了上一條消息并且作出了響應(yīng)。這樣,RabbitMQ就會(huì)把消息分發(fā)給下一個(gè)空閑的消費(fèi)者(Consumer)。
        //消費(fèi)者端要把自動(dòng)確認(rèn)autoAck設(shè)置為false,basic_qos才有效。
        //$this->channel->basic_qos(0, 1, false);

        // 同樣是創(chuàng)建路由和隊(duì)列,以及綁定路由隊(duì)列,注意要跟producer(生產(chǎn)者)的一致
        // 這里其實(shí)可以不用設(shè)置,但是為了防止隊(duì)列沒有被創(chuàng)建所以做的容錯(cuò)處理
        $this->channel->queue_declare($this->mq_config['queue_name'], false, true, false, false);
        $this->channel->exchange_declare($this->mq_config['exchange_name'], AMQPExchangeType::DIRECT, false, true, false);
        $this->channel->queue_bind($this->mq_config['queue_name'], $this->mq_config['exchange_name'], $this->mq_config['route_key']);

        /**
         *
         * queue: queue_name    // 被消費(fèi)的隊(duì)列名稱
         * consumer_tag: consumer_tag // 消費(fèi)者客戶端身份標(biāo)識(shí),用于區(qū)分多個(gè)客戶端
         * no_local: false      // 這個(gè)功能屬于AMQP的標(biāo)準(zhǔn),但是RabbitMQ并沒有做實(shí)現(xiàn)
         * no_ack: true         // 收到消息后,是否不需要回復(fù)確認(rèn)即被認(rèn)為被消費(fèi)
         * exclusive: false     // 是否排他,即這個(gè)隊(duì)列只能由一個(gè)消費(fèi)者消費(fèi)。適用于任務(wù)不允許進(jìn)行并發(fā)處理的情況下
         * nowait: false        // 不返回執(zhí)行結(jié)果,但是如果排他開啟的話,則必須需要等待結(jié)果的,如果兩個(gè)一起開就會(huì)報(bào)錯(cuò)
         * callback: $callback  // 回調(diào)邏輯處理函數(shù)
         *
         */
        $this->channel->basic_consume($this->mq_config['queue_name'], $this->mq_config['consumer_tag'], false, false, false, false, array($this, 'process_message'));

        register_shutdown_function(array($this, 'shutdown'), $this->channel, $this->connection);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }
    }
}
5、創(chuàng)建自定義命令
php think make:command Consumer

在項(xiàng)目跟目錄執(zhí)行以下命令,會(huì)自動(dòng)生成 在 command 目錄生成 Consumer 控制器?

<?php
declare (strict_types = 1);

namespace app\command;

use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;

class Consumer extends Command
{
    protected function configure()
    {
        // 指令配置
        $this->setName('consumer')
            ->setDescription('the consumer command');
    }

    protected function execute(Input $input, Output $output)
    {
        // 指令輸出
        $output->writeln('consumer');
        $consumer = new \app\Consumer();
//        $consumer->process_message(11)
        $consumer->start();

    }
}

config/console.php 代碼增加如下:

// 指令定義
'commands' => [
    'consumer' => 'app\command\Consumer',
],
6、命令

消費(fèi)者命令

php think consumer

?生產(chǎn)者執(zhí)行命令文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-648920.html

$producer = new Producer();
$data = [
    'message' => "發(fā)送的消息內(nèi)容"
];
$producer->send($data);

到了這里,關(guān)于tp6 RabbitMQ的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • tp6 redirect用法

    一.響應(yīng)操作 1. 響應(yīng)輸出,有好幾種:包括 return、json()和 view()等等; 2. 默認(rèn)輸出方式是以 html 格式輸出,如果你發(fā)起 json 請(qǐng)求,則輸出 json; 3. 而背后是 response 對(duì)象,可以用 response()輸出達(dá)到相同的效果; return response($data); 4. 使用 response()方法可以設(shè)置第二參數(shù),狀態(tài)碼,

    2024年02月11日
    瀏覽(99)
  • TP6 開啟關(guān)閉debug

    TP6 開啟關(guān)閉debug

    config 不起作用,還得來(lái)這里改: 或者單個(gè)方法里加:

    2024年02月10日
    瀏覽(99)
  • tp6 v3微信退款

    調(diào)用

    2024年02月13日
    瀏覽(97)
  • tp6用redis存儲(chǔ)session

    tp6用redis存儲(chǔ)session

    隨著業(yè)務(wù)量的增加,很多時(shí)候會(huì)需要共享session的情況。共享session,其實(shí)就是說(shuō)多臺(tái)服務(wù)器共用一個(gè)session,或者是說(shuō)一個(gè)主域跟多個(gè)子域之間共用一個(gè)session。工作中用tp也多一些,那么,我就用tp6來(lái)給大家講解一下。 在共享session,我們需要用到redis。這兒我假設(shè)大家都能讀懂

    2024年02月08日
    瀏覽(107)
  • TP6 對(duì)接阿里云短信接口2.0

    TP6 對(duì)接阿里云短信接口2.0

    首先下載 安裝阿里云短信SDK composer require alibabacloud/sdk 安裝 Alibaba Cloud SDK for PHP 作為依賴項(xiàng) composer require alibabacloud/darabonba-openapi 最后安裝對(duì)應(yīng)的包 composer require alibabacloud/dysmsapi-20170525? 上面3個(gè)都下載了,官方網(wǎng)站并沒有說(shuō)明下載 sdk,這個(gè)沒有下載,查詢資料花了幾個(gè)小時(shí),

    2024年02月16日
    瀏覽(104)
  • TP6----------阿里云短信包驗(yàn)證碼登錄

    首先開通阿里云短信包,之后申請(qǐng)短信包簽名,這里大家自行去阿里云申請(qǐng) 安裝阿里云sdk 首先創(chuàng)建Sample類方便調(diào)用,我們需要有短信簽名,簽名模板,阿里云keyId和accessKeySecret 阿里云key在阿里云首頁(yè),右上角有個(gè)acesskey管理就可以看到 Sample.php文件 業(yè)務(wù)邏輯層生成隨機(jī)驗(yàn)證碼

    2024年02月15日
    瀏覽(93)
  • tp6 rules內(nèi)置驗(yàn)證規(guī)則thinkphp

    1、驗(yàn)證某個(gè)字段必須:‘name’=‘require’ 2、驗(yàn)證某個(gè)字段的值是否為純數(shù)字:‘num’=‘number’ 3、驗(yàn)證某個(gè)字段的值是否為整數(shù):‘num’=‘integer’ 4、驗(yàn)證某個(gè)字段的值是否為浮點(diǎn)數(shù)字:‘num’=‘float’ 5、驗(yàn)證某個(gè)字段的值是否為布爾值:‘num’=‘boolean’ 6、驗(yàn)證某個(gè)字

    2024年04月28日
    瀏覽(101)
  • tp6怎么做阿里云OSS存儲(chǔ)呢?

    tp6怎么做阿里云OSS存儲(chǔ)呢?

    作者:陳業(yè)貴 華為云享專家 51cto(專家博主 明日之星 TOP紅人) 阿里云專家博主 阿里云oss存儲(chǔ)是得買的。買后,還要獲取參數(shù)。填入代碼中就可以啦 獲得四個(gè)參數(shù): $accessKeyId $accessKeySecret $endpoint $bucket 不然不對(duì)了哈 告訴你怎么做阿里云OSS存儲(chǔ).

    2024年02月15日
    瀏覽(28)
  • TP6 使用閉合語(yǔ)句查詢多個(gè)or的模型語(yǔ)句

    需要傳入?yún)?shù)查詢的,可以參照下面的: ? ? ? ? 查詢出學(xué)校名稱和昵稱中有中學(xué)的所有學(xué)校

    2024年02月11日
    瀏覽(26)
  • TP6 + GatewayWorker 輕松實(shí)現(xiàn)web項(xiàng)目 websocket 功能

    TP6 + GatewayWorker 輕松實(shí)現(xiàn)web項(xiàng)目 websocket 功能

    一、在tp6項(xiàng)目下安裝? GatewayWorker? 安裝成功后在配置文件目錄下會(huì)出現(xiàn)gateway_worker.php 開始配置gateway_worker? 下邊我貼出了我的配置文件供大家參考 ? 下面對(duì)gateway 配置部分的屬性解釋 name :?可以設(shè)置Gateway進(jìn)程的名稱,方便status命令中查看統(tǒng)計(jì) count :可以設(shè)置Gateway進(jìn)程的數(shù)量

    2024年02月07日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包