国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式消息隊列:RabbitMQ(1)

這篇具有很好參考價值的文章主要介紹了分布式消息隊列:RabbitMQ(1)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目錄

一:中間件

二:分布式消息隊列?

2.1:是消息隊列

2.1.1:消息隊列的優(yōu)勢

2.1.1.1:異步處理化

2.1.1.2:削峰填谷

2.2:分布式消息隊列

2.2.1:分布式消息隊列的優(yōu)勢

2.2.1.1:數(shù)據(jù)的持久化

2.2.1.2:可擴展性

2.2.1.3:應(yīng)用解耦

2.2.1.4:發(fā)送訂閱?

2.2.2:分布式消息隊列的應(yīng)用場景?

三:Rabbitmq

3.1:基本概念

3.2:快速入門?

3.2.1:引入消息隊列Java客戶端

3.2.2:單消費開發(fā)生產(chǎn)者和消費者

?3.2.3:多消費開發(fā)生產(chǎn)者和消費者

3.3.3:交換機

3.3.3.1:交換機的類別

a):fanout


一:中間件

連接多個系統(tǒng),幫助多個系統(tǒng)緊密協(xié)作的技術(shù)(組件)

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

二:分布式消息隊列?

2.1:是消息隊列

概念:存儲消息的隊列

關(guān)鍵詞:存儲,消息,隊列

存儲:存儲數(shù)據(jù)

消息:某種數(shù)據(jù)結(jié)構(gòu),比如l字符串,對象,二進制數(shù)據(jù),json等

隊列:先進先出的數(shù)據(jù)結(jié)構(gòu)

作用:在不同的系統(tǒng)下,應(yīng)用之間實現(xiàn)消息的傳輸,不需要考慮傳輸應(yīng)用的編程語言,系統(tǒng)和,框架等等,實現(xiàn)應(yīng)用解耦的作用。

? ? ? ? eg:可以讓Java開發(fā)的應(yīng)用發(fā)消息,讓php開發(fā)的應(yīng)用收消息。

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員?針對生產(chǎn)者來說:不需要關(guān)心消費者什么時候接受消息,什么時候消費,我只需要把我的工作完成就好了。生產(chǎn)者和消費者之間實現(xiàn)了解耦。

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

針對上圖,同樣我們會發(fā)現(xiàn),當(dāng)小李要別的書籍的時候,小王也可以將別的書籍放到消息隊列中。生產(chǎn)者和消費者從某一種程度上實現(xiàn)了解耦合。

2.1.1:消息隊列的優(yōu)勢

2.1.1.1:異步處理化

生產(chǎn)者發(fā)送消息之后,可以繼續(xù)去忙別的,消費者什么時候消費都可以,不產(chǎn)生阻塞。

2.1.1.2:削峰填谷

先把用戶的請求放到消息隊列種,消費者(實際執(zhí)行操作的應(yīng)用)可以按照自己的需求,慢慢去取。

舉個栗子:

原本:

12點時來了10萬個請求,原本情況下,10萬個請求都在系統(tǒng)內(nèi)部立刻處理,很快系統(tǒng)壓力過大宕機。

現(xiàn)在:

把10萬個請求放到消息隊列中,處理系統(tǒng)以自己的恒定速率(比如每秒1個)慢慢執(zhí)行,穩(wěn)定處理。

2.2:分布式消息隊列

2.2.1:分布式消息隊列的優(yōu)勢

分布式消息隊列繼承于消息隊列的優(yōu)勢,并進行了一部分的拓展。

2.2.1.1:數(shù)據(jù)的持久化

把消息集中存儲在硬盤當(dāng)中,服務(wù)器重啟就不會丟失。

2.2.1.2:可擴展性

可以根據(jù)需求,隨時增加(或減少)節(jié)點,繼續(xù)保持穩(wěn)定的服務(wù)。

2.2.1.3:應(yīng)用解耦

可以連接不同語言(Java,PHP),框架開發(fā)的系統(tǒng),讓這些系統(tǒng)讀取數(shù)據(jù)。

示例:

以前的項目:

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

加了分布式消息隊列之后的項目:?

1:一個系統(tǒng)掛了,不影響另一個系統(tǒng)。

2:系統(tǒng)掛了之后并恢復(fù),仍然可以從消息隊列中取消息

3:只要發(fā)送消息到隊列,就可以立即進行返回,不用同步調(diào)用所有系統(tǒng),性能更高

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

2.2.1.4:發(fā)送訂閱?

