国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式消息隊(duì)列:Rabbitmq(2)

這篇具有很好參考價(jià)值的文章主要介紹了分布式消息隊(duì)列:Rabbitmq(2)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

一:交換機(jī)

1:Direct交換機(jī)

1.1生產(chǎn)者端代碼:

?1.2:消費(fèi)者端代碼:

2:Topic主題交換機(jī)?

2.1:生產(chǎn)者代碼:?

2.2:消費(fèi)者代碼:

?二:核心特性

2.1:消息過期機(jī)制

2.1.1:給隊(duì)列中的全部消息指定過期時(shí)間

2.1.2:給某條消息指定過期時(shí)間?

2.2:死信隊(duì)列


一:交換機(jī)

1:Direct交換機(jī)

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

綁定:讓交換機(jī)和隊(duì)列進(jìn)行關(guān)聯(lián),可以指定讓交換機(jī)把什么樣的消息發(fā)送給隊(duì)列。

rountingkey:路由鍵,控制消息要發(fā)送哪個(gè)隊(duì)列。

特點(diǎn):根據(jù)路由鍵指定要轉(zhuǎn)發(fā)到指定的隊(duì)列

場景:特定的消息指定給特定的隊(duì)列

1.1生產(chǎn)者端代碼:

我們規(guī)定,通過控制臺(tái)輸入消息和路由,來指定誰完成該任務(wù)。

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class DirectProducer {


        private static final String EXCHANGE_NAME = "2";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                //創(chuàng)建交換機(jī)的名稱
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                Scanner scanner=new Scanner(System.in);
                while(scanner.hasNext()){
                    String userInput=scanner.nextLine();
                    String[] s = userInput.split(" ");
                    if(s.length<1){
                        continue;
                    }
                    //指定路由key
                    String message=s[0];
                    String routingKey=s[1];
                    //發(fā)布消息
                    /*
                      第一個(gè)參數(shù):發(fā)布到哪個(gè)交換機(jī)
                      第二個(gè)參數(shù):路由鍵
                     */
                    channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
                    System.out.println("[x] Sent"+message+"with rounting"+routingKey+" ");
                }


            }
        }
        //..

    }

?1.2:消費(fèi)者端代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class DirectProducer {


        private static final String EXCHANGE_NAME = "2";

        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                //創(chuàng)建交換機(jī)的名稱
                channel.exchangeDeclare(EXCHANGE_NAME, "direct");
                Scanner scanner=new Scanner(System.in);
                while(scanner.hasNext()){
                    String userInput=scanner.nextLine();
                    String[] s = userInput.split(" ");
                    if(s.length<1){
                        continue;
                    }
                    //指定路由key
                    String message=s[0];
                    String routingKey=s[1];
                    //發(fā)布消息
                    /*
                      第一個(gè)參數(shù):發(fā)布到哪個(gè)交換機(jī)
                      第二個(gè)參數(shù):路由鍵
                     */
                    channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
                    System.out.println("[x] Sent"+message+"with rounting"+routingKey+" ");
                }


            }
        }
        //..

    }

運(yùn)行結(jié)果:

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

2:Topic主題交換機(jī)?

特點(diǎn):消息會(huì)根據(jù)一個(gè)模糊的路由鍵轉(zhuǎn)發(fā)到指定的隊(duì)列中。

場景:特定的一類消息只交給特定的一類系統(tǒng)(程序來處理)。

綁定關(guān)系:模糊匹配消息隊(duì)列? *:匹配一個(gè)單詞? ? ? ?#:匹配0個(gè)或多個(gè)單詞

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

2.1:生產(chǎn)者代碼:?

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class TopicProducer {
    private static final String EXCHANGE_NAME = "3";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            channel.exchangeDeclare(EXCHANGE_NAME, "topic");
            Scanner scanner=new Scanner(System.in);
            while(scanner.hasNext()){
                String userInput=scanner.nextLine();
                String[] s = userInput.split(" ");
                if(s.length<1){
                    continue;
                }
                //指定路由key
                String message=s[0];
                String routingKey=s[1];
                //發(fā)布消息
                    /*
                      第一個(gè)參數(shù):發(fā)布到哪個(gè)交換機(jī)
                      第二個(gè)參數(shù):路由鍵
                     */
                channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
                System.out.println("[x] Sent"+message+"with rounting"+routingKey+" ");
            }

        }
    }
}

