目錄
一:交換機(jī)
1:Direct交換機(jī)
1.1生產(chǎn)者端代碼:
?1.2:消費(fèi)者端代碼:
2:Topic主題交換機(jī)?
2.1:生產(chǎn)者代碼:?
2.2:消費(fèi)者代碼:
?二:核心特性
2.1:消息過期機(jī)制
2.1.1:給隊(duì)列中的全部消息指定過期時(shí)間
2.1.2:給某條消息指定過期時(shí)間?
2.2:死信隊(duì)列
一:交換機(jī)
1:Direct交換機(jī)
綁定:讓交換機(jī)和隊(duì)列進(jìn)行關(guān)聯(lián),可以指定讓交換機(jī)把什么樣的消息發(fā)送給隊(duì)列。
rountingkey:路由鍵,控制消息要發(fā)送哪個(gè)隊(duì)列。
特點(diǎn):根據(jù)路由鍵指定要轉(zhuǎn)發(fā)到指定的隊(duì)列
場景:特定的消息指定給特定的隊(duì)列
1.1生產(chǎn)者端代碼:
我們規(guī)定,通過控制臺(tái)輸入消息和路由,來指定誰完成該任務(wù)。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class DirectProducer {
private static final String EXCHANGE_NAME = "2";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//創(chuàng)建交換機(jī)的名稱
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()){
String userInput=scanner.nextLine();
String[] s = userInput.split(" ");
if(s.length<1){
continue;
}
//指定路由key
String message=s[0];
String routingKey=s[1];
//發(fā)布消息
/*
第一個(gè)參數(shù):發(fā)布到哪個(gè)交換機(jī)
第二個(gè)參數(shù):路由鍵
*/
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("[x] Sent"+message+"with rounting"+routingKey+" ");
}
}
}
//..
}
?1.2:消費(fèi)者端代碼:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class DirectProducer {
private static final String EXCHANGE_NAME = "2";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
//創(chuàng)建交換機(jī)的名稱
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()){
String userInput=scanner.nextLine();
String[] s = userInput.split(" ");
if(s.length<1){
continue;
}
//指定路由key
String message=s[0];
String routingKey=s[1];
//發(fā)布消息
/*
第一個(gè)參數(shù):發(fā)布到哪個(gè)交換機(jī)
第二個(gè)參數(shù):路由鍵
*/
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("[x] Sent"+message+"with rounting"+routingKey+" ");
}
}
}
//..
}
運(yùn)行結(jié)果:
2:Topic主題交換機(jī)?
特點(diǎn):消息會(huì)根據(jù)一個(gè)模糊的路由鍵轉(zhuǎn)發(fā)到指定的隊(duì)列中。
場景:特定的一類消息只交給特定的一類系統(tǒng)(程序來處理)。
綁定關(guān)系:模糊匹配消息隊(duì)列? *:匹配一個(gè)單詞? ? ? ?#:匹配0個(gè)或多個(gè)單詞
2.1:生產(chǎn)者代碼:?
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class TopicProducer {
private static final String EXCHANGE_NAME = "3";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
Scanner scanner=new Scanner(System.in);
while(scanner.hasNext()){
String userInput=scanner.nextLine();
String[] s = userInput.split(" ");
if(s.length<1){
continue;
}
//指定路由key
String message=s[0];
String routingKey=s[1];
//發(fā)布消息
/*
第一個(gè)參數(shù):發(fā)布到哪個(gè)交換機(jī)
第二個(gè)參數(shù):路由鍵
*/
channel.basicPublish(EXCHANGE_NAME,routingKey,null,message.getBytes("UTF-8"));
System.out.println("[x] Sent"+message+"with rounting"+routingKey+" ");
}
}
}
}
2.2:消費(fèi)者代碼:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DeliverCallback;
public class TopicConsumer {
private static final String EXCHANGE_NAME = "3";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
//創(chuàng)建消息隊(duì)列
String queueName="fronted_queue";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"#.前端.#");
String queueName2="backed-_queue";
channel.queueDeclare(queueName2,true,false,false,null);
channel.queueBind(queueName2,EXCHANGE_NAME,"#.后端.#");
String queueName3="product_queue";
channel.queueDeclare(queueName3,true,false,false,null);
channel.queueBind(queueName3,EXCHANGE_NAME,"#.產(chǎn)品.#");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [前端] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [后端] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
DeliverCallback deliverCallback3 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [產(chǎn)品] Received '" +
delivery.getEnvelope().getRoutingKey() + "':'" + message + "'");
};
channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });
channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });
channel.basicConsume(queueName3, true, deliverCallback3, consumerTag -> { });
}
}
運(yùn)行結(jié)果:
?二:核心特性
2.1:消息過期機(jī)制
特點(diǎn):給每條消息指定一個(gè)有效期,一段時(shí)間內(nèi)未被消費(fèi),就過期了。
2.1.1:給隊(duì)列中的全部消息指定過期時(shí)間
在消費(fèi)者中對于隊(duì)列的全部消息指定過期時(shí)間,如果在過期時(shí)間內(nèi),還沒有消費(fèi)者取消息,消息才會(huì)過期,如果消息已經(jīng)接收到,但是沒確認(rèn),是不會(huì)過期的。
public class TTLConsumer {
private final static String QUEUE_NAME = "ttl_queue";
public static void main(String[] argv) throws Exception {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
//創(chuàng)建頻道,提供通信
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//指定消息隊(duì)列的過期時(shí)間
Map<String ,Object> args=new HashMap<>();
args.put("x-message-ttl",5000);
//args:指定參數(shù)
channel.queueDeclare(QUEUE_NAME, false, false,false, args);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
//如何處理消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
};
channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { });
}
}
2.1.2:給某條消息指定過期時(shí)間?
//在發(fā)送者這邊設(shè)置過期時(shí)間
public class TTLProducer {
private final static String QUEUE_NAME = "ttl_queue";
public static void main(String[] argv) throws Exception {
//創(chuàng)建連接工廠
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
//頻道相當(dāng)于客戶端(jdbcClient,redisClient),提供了和消隊(duì)列server建立通信,程序通過channel進(jìn)行發(fā)送消息
Channel channel = connection.createChannel()) {
//創(chuàng)建消息隊(duì)列,第二個(gè)參數(shù)(durable):是否開啟持久化,第三個(gè)參數(shù)exclusiove:是否允許當(dāng)前這個(gè)創(chuàng)建消息隊(duì)列的
//連接操作消息隊(duì)列 第四個(gè)參數(shù):沒有人使用隊(duì)列,是否需要?jiǎng)h除
String message = "Hello World!";
//給消息指定過期時(shí)間
AMQP.BasicProperties properties=new AMQP.BasicProperties.Builder()
.expiration("1000")
.build();
channel.basicPublish("", QUEUE_NAME, properties, message.getBytes(StandardCharsets.UTF_8));
System.out.println(" [x] Sent '" + message + "'");
}
}
2.2:死信隊(duì)列
為了保證消息的可靠性,比如每條消息都成功消費(fèi),需要提供一個(gè)容錯(cuò)機(jī)制,即失敗的消息怎么處理,相當(dāng)于死信。
死信:過期的消息,拒收的消息,處理失敗的消息,消息隊(duì)列滿了統(tǒng)稱為死信。
死信隊(duì)列:處理死信的隊(duì)列。
死信交換機(jī):給死信隊(duì)列發(fā)送消息的交換機(jī),也存在路由綁定。
a:創(chuàng)建死信交換機(jī)和死信隊(duì)列
//聲明死信交換機(jī)
channel.exchangeDeclare(WORK_NAME,"direct");
//聲明死信隊(duì)列
String queueName="boss_queue";
channel.queueDeclare(queueName,true,false,false,null);
channel.queueBind(queueName,EXCHANGE_Name,"boss");
String queueName2="waibao_queue";
channel.queueDeclare(queueName2, false, false, false, null);
channel.queueBind(queueName2,EXCHANGE_Name,"waibao");
文章來源:http://www.zghlxwxcb.cn/news/detail-715742.html
b:給失敗后的需要容錯(cuò)的隊(duì)列綁定死信交換機(jī)文章來源地址http://www.zghlxwxcb.cn/news/detail-715742.html
//聲明交換機(jī)
channel.exchangeDeclare(WORK_NAME, "direct");
Map<String,Object> map=new HashMap<>();
//聲明要綁定的死信交換機(jī)
map.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
//聲明要綁定的死信隊(duì)列
map.put("x-dead-letter-routing-key","waibao_queue");
//創(chuàng)建消息隊(duì)列
String queueName="xiaodog_queue";
channel.queueDeclare(queueName,true,false,false,map);
channel.queueBind(queueName,WORK_NAME,"xiaodog");
Map<String,Object> map2=new HashMap<>();
//聲明要綁定的死信交換機(jī)
map2.put("x-dead-letter-exchange",DEAD_EXCHANGE_NAME);
map2.put("x-dead-letter-routing-key","boss_queue");
String queueName2="xiaocat_queue";
channel.queueDeclare(queueName2,true,false,false,map2);
channel.queueBind(queueName2,WORK_NAME,"xiaocat");
到了這里,關(guān)于分布式消息隊(duì)列:Rabbitmq(2)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!