假設(shè)情景:當(dāng)QQ進行了一部分改革之后,其他使用QQ的APP也應(yīng)該處理
這部分改革。
QQ做了一個情景,要讓其他系統(tǒng)知道,比如公告消息。如果QQ一次性給這些應(yīng)用發(fā)消息,所引出的問題如下:
1.每次發(fā)通知都要調(diào)用很多系統(tǒng),很麻煩,很可能失敗
2.不知道哪個系統(tǒng)需要這些QQ的改革。
解決方案:大的核心系統(tǒng)始終往消息隊列發(fā)消息,其他的系統(tǒng)都去訂閱這個消息隊列的消息,用的時候進行取就OK。

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

2.2.2:分布式消息隊列的應(yīng)用場景?

1:耗時場景。

2:高并發(fā)場景。

3:分布式系統(tǒng)的協(xié)作。(跨團隊,跨業(yè)務(wù)合作,應(yīng)用解耦)

4:強穩(wěn)定的場景(金融業(yè)務(wù),持久化,可靠性,削鋒填谷)??

三:Rabbitmq

特點:生態(tài)好,易學(xué)習(xí),易于理解,時效性強,支持不同語言的客戶端,擴展性,可用性都很不錯。

3.1:基本概念

AMPQ協(xié)議:Rabbitmq是遵循AMPQ協(xié)議的一種消息中間件。

生產(chǎn)者:發(fā)消息到交換機

消費者:收消息的,從某個隊列中取消息

交換機(exchange):負(fù)責(zé)把消息轉(zhuǎn)發(fā)到對應(yīng)的隊列

隊列(Queue):存儲消息的

路由(Rountes):轉(zhuǎn)發(fā),怎么把一個消息從一個地方轉(zhuǎn)發(fā)到另一個地方(比如生產(chǎn)者轉(zhuǎn)發(fā)到某個隊列)

Rabbitmq:端口占用? ?5672:程序連接的端口 15672:管理界面端口

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

Rabbitmq的安裝:https://blog.csdn.net/qq_25919879/article/details/113055350

管理器頁面打不開:

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

3.2:快速入門?

3.2.1:引入消息隊列Java客戶端

<dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>5.17.0</version>
 </dependency>

3.2.2:單消費開發(fā)生產(chǎn)者和消費者

生產(chǎn)者端代碼:

public class SingeProducer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             //頻道相當(dāng)于客戶端(jdbcClient,redisClient),提供了和消隊列server建立通信,程序通過channel進行發(fā)送消息
             Channel channel = connection.createChannel()) {
            //創(chuàng)建消息隊列,第二個參數(shù)(durable):是否開啟持久化,第三個參數(shù)exclusiove:是否允許當(dāng)前這個創(chuàng)建消息隊列的
            //連接操作消息隊列 第四個參數(shù):沒有人使用隊列,是否需要刪除
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //發(fā)送消息
            String message = "Hello World!";
            channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }
}

消費者代碼:

public class SingeConsumer {

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        //創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //創(chuàng)建頻道,提供通信
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //如何處理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
    }
}

?3.2.3:多消費開發(fā)生產(chǎn)者和消費者

場景:一個生產(chǎn)者給隊列里面發(fā)了一條消息,多個消費者進行消費。適用于多個機器同時去接收并處理任務(wù)(每個機器處理任務(wù)有限)

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

隊列持久化:

durable:

參數(shù)設(shè)置為true,服務(wù)器隊列不丟失

 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

?消息持久化:

?指定MessageProperties.PERSISTENY_TEXT_PLAIN參數(shù)

    channel.basicPublish("", TASK_QUEUE_NAME,
                    MessageProperties.PERSISTENT_TEXT_PLAIN,
                    message.getBytes("UTF-8"));

生產(chǎn)者端代碼:

public class MultiProducer {
    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            Scanner scanner=new Scanner(System.in);
            while(scanner.hasNext()){
                String message = scanner.nextLine();
                channel.basicPublish("", TASK_QUEUE_NAME,
                        MessageProperties.PERSISTENT_TEXT_PLAIN,
                        message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }

        }
    }
}

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

消費者代碼:?

在消費者代碼中,如何測驗一個消費者只能取一個任務(wù),我們利用for循環(huán)來進行解決。

指定確認(rèn)某條消息:

第一個參數(shù):獲取消息的信息

第二個參數(shù):如果是true,把所有的歷史消息全都確認(rèn)了。如果為false,取出當(dāng)前的消息。

   //第二個參數(shù):是否一次性取所有的消息。如果為true,則要取所有的擠壓在消息隊列中的消息
   //如果為false,則為一次性取一個消息
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

指定拒絕某條消息

第一個參數(shù):獲取消息的信息

第二個參數(shù):如果是true,則代表是否要拒絕所有的歷史消息。