2.2:消費(fèi)者代碼:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;



public class TopicConsumer {
    private static final String EXCHANGE_NAME = "3";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();

        channel.exchangeDeclare(EXCHANGE_NAME, "topic");
        //創(chuàng)建消息隊(duì)列
        String queueName="fronted_queue";
        channel.queueDeclare(queueName,true,false,false,null);
        channel.queueBind(queueName,EXCHANGE_NAME,"#.前端.#");
        String queueName2="backed-_queue";
        channel.queueDeclare(queueName2,true,false,false,null);
        channel.queueBind(queueName2,EXCHANGE_NAME,"#.后端.#");
        String queueName3="product_queue";
        channel.queueDeclare(queueName3,true,false,false,null);
        channel.queueBind(queueName3,EXCHANGE_NAME,"#.產(chǎn)品.#");
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

        DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [前端] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [后端] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        DeliverCallback deliverCallback3 = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [產(chǎn)品] Received '" +
                    delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
        };
        channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
        channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
        channel.basicConsume(queueName3, true, deliverCallback3, consumerTag -> { });
    }

}

運(yùn)行結(jié)果:

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

?二:核心特性

2.1:消息過期機(jī)制

特點(diǎn):給每條消息指定一個(gè)有效期,一段時(shí)間內(nèi)未被消費(fèi),就過期了。

2.1.1:給隊(duì)列中的全部消息指定過期時(shí)間

在消費(fèi)者中對于隊(duì)列的全部消息指定過期時(shí)間,如果在過期時(shí)間內(nèi),還沒有消費(fèi)者取消息,消息才會(huì)過期,如果消息已經(jīng)接收到,但是沒確認(rèn),是不會(huì)過期的。

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

public class TTLConsumer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        //創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        //創(chuàng)建頻道,提供通信
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        //指定消息隊(duì)列的過期時(shí)間
        Map<String ,Object> args=new HashMap<>();
        args.put("x-message-ttl",5000);
        //args:指定參數(shù)
        channel.queueDeclare(QUEUE_NAME, false, false,false, args);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //如何處理消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(" [x] Received '" + message + "'");
        };
        channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
    }
}

2.1.2:給某條消息指定過期時(shí)間?

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

//在發(fā)送者這邊設(shè)置過期時(shí)間
public class TTLProducer {

    private final static String QUEUE_NAME = "ttl_queue";

    public static void main(String[] argv) throws Exception {
        //創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             //頻道相當(dāng)于客戶端(jdbcClient,redisClient),提供了和消隊(duì)列server建立通信,程序通過channel進(jìn)行發(fā)送消息
             Channel channel = connection.createChannel()) {
            //創(chuàng)建消息隊(duì)列,第二個(gè)參數(shù)(durable):是否開啟持久化,第三個(gè)參數(shù)exclusiove:是否允許當(dāng)前這個(gè)創(chuàng)建消息隊(duì)列的
            //連接操作消息隊(duì)列 第四個(gè)參數(shù):沒有人使用隊(duì)列,是否需要?jiǎng)h除
            String message = "Hello World!";
            //給消息指定過期時(shí)間
            AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
                    .expiration("1000")
                            .build();

            channel.basicPublish("", QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
            System.out.println(" [x] Sent '" + message + "'");
        }
    }

2.2:死信隊(duì)列

為了保證消息的可靠性,比如每條消息都成功消費(fèi),需要提供一個(gè)容錯(cuò)機(jī)制,即失敗的消息怎么處理,相當(dāng)于死信。

死信:過期的消息,拒收的消息,處理失敗的消息,消息隊(duì)列滿了統(tǒng)稱為死信。

死信隊(duì)列:處理死信的隊(duì)列。

死信交換機(jī):給死信隊(duì)列發(fā)送消息的交換機(jī),也存在路由綁定。

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

a:創(chuàng)建死信交換機(jī)和死信隊(duì)列

