国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

RabbitMQ詳解(三):消息模式(fanout、direct、topic、work)

這篇具有很好參考價值的文章主要介紹了RabbitMQ詳解(三):消息模式(fanout、direct、topic、work)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

消費(fèi)模式

參考官網(wǎng):https://www.rabbitmq.com/getstarted.html

  • 簡單模式 Simple, 參考RabbitMQ詳解(二):消息模式 Simple(簡單)模式

    簡單模式是最簡單的消息模式,它包含一個生產(chǎn)者、一個消費(fèi)者和一個隊列。生產(chǎn)者向隊列里發(fā)送消息,消費(fèi)者從隊列中獲取消息并消費(fèi)。

  • 發(fā)布訂閱模式 fanout

    同時向多個消費(fèi)者發(fā)送消息的模式(類似廣播的形式)

  • 路由模式 direct

    根據(jù)路由鍵選擇性給多個消費(fèi)者發(fā)送消息的模式

  • 主題模式 topic

    是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式

  • 工作模式 work

    分發(fā)機(jī)制

消息模式-fanout(發(fā)布訂閱)模式

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 類型:fanout
  • 特點:Fanout—發(fā)布與訂閱模式,是一種廣播機(jī)制,它是沒有路由key的模式。

創(chuàng)建交換機(jī)

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

注意 type 類型為fanout

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

綁定隊列

  • 圖像化管理頁面新建queue02、queue03隊列

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 點擊交換器后,綁定創(chuàng)建的三個隊列

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 綁定成功后會如圖所示

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

定義生產(chǎn)者

package com.cn.fanout;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * fanout(發(fā)布訂閱) 生產(chǎn)者
 */
public class Producer {