第三個參數(shù):如果是false, 則代表失敗的任務(wù)是否要重新入隊。

  channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
public class MultiConsumer {
    private static final String TASK_QUEUE_NAME = "multi_queue";

    public static void main(String[] argv) throws Exception {
        //建立連接
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        final Connection connection = factory.newConnection();
        for (int i = 0; i <= 2; i++) {
            final Channel channel = connection.createChannel();
            int finalI=i;
            //聲明隊列
            channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
            //控制單個消費者的任務(wù)積壓數(shù):每個消費者最多處理一個任務(wù),每個消費者智能處理一個任務(wù)
            channel.basicQos(1);
            //處理從隊列中取的的消息
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                try {
                    //處理工作
                    System.out.println(" [x] Received '" +"編號:"+finalI+ message + "'");

                    //停20秒模擬一個機器處理工作能力有限
                    Thread.sleep(20000);
                    //第二個參數(shù):是否一次性取所有的消息。如果為true,則要取所有的擠壓在消息隊列中的消息
                    //如果為false,則為一次性取一個消息
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                } finally {
                    System.out.println(" [x] Done");
                    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                }
            };
            //開啟消費監(jiān)聽
            channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });

        }

    }

}

3.3.3:交換機

一個生產(chǎn)者給多個隊列發(fā)消息,一個生產(chǎn)者對多個隊列。交換機:轉(zhuǎn)發(fā)功能,怎么把消息轉(zhuǎn)發(fā)到不同的隊列上。

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

3.3.3.1:交換機的類別
a):fanout

場景:很適用于發(fā)布訂閱的場景。

特點:消息會被轉(zhuǎn)發(fā)到所有綁定到交換機的隊列。

生產(chǎn)者代碼:當(dāng)生產(chǎn)者發(fā)送消息后,由交換機放到消息隊列中,消費者從消息隊列中取。

public class FonoutProducer {

        private static final String EXCHANGE_NAME = "1";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                //創(chuàng)建交換機
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
                Scanner scanner=new Scanner(System.in);
                while(scanner.hasNext()){
                    String message = scanner.nextLine();
                    channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                    System.out.println(" [x] Sent '" + message + "'");
                }

            }
        }

}

消費者代碼:

public class FonoutConsumer {


    private static final String EXCHANGE_NAME = "1";
    public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
            Channel channel2= connection.createChannel();
            //聲明交換機
            //創(chuàng)建隊列,隨機分配一個隊列名稱
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName="xiaowang";
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName, EXCHANGE_NAME, "");

            channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");
            String queueName2="xiaoli";
            channel2.queueDeclare(queueName2,true,false,false,null);
            channel2.queueBind(queueName2,EXCHANGE_NAME,"");

            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

            DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [小王] Received '" + message + "'");
            };
            DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [小李] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
            channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        }
    }

運行結(jié)果:

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員

分布式消息隊列:RabbitMQ(1),消息中間件,分布式,rabbitmq,1024程序員文章來源地址http://www.zghlxwxcb.cn/news/detail-715747.html