   //聲明死信交換機(jī)
            channel.exchangeDeclare(WORK_NAME,"direct");
            //聲明死信隊(duì)列
            String queueName="boss_queue";
            channel.queueDeclare(queueName,true,false,false,null);
            channel.queueBind(queueName,EXCHANGE_Name,"boss");
            String queueName2="waibao_queue";
            channel.queueDeclare(queueName2, false, false, false, null);
            channel.queueBind(queueName2,EXCHANGE_Name,"waibao");

分布式消息隊(duì)列:Rabbitmq(2),消息中間件,分布式,1024程序員

b:給失敗后的需要容錯(cuò)的隊(duì)列綁定死信交換機(jī)文章來源地址http://www.zghlxwxcb.cn/news/detail-715742.html

  //聲明交換機(jī)
        channel.exchangeDeclare(WORK_NAME, "direct");
        Map<String,Object> map=new HashMap<>();
        //聲明要綁定的死信交換機(jī)
        map.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
        //聲明要綁定的死信隊(duì)列

        map.put("x-dead-letter-routing-key","waibao_queue");
        //創(chuàng)建消息隊(duì)列
        String queueName="xiaodog_queue";
        channel.queueDeclare(queueName,true,false,false,map);
        channel.queueBind(queueName,WORK_NAME,"xiaodog");
        Map<String,Object> map2=new HashMap<>();
        //聲明要綁定的死信交換機(jī)
        map2.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
        map2.put("x-dead-letter-routing-key","boss_queue");
        String queueName2="xiaocat_queue";
        channel.queueDeclare(queueName2,true,false,false,map2);
        channel.queueBind(queueName2,WORK_NAME,"xiaocat");

到了這里,關(guān)于分布式消息隊(duì)列:Rabbitmq(2)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式消息隊(duì)列:Kafka vs RabbitMQ vs ActiveMQ

    在現(xiàn)代分布式系統(tǒng)中,消息隊(duì)列是一種常見的異步通信模式,它可以幫助系統(tǒng)處理高并發(fā)、高可用性以及容錯(cuò)等問題。在這篇文章中,我們將深入探討三種流行的分布式消息隊(duì)列:Apache Kafka、RabbitMQ和ActiveMQ。我們將討論它們的核心概念、算法原理、特點(diǎn)以及使用場景。 隨著

    2024年02月02日
    瀏覽(19)
  • 分布式搜索引擎(Elastic Search)+消息隊(duì)列(RabbitMQ)部署(商城4)

    分布式搜索引擎(Elastic Search)+消息隊(duì)列(RabbitMQ)部署(商城4)

    1、全文搜索 Elastic search可以用于實(shí)現(xiàn)全文搜索功能,例如商城中對商品搜索、搜索、分類搜索、訂單搜索、客戶搜索等。它支持復(fù)雜的查詢語句、中文分詞、近似搜索等功能,可以快速地搜索并返回匹配的結(jié)果。 2、日志分析 Elastic search可以用于實(shí)現(xiàn)實(shí)時(shí)日志分析,例

    2024年02月04日
    瀏覽(21)
  • RabbitMQ 消息中間件 消息隊(duì)列

    RabbitMQ 消息中間件 消息隊(duì)列

