1.簡單隊列
-
消息生產者
public class Send { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "這是一條消息!??!"; // 發(fā)送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:" + message); } } }
-
消息消費者(會一直監(jiān)聽隊列)
public class Recv { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 回調 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv:" + message); }; // 自動確認消息 channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
2.工作隊列
-
工作隊列
- 消息生產能力大于消費能力,增加多個消費節(jié)點
- 和簡單隊列類似,增加多個消費節(jié)點,處于競爭關系
- 默認策略:round robin輪訓
-
生產者
public class Send { private static final String QUEUE_NAME = "work_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 批量發(fā)送10個消息 for (int i = 0; i < 10; i++) { String message = "這是一條消息?。?!" + i; // 發(fā)送消息 channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:" + message); } } } }
-
消費者1
public class Recv1 { private static final String QUEUE_NAME = "work_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 回調 DeliverCallback deliverCallback = (consumerTag, delivery) -> { // 模擬消費者緩慢 try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { throw new RuntimeException(e); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv1:" + message); // 手工確認消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 關閉自動確認消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
-
消費者2
public class Recv2 { private static final String QUEUE_NAME = "work_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 聲明隊列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); // 回調 DeliverCallback deliverCallback = (consumerTag, delivery) -> { // 模擬消費者緩慢 try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { throw new RuntimeException(e); } String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv2:" + message); // 手工確認消息 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; // 關閉自動確認消息 channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } }
-
輪訓策略驗證
- 先啟動兩個消費者,再啟動生產者
- 缺點:存在部分節(jié)點消費過快,部分節(jié)點消費慢,導致不能合理處理消息
-
公平策略驗證
- 修改消費者策略
- 解決消費者能力消費不足的問題,降低消費時間問題
3.RabbitMQ的Exchange交換機
- 生產者將消息發(fā)送到Exchange,交換機將消息路由到一個或者多個隊列中,交換機有多個類型,隊列和交換機是多對多的關系
- 交換機只負責轉發(fā)消息,不具備存儲消息的能力,如果沒有隊列和交換機綁定或者沒有符合的路由規(guī)則,則消息會被丟失
- RabbitMQ有四種交換機類型,分別是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后一種基本不用
- 交換機類型
- Direct exchange定向
- 將一個隊列綁定到交換機上,要求該消息與一個特定的路由鍵完全匹配
- 處理路由鍵
- Fanout exchange廣播
- 只需要簡單的將隊列綁定到交換機上,一個發(fā)送到交換機的消息都會被轉發(fā)到與該交換機綁定的所有隊列上。很像子網廣播,每臺子網內的主機都獲得了一份復制的消息
- Fanout交換機轉發(fā)消息是最快的,用于發(fā)布訂閱廣播形式
- 不處理路由鍵
- Topic exchange通配符
- 主題交換機是一種發(fā)布/訂閱的模式,結合了直連交換機與扇形交換機的特點
- 將路由鍵和某模式進行匹配,此時隊列需要綁定在一個模式上
- 符號"#“匹配一個或多個詞,符號”*"匹配不多不少一個詞
- Headers exchange(很少用)
- 根據發(fā)送的消息內容中的headers屬性進行匹配,在綁定Queue與Exchange時指定一組鍵值對
- 當消息發(fā)送到RabbitMQ時會取到該消息的headers與Exchange綁定時指定的鍵值對進行匹配
- 如果完全匹配則消息會路由到該隊列,否則不會路由到該隊列
- 不處理路由鍵
- Direct exchange定向
4.發(fā)布訂閱模型
-
什么是RabbitMQ的發(fā)布訂閱模式
- 發(fā)布訂閱模型中,消息生產者不再是直接面對隊列,而是直面交換機,都需要經過交換機來進行消息的發(fā)送,所有發(fā)往同一個fanout交換機的消息都會被所有監(jiān)聽這個交換機的消費者接收
- 發(fā)布訂閱模型引入fanout交換機
-
發(fā)布訂閱模型應用場景
- 微信公眾號
- 新浪微博關注
-
RabbitMQ發(fā)布訂閱模型
- 通過把消息發(fā)送給交換機,交換機轉發(fā)給對應綁定的隊列
- 交換機綁定的隊列是排他獨占隊列,自動刪除
-
發(fā)送端
public class Send { private static final String EXCHANGE_NAME = "fan_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 綁定交換機,廣播類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); String message = "廣播發(fā)送消息:這是一條消息?。?!"; // 發(fā)送消息 channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:" + message); } } }
-
消費端(兩個節(jié)點)
public class Recv1 { private static final String EXCHANGE_NAME = "fan_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 綁定交換機,廣播類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT); // 獲取隊列(排它隊列) String queueName = channel.queueDeclare().getQueue(); // 綁定隊列和交換機 channel.queueBind(queueName, EXCHANGE_NAME, ""); // 回調 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv1:" + message); }; // 自動確認消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
5.路由模式
-
什么是RabbitMQ的路由模式
-
交換機類型是direct
-
隊列和交換機綁定,需要指定一個路由鍵(也叫binding key)
-
消息生產者發(fā)送消息給交換機,需要指定路由鍵
-
交換機根據消息的路由鍵,轉發(fā)給對應的隊列
-
-
消息生產者
public class Send { private static final String EXCHANGE_NAME = "direct_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 綁定交換機,直連類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); String error = "我是錯誤日志"; String info = "我是info日志"; String warning = "我是warning日志"; // 發(fā)送消息 channel.basicPublish(EXCHANGE_NAME, "error", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "info", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "warning", null, warning.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:消息發(fā)送成功!"); } } }
-
消費者一(只接收錯誤消息)
public class Recv1 { private static final String EXCHANGE_NAME = "direct_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 綁定交換機,直連類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 獲取隊列 String queueName = channel.queueDeclare().getQueue(); // 綁定隊列和交換機 channel.queueBind(queueName, EXCHANGE_NAME, "error"); // 回調 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv1:" + message); }; // 自動確認消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
-
消費者二(接收全部消息)
public class Recv2 { private static final String EXCHANGE_NAME = "direct_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 綁定交換機,直連類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); // 獲取隊列 String queueName = channel.queueDeclare().getQueue(); // 綁定隊列和交換機 channel.queueBind(queueName, EXCHANGE_NAME, "error"); channel.queueBind(queueName, EXCHANGE_NAME, "info"); channel.queueBind(queueName, EXCHANGE_NAME, "warning"); // 回調 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv2:" + message); }; // 自動確認消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
6.主題通配符模式
-
什么是RabbitMQ的主題模式
- 交換機是topic,可以實現發(fā)布訂閱模式fanout和路由模式direct的功能,更加靈活,支持通配符匹配
- 交換機通過通配符進行轉發(fā)到對應的隊列,*代表一個詞,#代表1個或多個詞,一般用#作為通配符居多,詞與詞之間使用.點進行分割
- 注意:交換機和隊列綁定時用的binding使用通配符的路由鍵;生產者發(fā)送消息時需要使用具體的路由鍵
-
生產者
public class Send { private static final String EXCHANGE_NAME = "topic_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 綁定交換機,主題類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); String error = "我是錯誤日志"; String info = "我是info日志"; String warning = "我是warning日志"; // 發(fā)送消息 channel.basicPublish(EXCHANGE_NAME, "error", null, error.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "info", null, info.getBytes(StandardCharsets.UTF_8)); channel.basicPublish(EXCHANGE_NAME, "warning", null, warning.getBytes(StandardCharsets.UTF_8)); System.out.println("Send:消息發(fā)送成功!"); } } }
-
消費者一(只接收錯誤消息)
public class Recv1 { private static final String EXCHANGE_NAME = "topic_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 綁定交換機,主題類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 獲取隊列 String queueName = channel.queueDeclare().getQueue(); // 綁定隊列和交換機 channel.queueBind(queueName, EXCHANGE_NAME, "error"); // 回調 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv1:" + message); }; // 自動確認消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
-
消費者二(接收全部消息)
public class Recv2 { private static final String EXCHANGE_NAME = "topic_mq"; public static void main(String[] args) throws Exception { // 連接工廠 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.101.128"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("Gen123"); factory.setVirtualHost("/"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // 綁定交換機,主題類型 channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); // 獲取隊列 String queueName = channel.queueDeclare().getQueue(); // 綁定隊列和交換機 channel.queueBind(queueName, EXCHANGE_NAME, "#"); // 回調 DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("Recv2:" + message); }; // 自動確認消息 channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
7.工作模式總結
-
簡單模式
- 一個生產者一個消費者,不用指定交換機,使用默認交換機
-
工作隊列模式
- 一個生產者多個消費者,可以有輪訓和公平策略,不用指定交換機,使用默認交換機
-
發(fā)布訂閱模式
- fanout類型交換機,通過交換機和隊列綁定,不用指定綁定路由鍵,生產者發(fā)送消息到交換機,fanout交換機直接進行轉發(fā),消息不用指定routingkey路由鍵
-
路由模式文章來源:http://www.zghlxwxcb.cn/news/detail-825773.html
- direct類型交換機,通過交換機和隊列綁定,指定綁定的路由鍵,生產者發(fā)送消息到交換機,交換機根據消息的路由key進行轉發(fā)到對應的隊列,消息要指定routingkey路由鍵
-
通配符模式文章來源地址http://www.zghlxwxcb.cn/news/detail-825773.html
- topic交換機,通過交換機和隊列綁定,指定綁定的通配符路由鍵,生產者發(fā)送消息到交換機,交換機根據消息的路由鍵進行轉發(fā)到對應的隊列,消息要指定routingkey路由鍵
到了這里,關于RabbitMQ五大常用工作模式的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!