    public static void main(String[] args) {

        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2.設(shè)置工廠屬性
        factory.setHost("請?zhí)顚懽约旱膇p地址");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //3.從連接工廠中獲取連接
            connection = factory.newConnection("生產(chǎn)者1");
            //4.從連接中獲取通道
            channel = connection.createChannel();
              //5.申請隊列存儲信息,此步驟不需要了,我們手動在圖形管理頁面創(chuàng)建好交換機(jī)及綁定好隊列queue01、queue02、queue03
            //6.準(zhǔn)備發(fā)送消息的內(nèi)容
            String message = "hello,rabbitmq!";
            //7.1.準(zhǔn)備交換機(jī)
            String exchangeName = "fanout-exchange";
            //7.2.定義路由key,fanout模式?jīng)]有routingKey參數(shù)
            String routingKey = "";
            // 7.3: 發(fā)送消息給中間件rabbitmq-server
            /*
             * @params1: 交換機(jī)exchange
             * @params2: 隊列名稱/routingkey
             * @params3: 屬性配置
             * @params4: 發(fā)送消息的內(nèi)容
             */
            channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
            System.out.println("消息發(fā)送成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        }  finally {
            // 8: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
  • 啟動生產(chǎn)者, 會看到每個隊列都投遞了一條消息

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

定義消費(fèi)者

package com.cn.fanout;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.Charset;

/**
 * fanout(發(fā)布訂閱) 消費(fèi)者
 */
public class Consumer {

    public static Runnable runnable =  new Runnable(){
        @Override
        public void run() {
            //1.創(chuàng)建連接工廠
            ConnectionFactory factory = new ConnectionFactory();
            //2.設(shè)置工廠屬性
            factory.setHost("請?zhí)顚懽约旱膇p地址");
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");

            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                //3.從連接工廠中獲取連接
                connection = factory.newConnection("生產(chǎn)者1");
                //4.從連接中獲取通道
                channel = connection.createChannel();
                //5.接收消息
                channel.basicConsume(queueName, true, new DeliverCallback() {
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(queueName + "收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset()));
                    }
                }, new CancelCallback() {
                    public void handle(String s) throws IOException {
                        System.out.println("接收消息失敗了...");
                    }
                });
                System.out.println(queueName + "開始接收消息 ");
                System.in.read();
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // 6: 釋放連接關(guān)閉通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
                if (connection != null) {
                    try {
                        connection.close();
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }
        }
    };


    public static void main(String[] args) {
        // 啟動三個線程去執(zhí)行
        new Thread(runnable, "queue01").start();
        new Thread(runnable, "queue02").start();
        new Thread(runnable, "queue03").start();
    }

}
  • 啟動消費(fèi)者,會看到隊列中消息已經(jīng)被消費(fèi)

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 查看控制臺打印日志
    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

消費(fèi)模式-Direct(路由)模式

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 類型:direct
  • 特點:Direct模式是fanout模式上的一種疊加,增加了路由RoutingKey的模式。

創(chuàng)建交換機(jī)

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

綁定隊列

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

定義生產(chǎn)者

package com.cn.direct;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * direct(路由) 生產(chǎn)者
 */
public class Producer {

    public static void main(String[] args) {

        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2.設(shè)置工廠屬性
        factory.setHost("請?zhí)顚懽约旱膇p地址");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //3.從連接工廠中獲取連接
            connection = factory.newConnection("生產(chǎn)者1");
            //4.從連接中獲取通道
            channel = connection.createChannel();
            //5.申請隊列存儲信息,此步驟不需要了,我們手動在圖形管理頁面創(chuàng)建好交換機(jī)及綁定好隊列queue01、queue02、queue03
            //6.準(zhǔn)備發(fā)送消息的內(nèi)容
            String message = "hello,rabbitmq,direct!";
            //7.1.準(zhǔn)備交換機(jī)
            String exchangeName = "direct-exchange";
            //7.2.定義路由key, direct需要增加routingKey1參數(shù)
            String routingKey1 = "email";
//            String routingKey2 = "sms";
            // 7.3: 發(fā)送消息給中間件rabbitmq-server
            /*
             * @params1: 交換機(jī)exchange
             * @params2: 隊列名稱/routingkey
             * @params3: 屬性配置
             * @params4: 發(fā)送消息的內(nèi)容
             */
            channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
//            channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
            System.out.println("消息發(fā)送成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        }  finally {
            // 8: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
  • 啟動生產(chǎn)者, 會看到只有quque01隊列投遞了一條消息

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 因為我們的routingKey指定為email,綁定的隊列信息如下,所有只有queue01接收到了消息

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

定義消費(fèi)者

//同fanout模式消費(fèi)者代碼相同 
  • 啟動消費(fèi)者,會看到隊列中消息已經(jīng)被消費(fèi)

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 查看控制臺打印日志

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

消費(fèi)模式-Topic(主題)模式

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 類型:topic
  • 特點:Topic模式是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式。
  • “#” : 匹配一個或者多個
    “**”:匹配一個*

創(chuàng)建交換機(jī)

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

綁定隊列

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

定義生產(chǎn)者

package com.cn.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * topic(主題) 生產(chǎn)者
 */
public class Producer {

    public static void main(String[] args) {

        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2.設(shè)置工廠屬性
        factory.setHost("請?zhí)顚懽约旱膇p地址");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //3.從連接工廠中獲取連接
            connection = factory.newConnection("生產(chǎn)者1");
            //4.從連接中獲取通道
            channel = connection.createChannel();
            //5.申請隊列存儲信息,此步驟不需要了,我們手動在圖形管理頁面創(chuàng)建好交換機(jī)及綁定好隊列queue01、queue02、queue03
            //6.準(zhǔn)備發(fā)送消息的內(nèi)容
            String message = "hello,rabbitmq,topic!";
            //7.1.準(zhǔn)備交換機(jī)
            String exchangeName = "topic-exchange";
            //7.2.定義路由key, 模糊匹配
            String routingKey1 = "com.order.xxx";
            // 7.3: 發(fā)送消息給中間件rabbitmq-server
            /*
             * @params1: 交換機(jī)exchange
             * @params2: 隊列名稱/routingkey
             * @params3: 屬性配置
             * @params4: 發(fā)送消息的內(nèi)容
             */
            channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
            System.out.println("消息發(fā)送成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        }  finally {
            // 8: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
  • 啟動生產(chǎn)者, 會看到quque01、queue02隊列分別投遞了一條消息

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

    • 因為我們的routingKey指定為com.order.xxx,綁定的隊列信息如下,所有queue01、queue02接收到了消息

      fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

定義消費(fèi)者

//同fanout模式消費(fèi)者代碼相同
  • 啟動消費(fèi)者,會看到隊列中消息已經(jīng)被消費(fèi)

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 查看控制臺打印日志

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

完整的聲明創(chuàng)建方式

上面操作的案例 我們都是在管理頁面端進(jìn)行交換機(jī)的創(chuàng)建以及綁定,現(xiàn)在我們使用純代碼的方式進(jìn)行操作

定義生產(chǎn)者

package com.cn.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * 完整 生產(chǎn)者
 */
public class Producer {

    public static void main(String[] args) {

        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2.設(shè)置工廠屬性
        factory.setHost("請?zhí)顚懽约旱膇p地址");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //3.從連接工廠中獲取連接
            connection = factory.newConnection("生產(chǎn)者1");
            //4.從連接中獲取通道
            channel = connection.createChannel();
            //5.準(zhǔn)備發(fā)送消息的內(nèi)容
            String message = "hello,rabbitmq,all!";
            //6.1.準(zhǔn)備交換機(jī)
            String exchangeName = "direct-message-exchange";
            //6.2.交換機(jī)類型
            String exchangeType = "direct";
            //6.3.聲明交換機(jī)(是否持久化,true代表交換機(jī)不會隨著服務(wù)器重啟丟失)
            channel.exchangeDeclare(exchangeName,exchangeType,true);
            //7.聲明隊列
            channel.queueDeclare("queue04", true, false ,false, null);
            channel.queueDeclare("queue05", true, false ,false, null);
            channel.queueDeclare("queue06", true, false ,false, null);
            //8.定義路由key
            String routingKey1 = "order";
            String routingKey2 = "course";
            //9.隊列和交換機(jī)進(jìn)行綁定
            channel.queueBind("queue04", exchangeName, routingKey1);
            channel.queueBind("queue05", exchangeName, routingKey1);
            channel.queueBind("queue06", exchangeName, routingKey2);
            //10: 發(fā)送消息給中間件rabbitmq-server
            /*
             * @params1: 交換機(jī)exchange
             * @params2: 隊列名稱/routingkey
             * @params3: 屬性配置
             * @params4: 發(fā)送消息的內(nèi)容
             */
            channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
            System.out.println("消息發(fā)送成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        }  finally {
            // 8: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
  • 啟動生產(chǎn)者, 會看到交換機(jī)和隊列都已創(chuàng)建好,并且已經(jīng)互相綁定好

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

定義消費(fèi)者

同fanout模式消費(fèi)者代碼相同
  • 啟動消費(fèi)者,會看到隊列中消息已經(jīng)被消費(fèi)

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

  • 查看控制臺打印日志

    fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

消費(fèi)模式-Work(工作)模式

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

當(dāng)有多個消費(fèi)者時,我們的消費(fèi)會被哪個消費(fèi)者消費(fèi)呢?我們該如何均衡消費(fèi)者消費(fèi)信息的多少呢?

  • 輪詢模式:一個消費(fèi)者一條,按均分發(fā)
  • 公平分發(fā): 根據(jù)消費(fèi)者消費(fèi)能力進(jìn)行公平分發(fā),處理快的處理的快,處理慢的處理的少,按勞分配

輪詢模式

  • 類型:無
  • 特點:該模式接收消息是當(dāng)有多個消費(fèi)者接入時,消息的分配模式是一個消費(fèi)者分配一條,直至消息消費(fèi)完成;

定義生產(chǎn)者

package com.cn.work.roundrobin;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Producer {

    public static void main(String[] args) {

        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2.設(shè)置工廠屬性
        factory.setHost("請?zhí)顚懽约旱膇p地址");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //3.從連接工廠中獲取連接
            connection = factory.newConnection("生產(chǎn)者7");
            //4.從連接中獲取通道
            channel = connection.createChannel();
            //5.申請隊列存儲信息
            /*
             *  如果隊列不存在,則會創(chuàng)建
             *  Rabbitmq不允許創(chuàng)建兩個相同的隊列名稱,否則會報錯。
             *
             *  @params1: queue 隊列的名稱
             *  @params2: durable 隊列是否持久化
             *  @params3: exclusive 是否排他,即是否私有的,如果為true,會對當(dāng)前隊列加鎖,其他的通道不能訪問,并且連接自動關(guān)閉
             *  @params4: autoDelete 是否自動刪除,當(dāng)最后一個消費(fèi)者斷開連接之后是否自動刪除消息。
             *  @params5: arguments 可以設(shè)置隊列附加參數(shù),設(shè)置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。
             */
            channel.queueDeclare("queue07", true ,false,false, null);
            //6.準(zhǔn)備發(fā)送消息的內(nèi)容
            for (int i = 0; i < 20; i++) {
                String message = "hello,rabbitmq,work!" + i;
                // 7: 發(fā)送消息給中間件rabbitmq-server
                /*
                 * @params1: 交換機(jī)exchange
                 * @params2: 隊列名稱/routing
                 * @params3: 屬性配置
                 * @params4: 發(fā)送消息的內(nèi)容
                 */
                channel.basicPublish("", "queue07", null, message.getBytes());
            }
            System.out.println("消息發(fā)送成功!");
        } catch (Exception e) {
            e.printStackTrace();
            System.out.println("發(fā)送消息出現(xiàn)異常...");
        }  finally {
            // 8: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

定義消費(fèi)者1

package com.cn.work.roundrobin;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.Charset;

public class Consumer1 {

    public static void main(String[] args) {

        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2.設(shè)置工廠屬性
        factory.setHost("請?zhí)顚懽约旱膇p地址");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //3.從連接工廠中獲取連接
            connection = factory.newConnection("消費(fèi)者1");
            //4.從連接中獲取通道
            channel = connection.createChannel();
            //5.接收消息(應(yīng)答機(jī)制參數(shù)為true  自動應(yīng)答)
            channel.basicConsume("queue07", true, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("Consumer1收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset()));
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                public void handle(String s) throws IOException {
                    System.out.println("Consumer1接收消息失敗了...");
                }
            });
            System.out.println("Consumer1開始接收消息");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 6: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

定義消費(fèi)者2

同上,名稱稍修改即可
				    try {
                        Thread.sleep(200);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }

先在管理頁面創(chuàng)建好隊列queue,然后啟動消費(fèi)者1和2,最后啟動生產(chǎn)者看頁面日志

消費(fèi)者1和消費(fèi)者2

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式
fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

work1和work2的消息處理能力不同,但是最后處理的消息條數(shù)相同,是“按均分配”。

公平分發(fā)

  • 類型:無
  • 特點:由于消息接收者處理消息的能力不同,存在處理快慢的問題,我們就需要能者多勞,處理快的多處理,處理慢的少處理;

定義生產(chǎn)者

//同上輪詢模式的生產(chǎn)者代碼相同

定義消費(fèi)者1

注意:

  • //設(shè)置消費(fèi)消息指標(biāo)

    finalChannel.basicQos(1);

  • finalChannel.basicConsume(“queue1”, false, new DeliverCallback() { … })

  • //修改為手動應(yīng)答
    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

package com.cn.work.fairdispatch;

import com.rabbitmq.client.*;

import java.io.IOException;
import java.nio.charset.Charset;

public class Consumer1 {

    public static void main(String[] args) {

        //1.創(chuàng)建連接工廠
        ConnectionFactory factory = new ConnectionFactory();
        //2.設(shè)置工廠屬性
        factory.setHost("請?zhí)顚懽约旱膇p地址");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("admin");
        factory.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //3.從連接工廠中獲取連接
            connection = factory.newConnection("消費(fèi)者1");
            //4.從連接中獲取通道
            channel = connection.createChannel();
            //5.接收消息(應(yīng)答機(jī)制參數(shù)為false  手動應(yīng)答)
            final Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue07", false, new DeliverCallback() {
                public void handle(String s, Delivery delivery) throws IOException {
                    System.out.println("Consumer1收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset()));
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //修改為手動應(yīng)答
                    finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
                }
            }, new CancelCallback() {
                public void handle(String s) throws IOException {
                    System.out.println("Consumer1接收消息失敗了...");
                }
            });
            System.out.println("Consumer1開始接收消息");
            System.in.read();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 6: 釋放連接關(guān)閉通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
            if (connection != null) {
                try {
                    connection.close();
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}

定義消費(fèi)者2

同上,名稱稍修改即可

先在管理頁面創(chuàng)建好隊列queue,然后啟動消費(fèi)者1和2,最后啟動生產(chǎn)者看頁面日志

消費(fèi)者1和消費(fèi)者2

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式

fanout機(jī)制,中間件,java-rabbitmq,rabbitmq,分布式文章來源地址http://www.zghlxwxcb.cn/news/detail-696071.html

小結(jié)

  • 消費(fèi)者一次接收一條消息,代碼channel.BasicQos(0, 1, false);
  • 公平分發(fā)需要消費(fèi)者開啟手動應(yīng)答,關(guān)閉自動應(yīng)答
  • 關(guān)閉自動應(yīng)答代碼channel.BasicConsume(“queue_test”, false, consumer);
  • 消費(fèi)者開啟手動應(yīng)答代碼:channel.BasicAck(ea.DeliveryTag, false);

到了這里,關(guān)于RabbitMQ詳解(三):消息模式(fanout、direct、topic、work)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • RabbitMQ入門 消息隊列快速入門 SpringAMQP WorkQueue 隊列和交換機(jī) Fanout Direct exchange RAbbitMQ單體部署

    RabbitMQ入門 消息隊列快速入門 SpringAMQP WorkQueue 隊列和交換機(jī) Fanout Direct exchange RAbbitMQ單體部署

    微服務(wù)間通訊有同步和異步兩種方式: 同步通訊:就像打電話,需要實時響應(yīng)。 異步通訊:就像發(fā)郵件,不需要馬上回復(fù)。 兩種方式各有優(yōu)劣,打電話可以立即得到響應(yīng),但是你卻不能跟多個人同時通話。發(fā)送郵件可以同時與多個人收發(fā)郵件,但是往往響應(yīng)會有延遲。 1.

    2024年04月08日
    瀏覽(19)
  • SpringBoot 2.2.5 整合RabbitMQ,實現(xiàn)Topic主題模式的消息發(fā)送及消費(fèi)

    1、simple簡單模式 消息產(chǎn)生著§將消息放入隊列 消息的消費(fèi)者(consumer) 監(jiān)聽(while) 消息隊列,如果隊列中有消息,就消費(fèi)掉,消息被拿走后,自動從隊列中刪除(隱患 消息可能沒有被消費(fèi)者正確處理,已經(jīng)從隊列中消失了,造成消息的丟失)應(yīng)用場景:聊天(中間有一個過度的服務(wù)器;p端,c端

    2024年02月02日
    瀏覽(26)
  • rabbitmq topic模式設(shè)置#通配符情況下 消費(fèi)者隊列未接收消息問題排查解決

    rabbitmq topic模式設(shè)置#通配符情況下 消費(fèi)者隊列未接收消息問題排查解決

    生產(chǎn)者配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.# 消費(fèi)者代碼配置 Exchange:topic_exchange_shcool Routing key:topic.shcool.user 其實以上代碼看著沒有問題,意思是代碼生成一個隊列,并把【topic.shcool.user】隊列和生產(chǎn)者的【topic_exchange_shcool】exchange綁定,但是生產(chǎn)者發(fā)送消息是

    2024年02月11日
    瀏覽(39)
  • RabbitMq 消息確認(rèn)機(jī)制詳解

    RabbitMq 消息確認(rèn)機(jī)制詳解

    目錄 1.消息可靠性 1.1.生產(chǎn)者消息確認(rèn) 1.1.1.修改配置 1.1.2.定義Return回調(diào) 1.1.3.定義ConfirmCallback 1.2.消息持久化 1.2.1.交換機(jī)持久化 1.2.2.隊列持久化 1.2.3.消息持久化 1.3.消費(fèi)者消息確認(rèn) 1.3.1.演示none模式 1.3.2.演示auto模式 1.4.消費(fèi)失敗重試機(jī)制 1.4.1.本地重試 1.4.2.失敗策略 1.5.總結(jié)

    2024年01月21日
    瀏覽(17)
  • RabbitMQ:第一章:6 種工作模式以及消息確認(rèn)機(jī)制

    RabbitMQ:第一章:6 種工作模式以及消息確認(rèn)機(jī)制

    } System.out.println(“發(fā)送數(shù)據(jù)成功”); channel.close(); connection.close(); } } 消費(fèi)者一: import com.liao.rabbitmq.utils.RabbitConstant; import com.liao.rabbitmq.utils.RabbitUtils; import com.rabbitmq.client.*; import java.io.IOException; /** 消費(fèi)者1 */ public class ConsumerOne { public static void main(String[] args) throws Exception { Con

    2024年04月12日
    瀏覽(18)
  • Rabbitmq運(yùn)用之fanout模式

    Rabbitmq運(yùn)用之fanout模式

    rabiitmq 的 fanout 屬于多播模式,他的工作圖如下,應(yīng)用場景挺多的。比如訂單,客戶下單后,會發(fā)送消息告訴客戶下單成功,通知倉庫出貨等。 在上面的圖可以看到,項目里有四個啟動項目 這里用到RabbitmqFanoutConsumerSend,RabbitmqFanoutConsumerWMS,RabbitmqFanoutProduct這三個控制臺,另

    2024年02月12日
    瀏覽(40)
  • 202、RabbitMQ 之 使用 fanout 類型的Exchange 實現(xiàn) Pub-Sub 消息模型---fanout類型就是廣播類型

    202、RabbitMQ 之 使用 fanout 類型的Exchange 實現(xiàn) Pub-Sub 消息模型---fanout類型就是廣播類型

    就是聲明一個 fanout 類型的 Exchange 來分發(fā)消息。消費(fèi)者進(jìn)行消費(fèi) fanout 類型就是廣播模式 。 fanout 類型 的 Exchange 不會判斷消息的路由key,直接將消息分發(fā)給綁定到該Exchange的所有隊列。 生產(chǎn)者發(fā)送一條消息到fanout類型的Exchange后,綁定到該Exchange的所有隊列都會收到該消息的

    2024年02月07日
    瀏覽(19)
  • RabbitMQ--04--發(fā)布訂閱模式 (fanout)-案例

    RabbitMQ--04--發(fā)布訂閱模式 (fanout)-案例

    提示:文章寫完后,目錄可以自動生成,如何生成可參考右邊的幫助文檔 @RabbitListener和@RabbitHandler的使用 OrderService OrderServiceImpl 在項目的test中發(fā)送請求 訪問網(wǎng)址: http://localhost:15672/#/queues yml配置 SmsConsumerService、SmsConsumerServiceImpl EmailConsumerService、EmailConsumerServiceImpl DuanxinCo

    2024年04月14日
    瀏覽(25)
  • rabbitMQ:綁定Exchange發(fā)送和接收消息(direct)

    rabbitMQ:綁定Exchange發(fā)送和接收消息(direct)

    AMQP 協(xié)議中的核心思想就是生產(chǎn)者和消費(fèi)者的解耦,生產(chǎn)者從不直接將消息發(fā)送給隊列。生產(chǎn)者通常不知道是否一個消息會被發(fā)送到隊列中,只是將消息 發(fā)送到一個交換機(jī)。先由 Exchange 來接收,然后 Exchange 按照特定的策略轉(zhuǎn)發(fā)到 Queue 進(jìn)行存儲。Exchange 就類似于一個交換機(jī),

    2024年02月15日
    瀏覽(18)
  • Python如何操作RabbitMQ實現(xiàn)fanout發(fā)布訂閱模式?有錄播直播私教課視頻教程

    生產(chǎn)者 消費(fèi)者 生產(chǎn)者 消費(fèi)者 生產(chǎn)者 消費(fèi)者

    2024年01月17日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包