    RabbitMQ 1、RabbitMQ簡介 RabbiMQ是?Erang開發(fā)的,集群?常?便,因?yàn)镋rlang天?就是??分布式語?,但其本身并 不?持負(fù)載均衡。支持高并發(fā),支持可擴(kuò)展。支持AJAX,持久化,用于在分布式系統(tǒng)中存儲(chǔ)轉(zhuǎn)發(fā)消息,在易用性、擴(kuò)展性、高可用性等方面表現(xiàn)不俗。 2、RabbitMQ 特點(diǎn) 可

    2024年02月03日
    瀏覽(93)
  • 中間件RabbitMQ消息隊(duì)列介紹

    中間件RabbitMQ消息隊(duì)列介紹

    1.1 什么是 MQ MQ ( message queue ),從字面意思上看,本質(zhì)是個(gè)隊(duì)列, FIFO 先入先出,只不過隊(duì)列中存放的內(nèi)容是 message 而已,還是一種跨進(jìn)程的通信機(jī)制,用于上下游傳遞消息。在互聯(lián)網(wǎng)架構(gòu)中, MQ 是一種非常常 見的上下游 邏輯解耦+物理解耦 的消息通信服務(wù)。使用了 MQ 之

    2024年02月13日
    瀏覽(118)
  • 消息隊(duì)列中間件(二)- RabbitMQ(一)

    消息隊(duì)列中間件(二)- RabbitMQ(一)

    接收,存儲(chǔ),轉(zhuǎn)發(fā)消息 生產(chǎn)者 交換機(jī) 隊(duì)列 消費(fèi)者 簡單模式 工作模式 發(fā)布 路由模式 主題模式 發(fā)布訂閱模式 Broker 接收和分發(fā)消息的應(yīng)用 Virtual host 虛擬分組 Connection: TCP連接 Channel: 節(jié)省連接,每次訪問建立一次Connection消耗太大,所以使用信道代替連接 交換機(jī) 隊(duì)列 www.r

    2024年02月11日
    瀏覽(94)
  • 「中間件」rabbitmq 消息隊(duì)列基礎(chǔ)知識(shí)

    RabbitMQ是一個(gè)消息隊(duì)列軟件,用于在應(yīng)用程序之間轉(zhuǎn)發(fā)消息。以下是RabbitMQ的基本概念: 消息:RabbitMQ中的消息是傳遞的基本單位,它由消息頭和消息體組成。 隊(duì)列(Queue):隊(duì)列是消息的緩沖區(qū),用于存儲(chǔ)待處理的消息。 交換器(Exchange):交換器是接收生產(chǎn)者發(fā)送的消息并

    2024年02月07日
    瀏覽(97)
  • 微服務(wù)中間件--分布式事務(wù)

    微服務(wù)中間件--分布式事務(wù)

    1) CAP定理 分布式系統(tǒng)有三個(gè)指標(biāo): Consistency(一致性): 用戶訪問分布式系統(tǒng)中的任意節(jié)點(diǎn),得到的數(shù)據(jù)必須一致 Availability(可用性): 用戶訪問集群中的任意健康節(jié)點(diǎn),必須能得到響應(yīng),而不是超時(shí)或拒絕 Partition tolerance (分區(qū)容錯(cuò)性) Partition(分區(qū)): 因?yàn)榫W(wǎng)絡(luò)故障或其它

    2024年02月12日
    瀏覽(24)
  • 深入詳解高性能消息隊(duì)列中間件 RabbitMQ

    深入詳解高性能消息隊(duì)列中間件 RabbitMQ

    ? 目錄 1、引言 2、什么是 RabbitMQ ? 3、RabbitMQ 優(yōu)勢 4、RabbitMQ 整體架構(gòu)剖析 4.1、發(fā)送消息流程 4.2、消費(fèi)消息流程 5、RabbitMQ 應(yīng)用 5.1、廣播 5.2、RPC VC++常用功能開發(fā)匯總(專欄文章列表,歡迎訂閱,持續(xù)更新...) https://blog.csdn.net/chenlycly/article/details/124272585 C++軟件異常排查從入

    2024年02月05日
    瀏覽(97)
  • golang分布式中間件之kafka

    Kafka是一個(gè)分布式發(fā)布-訂閱消息系統(tǒng),由LinkedIn公司開發(fā)。它被設(shè)計(jì)為快速、可靠且具有高吞吐量的數(shù)據(jù)流平臺(tái),旨在處理大量的實(shí)時(shí)數(shù)據(jù)。Kafka的架構(gòu)是基于發(fā)布-訂閱模型構(gòu)建的,可以支持多個(gè)生產(chǎn)者和消費(fèi)者。 在本文中,我們將討論如何使用Go語言來實(shí)現(xiàn)Kafka分布式中間件

    2024年02月07日
    瀏覽(26)
  • 微服務(wù)中間件-分布式緩存Redis

    微服務(wù)中間件-分布式緩存Redis

    – 基于Redis集群解決單機(jī)Redis存在的問題 單機(jī)的Redis存在四大問題: 1.數(shù)據(jù)丟失問題: Redis是內(nèi)存存儲(chǔ),服務(wù)重啟可能會(huì)丟失數(shù)據(jù) 2.并發(fā)能力問題: 單節(jié)點(diǎn)Redis并發(fā)能力雖然不錯(cuò),但也無法滿足如618這樣的高并發(fā)場景 3.故障恢復(fù)問題: 如果Redis宕機(jī),則服務(wù)不可用,需要一種自動(dòng)

    2024年02月12日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包