消費(fèi)模式
參考官網(wǎng):https://www.rabbitmq.com/getstarted.html
-
簡單模式 Simple, 參考RabbitMQ詳解(二):消息模式 Simple(簡單)模式
簡單模式是最簡單的消息模式,它包含一個生產(chǎn)者、一個消費(fèi)者和一個隊列。生產(chǎn)者向隊列里發(fā)送消息,消費(fèi)者從隊列中獲取消息并消費(fèi)。
-
發(fā)布訂閱模式 fanout
同時向多個消費(fèi)者發(fā)送消息的模式(類似廣播的形式)
-
路由模式 direct
根據(jù)路由鍵選擇性給多個消費(fèi)者發(fā)送消息的模式
-
主題模式 topic
是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式
-
工作模式 work
分發(fā)機(jī)制
-
…
消息模式-fanout(發(fā)布訂閱)模式
- 類型:fanout
- 特點:Fanout—發(fā)布與訂閱模式,是一種廣播機(jī)制,它是沒有路由key的模式。
創(chuàng)建交換機(jī)
注意 type 類型為fanout
綁定隊列
-
圖像化管理頁面新建queue02、queue03隊列
-
點擊交換器后,綁定創(chuàng)建的三個隊列
-
綁定成功后會如圖所示
定義生產(chǎn)者
package com.cn.fanout;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* fanout(發(fā)布訂閱) 生產(chǎn)者
*/
public class Producer {
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2.設(shè)置工廠屬性
factory.setHost("請?zhí)顚懽约旱膇p地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.從連接工廠中獲取連接
connection = factory.newConnection("生產(chǎn)者1");
//4.從連接中獲取通道
channel = connection.createChannel();
//5.申請隊列存儲信息,此步驟不需要了,我們手動在圖形管理頁面創(chuàng)建好交換機(jī)及綁定好隊列queue01、queue02、queue03
//6.準(zhǔn)備發(fā)送消息的內(nèi)容
String message = "hello,rabbitmq!";
//7.1.準(zhǔn)備交換機(jī)
String exchangeName = "fanout-exchange";
//7.2.定義路由key,fanout模式?jīng)]有routingKey參數(shù)
String routingKey = "";
// 7.3: 發(fā)送消息給中間件rabbitmq-server
/*
* @params1: 交換機(jī)exchange
* @params2: 隊列名稱/routingkey
* @params3: 屬性配置
* @params4: 發(fā)送消息的內(nèi)容
*/
channel.basicPublish(exchangeName, routingKey, null, message.getBytes());
System.out.println("消息發(fā)送成功!");
} catch (Exception e) {
e.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 8: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
-
啟動生產(chǎn)者, 會看到每個隊列都投遞了一條消息
定義消費(fèi)者
package com.cn.fanout;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
/**
* fanout(發(fā)布訂閱) 消費(fèi)者
*/
public class Consumer {
public static Runnable runnable = new Runnable(){
@Override
public void run() {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2.設(shè)置工廠屬性
factory.setHost("請?zhí)顚懽约旱膇p地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//3.從連接工廠中獲取連接
connection = factory.newConnection("生產(chǎn)者1");
//4.從連接中獲取通道
channel = connection.createChannel();
//5.接收消息
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + "收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset()));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("接收消息失敗了...");
}
});
System.out.println(queueName + "開始接收消息 ");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 6: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
// 啟動三個線程去執(zhí)行
new Thread(runnable, "queue01").start();
new Thread(runnable, "queue02").start();
new Thread(runnable, "queue03").start();
}
}
-
啟動消費(fèi)者,會看到隊列中消息已經(jīng)被消費(fèi)
-
查看控制臺打印日志
消費(fèi)模式-Direct(路由)模式
- 類型:direct
- 特點:Direct模式是fanout模式上的一種疊加,增加了路由RoutingKey的模式。
創(chuàng)建交換機(jī)
綁定隊列
定義生產(chǎn)者
package com.cn.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* direct(路由) 生產(chǎn)者
*/
public class Producer {
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2.設(shè)置工廠屬性
factory.setHost("請?zhí)顚懽约旱膇p地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.從連接工廠中獲取連接
connection = factory.newConnection("生產(chǎn)者1");
//4.從連接中獲取通道
channel = connection.createChannel();
//5.申請隊列存儲信息,此步驟不需要了,我們手動在圖形管理頁面創(chuàng)建好交換機(jī)及綁定好隊列queue01、queue02、queue03
//6.準(zhǔn)備發(fā)送消息的內(nèi)容
String message = "hello,rabbitmq,direct!";
//7.1.準(zhǔn)備交換機(jī)
String exchangeName = "direct-exchange";
//7.2.定義路由key, direct需要增加routingKey1參數(shù)
String routingKey1 = "email";
// String routingKey2 = "sms";
// 7.3: 發(fā)送消息給中間件rabbitmq-server
/*
* @params1: 交換機(jī)exchange
* @params2: 隊列名稱/routingkey
* @params3: 屬性配置
* @params4: 發(fā)送消息的內(nèi)容
*/
channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
// channel.basicPublish(exchangeName, routingKey2, null, message.getBytes());
System.out.println("消息發(fā)送成功!");
} catch (Exception e) {
e.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 8: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
-
啟動生產(chǎn)者, 會看到只有quque01隊列投遞了一條消息
-
因為我們的routingKey指定為email,綁定的隊列信息如下,所有只有queue01接收到了消息
定義消費(fèi)者
//同fanout模式消費(fèi)者代碼相同
-
啟動消費(fèi)者,會看到隊列中消息已經(jīng)被消費(fèi)
-
查看控制臺打印日志
消費(fèi)模式-Topic(主題)模式
- 類型:topic
- 特點:Topic模式是direct模式上的一種疊加,增加了模糊路由RoutingKey的模式。
- “#” : 匹配一個或者多個
“**”:匹配一個*
創(chuàng)建交換機(jī)
綁定隊列
定義生產(chǎn)者
package com.cn.topic;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* topic(主題) 生產(chǎn)者
*/
public class Producer {
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2.設(shè)置工廠屬性
factory.setHost("請?zhí)顚懽约旱膇p地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.從連接工廠中獲取連接
connection = factory.newConnection("生產(chǎn)者1");
//4.從連接中獲取通道
channel = connection.createChannel();
//5.申請隊列存儲信息,此步驟不需要了,我們手動在圖形管理頁面創(chuàng)建好交換機(jī)及綁定好隊列queue01、queue02、queue03
//6.準(zhǔn)備發(fā)送消息的內(nèi)容
String message = "hello,rabbitmq,topic!";
//7.1.準(zhǔn)備交換機(jī)
String exchangeName = "topic-exchange";
//7.2.定義路由key, 模糊匹配
String routingKey1 = "com.order.xxx";
// 7.3: 發(fā)送消息給中間件rabbitmq-server
/*
* @params1: 交換機(jī)exchange
* @params2: 隊列名稱/routingkey
* @params3: 屬性配置
* @params4: 發(fā)送消息的內(nèi)容
*/
channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
System.out.println("消息發(fā)送成功!");
} catch (Exception e) {
e.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 8: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
-
啟動生產(chǎn)者, 會看到quque01、queue02隊列分別投遞了一條消息
-
因為我們的routingKey指定為com.order.xxx,綁定的隊列信息如下,所有queue01、queue02接收到了消息
-
定義消費(fèi)者
//同fanout模式消費(fèi)者代碼相同
-
啟動消費(fèi)者,會看到隊列中消息已經(jīng)被消費(fèi)
-
查看控制臺打印日志
完整的聲明創(chuàng)建方式
上面操作的案例 我們都是在管理頁面端進(jìn)行交換機(jī)的創(chuàng)建以及綁定,現(xiàn)在我們使用純代碼的方式進(jìn)行操作
定義生產(chǎn)者
package com.cn.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* 完整 生產(chǎn)者
*/
public class Producer {
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2.設(shè)置工廠屬性
factory.setHost("請?zhí)顚懽约旱膇p地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.從連接工廠中獲取連接
connection = factory.newConnection("生產(chǎn)者1");
//4.從連接中獲取通道
channel = connection.createChannel();
//5.準(zhǔn)備發(fā)送消息的內(nèi)容
String message = "hello,rabbitmq,all!";
//6.1.準(zhǔn)備交換機(jī)
String exchangeName = "direct-message-exchange";
//6.2.交換機(jī)類型
String exchangeType = "direct";
//6.3.聲明交換機(jī)(是否持久化,true代表交換機(jī)不會隨著服務(wù)器重啟丟失)
channel.exchangeDeclare(exchangeName,exchangeType,true);
//7.聲明隊列
channel.queueDeclare("queue04", true, false ,false, null);
channel.queueDeclare("queue05", true, false ,false, null);
channel.queueDeclare("queue06", true, false ,false, null);
//8.定義路由key
String routingKey1 = "order";
String routingKey2 = "course";
//9.隊列和交換機(jī)進(jìn)行綁定
channel.queueBind("queue04", exchangeName, routingKey1);
channel.queueBind("queue05", exchangeName, routingKey1);
channel.queueBind("queue06", exchangeName, routingKey2);
//10: 發(fā)送消息給中間件rabbitmq-server
/*
* @params1: 交換機(jī)exchange
* @params2: 隊列名稱/routingkey
* @params3: 屬性配置
* @params4: 發(fā)送消息的內(nèi)容
*/
channel.basicPublish(exchangeName, routingKey1, null, message.getBytes());
System.out.println("消息發(fā)送成功!");
} catch (Exception e) {
e.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 8: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
-
啟動生產(chǎn)者, 會看到交換機(jī)和隊列都已創(chuàng)建好,并且已經(jīng)互相綁定好
定義消費(fèi)者
同fanout模式消費(fèi)者代碼相同
-
啟動消費(fèi)者,會看到隊列中消息已經(jīng)被消費(fèi)
-
查看控制臺打印日志
消費(fèi)模式-Work(工作)模式
當(dāng)有多個消費(fèi)者時,我們的消費(fèi)會被哪個消費(fèi)者消費(fèi)呢?我們該如何均衡消費(fèi)者消費(fèi)信息的多少呢?
- 輪詢模式:一個消費(fèi)者一條,按均分發(fā)
- 公平分發(fā): 根據(jù)消費(fèi)者消費(fèi)能力進(jìn)行公平分發(fā),處理快的處理的快,處理慢的處理的少,按勞分配
輪詢模式
- 類型:無
- 特點:該模式接收消息是當(dāng)有多個消費(fèi)者接入時,消息的分配模式是一個消費(fèi)者分配一條,直至消息消費(fèi)完成;
定義生產(chǎn)者
package com.cn.work.roundrobin;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2.設(shè)置工廠屬性
factory.setHost("請?zhí)顚懽约旱膇p地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.從連接工廠中獲取連接
connection = factory.newConnection("生產(chǎn)者7");
//4.從連接中獲取通道
channel = connection.createChannel();
//5.申請隊列存儲信息
/*
* 如果隊列不存在,則會創(chuàng)建
* Rabbitmq不允許創(chuàng)建兩個相同的隊列名稱,否則會報錯。
*
* @params1: queue 隊列的名稱
* @params2: durable 隊列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果為true,會對當(dāng)前隊列加鎖,其他的通道不能訪問,并且連接自動關(guān)閉
* @params4: autoDelete 是否自動刪除,當(dāng)最后一個消費(fèi)者斷開連接之后是否自動刪除消息。
* @params5: arguments 可以設(shè)置隊列附加參數(shù),設(shè)置隊列的有效期,消息的最大長度,隊列的消息生命周期等等。
*/
channel.queueDeclare("queue07", true ,false,false, null);
//6.準(zhǔn)備發(fā)送消息的內(nèi)容
for (int i = 0; i < 20; i++) {
String message = "hello,rabbitmq,work!" + i;
// 7: 發(fā)送消息給中間件rabbitmq-server
/*
* @params1: 交換機(jī)exchange
* @params2: 隊列名稱/routing
* @params3: 屬性配置
* @params4: 發(fā)送消息的內(nèi)容
*/
channel.basicPublish("", "queue07", null, message.getBytes());
}
System.out.println("消息發(fā)送成功!");
} catch (Exception e) {
e.printStackTrace();
System.out.println("發(fā)送消息出現(xiàn)異常...");
} finally {
// 8: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
定義消費(fèi)者1
package com.cn.work.roundrobin;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
public class Consumer1 {
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2.設(shè)置工廠屬性
factory.setHost("請?zhí)顚懽约旱膇p地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.從連接工廠中獲取連接
connection = factory.newConnection("消費(fèi)者1");
//4.從連接中獲取通道
channel = connection.createChannel();
//5.接收消息(應(yīng)答機(jī)制參數(shù)為true 自動應(yīng)答)
channel.basicConsume("queue07", true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("Consumer1收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset()));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("Consumer1接收消息失敗了...");
}
});
System.out.println("Consumer1開始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 6: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
定義消費(fèi)者2
同上,名稱稍修改即可
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
先在管理頁面創(chuàng)建好隊列queue,然后啟動消費(fèi)者1和2,最后啟動生產(chǎn)者看頁面日志
消費(fèi)者1和消費(fèi)者2
work1和work2的消息處理能力不同,但是最后處理的消息條數(shù)相同,是“按均分配”。
公平分發(fā)
- 類型:無
- 特點:由于消息接收者處理消息的能力不同,存在處理快慢的問題,我們就需要能者多勞,處理快的多處理,處理慢的少處理;
定義生產(chǎn)者
//同上輪詢模式的生產(chǎn)者代碼相同
定義消費(fèi)者1
注意:
-
//設(shè)置消費(fèi)消息指標(biāo)
finalChannel.basicQos(1);
-
finalChannel.basicConsume(“queue1”, false, new DeliverCallback() { … })
-
//修改為手動應(yīng)答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
package com.cn.work.fairdispatch;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.nio.charset.Charset;
public class Consumer1 {
public static void main(String[] args) {
//1.創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
//2.設(shè)置工廠屬性
factory.setHost("請?zhí)顚懽约旱膇p地址");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//3.從連接工廠中獲取連接
connection = factory.newConnection("消費(fèi)者1");
//4.從連接中獲取通道
channel = connection.createChannel();
//5.接收消息(應(yīng)答機(jī)制參數(shù)為false 手動應(yīng)答)
final Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue07", false, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("Consumer1收到消息是:" + new String(delivery.getBody(), Charset.defaultCharset()));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//修改為手動應(yīng)答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println("Consumer1接收消息失敗了...");
}
});
System.out.println("Consumer1開始接收消息");
System.in.read();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 6: 釋放連接關(guān)閉通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
定義消費(fèi)者2
同上,名稱稍修改即可
先在管理頁面創(chuàng)建好隊列queue,然后啟動消費(fèi)者1和2,最后啟動生產(chǎn)者看頁面日志
消費(fèi)者1和消費(fèi)者2
文章來源:http://www.zghlxwxcb.cn/news/detail-696071.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-696071.html
小結(jié)
- 消費(fèi)者一次接收一條消息,代碼channel.BasicQos(0, 1, false);
- 公平分發(fā)需要消費(fèi)者開啟手動應(yīng)答,關(guān)閉自動應(yīng)答
- 關(guān)閉自動應(yīng)答代碼channel.BasicConsume(“queue_test”, false, consumer);
- 消費(fèi)者開啟手動應(yīng)答代碼:channel.BasicAck(ea.DeliveryTag, false);
到了這里,關(guān)于RabbitMQ詳解(三):消息模式(fanout、direct、topic、work)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!