到了這里,關(guān)于分布式消息隊列:RabbitMQ(1)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 分布式消息隊列:Kafka vs RabbitMQ vs ActiveMQ

    在現(xiàn)代分布式系統(tǒng)中,消息隊列是一種常見的異步通信模式,它可以幫助系統(tǒng)處理高并發(fā)、高可用性以及容錯等問題。在這篇文章中,我們將深入探討三種流行的分布式消息隊列:Apache Kafka、RabbitMQ和ActiveMQ。我們將討論它們的核心概念、算法原理、特點以及使用場景。 隨著

    2024年02月02日
    瀏覽(19)
  • 分布式搜索引擎(Elastic Search)+消息隊列(RabbitMQ)部署(商城4)

    分布式搜索引擎(Elastic Search)+消息隊列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于實現(xiàn)全文搜索功能,例如商城中對商品搜索、搜索、分類搜索、訂單搜索、客戶搜索等。它支持復(fù)雜的查詢語句、中文分詞、近似搜索等功能,可以快速地搜索并返回匹配的結(jié)果。 2、日志分析 Elastic search可以用于實現(xiàn)實時日志分析,例

    2024年02月04日
    瀏覽(21)
  • RabbitMQ 消息中間件 消息隊列

    RabbitMQ 消息中間件 消息隊列

    RabbitMQ 1、RabbitMQ簡介 RabbiMQ是?Erang開發(fā)的,集群?常?便,因為Erlang天?就是??分布式語?,但其本身并 不?持負(fù)載均衡。支持高并發(fā),支持可擴展。支持AJAX,持久化,用于在分布式系統(tǒng)中存儲轉(zhuǎn)發(fā)消息,在易用性、擴展性、高可用性等方面表現(xiàn)不俗。 2、RabbitMQ 特點 可

    2024年02月03日
    瀏覽(93)
  • 中間件RabbitMQ消息隊列介紹

    中間件RabbitMQ消息隊列介紹

    1.1 什么是 MQ MQ ( message queue ),從字面意思上看,本質(zhì)是個隊列, FIFO 先入先出,只不過隊列中存放的內(nèi)容是 message 而已,還是一種跨進程的通信機制,用于上下游傳遞消息。在互聯(lián)網(wǎng)架構(gòu)中, MQ 是一種非常常 見的上下游 邏輯解耦+物理解耦 的消息通信服務(wù)。使用了 MQ 之

    2024年02月13日
    瀏覽(118)
  • 消息隊列中間件(二)- RabbitMQ(一)

    消息隊列中間件(二)- RabbitMQ(一)

    接收,存儲,轉(zhuǎn)發(fā)消息 生產(chǎn)者 交換機 隊列 消費者 簡單模式 工作模式 發(fā)布 路由模式 主題模式 發(fā)布訂閱模式 Broker 接收和分發(fā)消息的應(yīng)用 Virtual host 虛擬分組 Connection: TCP連接 Channel: 節(jié)省連接,每次訪問建立一次Connection消耗太大,所以使用信道代替連接 交換機 隊列 www.r

    2024年02月11日
    瀏覽(94)
  • 「中間件」rabbitmq 消息隊列基礎(chǔ)知識

    RabbitMQ是一個消息隊列軟件,用于在應(yīng)用程序之間轉(zhuǎn)發(fā)消息。以下是RabbitMQ的基本概念: 消息:RabbitMQ中的消息是傳遞的基本單位,它由消息頭和消息體組成。 隊列(Queue):隊列是消息的緩沖區(qū),用于存儲待處理的消息。 交換器(Exchange):交換器是接收生產(chǎn)者發(fā)送的消息并

    2024年02月07日
    瀏覽(97)
  • 微服務(wù)中間件--分布式事務(wù)

    微服務(wù)中間件--分布式事務(wù)

    1) CAP定理 分布式系統(tǒng)有三個指標(biāo): Consistency(一致性): 用戶訪問分布式系統(tǒng)中的任意節(jié)點,得到的數(shù)據(jù)必須一致 Availability(可用性): 用戶訪問集群中的任意健康節(jié)點,必須能得到響應(yīng),而不是超時或拒絕 Partition tolerance (分區(qū)容錯性) Partition(分區(qū)): 因為網(wǎng)絡(luò)故障或其它

    2024年02月12日
    瀏覽(24)
  • 深入詳解高性能消息隊列中間件 RabbitMQ

    深入詳解高性能消息隊列中間件 RabbitMQ

    ? 目錄 1、引言 2、什么是 RabbitMQ ? 3、RabbitMQ 優(yōu)勢 4、RabbitMQ 整體架構(gòu)剖析 4.1、發(fā)送消息流程 4.2、消費消息流程 5、RabbitMQ 應(yīng)用 5.1、廣播 5.2、RPC VC++常用功能開發(fā)匯總(專欄文章列表,歡迎訂閱,持續(xù)更新...) https://blog.csdn.net/chenlycly/article/details/124272585 C++軟件異常排查從入

    2024年02月05日
    瀏覽(97)
  • golang分布式中間件之kafka

    Kafka是一個分布式發(fā)布-訂閱消息系統(tǒng),由LinkedIn公司開發(fā)。它被設(shè)計為快速、可靠且具有高吞吐量的數(shù)據(jù)流平臺,旨在處理大量的實時數(shù)據(jù)。Kafka的架構(gòu)是基于發(fā)布-訂閱模型構(gòu)建的,可以支持多個生產(chǎn)者和消費者。 在本文中,我們將討論如何使用Go語言來實現(xiàn)Kafka分布式中間件

    2024年02月07日
    瀏覽(26)
  • 微服務(wù)中間件-分布式緩存Redis

    微服務(wù)中間件-分布式緩存Redis

    – 基于Redis集群解決單機Redis存在的問題 單機的Redis存在四大問題: 1.數(shù)據(jù)丟失問題: Redis是內(nèi)存存儲,服務(wù)重啟可能會丟失數(shù)據(jù) 2.并發(fā)能力問題: 單節(jié)點Redis并發(fā)能力雖然不錯,但也無法滿足如618這樣的高并發(fā)場景 3.故障恢復(fù)問題: 如果Redis宕機,則服務(wù)不可用,需要一種自動

    2024年02月12日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包