一、消息隊(duì)列
什么是消息隊(duì)列
消息隊(duì)列,即MQ,Message Queue。
消息隊(duì)列是典型的:生產(chǎn)者、消費(fèi)者模型。生產(chǎn)者不斷向消息隊(duì)列中生產(chǎn)消息,消費(fèi)者不斷的從隊(duì)列中獲取消息。因?yàn)橄⒌纳a(chǎn)和消費(fèi)都是異步的,而且只關(guān)心消息的發(fā)送和接收,沒有業(yè)務(wù)邏輯的侵入,這樣就實(shí)現(xiàn)了生產(chǎn)者和消費(fèi)者的解耦。
二、RabbitMQ
RabbitMQ是基于AMQP的一款消息管理系統(tǒng)。
支持主流的操作系統(tǒng),Linux、Windows、MacOX等。
支持多種開發(fā)語言,Java、Python、Ruby、.NET、PHP、C/C++、node.js等。
官網(wǎng):?Messaging that just works — RabbitMQ
官方教程:RabbitMQ Tutorials — RabbitMQ
RabbitMQ 基本概念
Message
消息,消息是不具名的,它由消息頭和消息體組成。消息體是不透明的,而消息頭則由一系列的可選屬性組成,這些屬性包括routing-key(路由鍵)、priority(相對于其他消息的優(yōu)先權(quán))、delivery-mode(指出該消息可能需要持久性存儲(chǔ))等。Publisher
消息的生產(chǎn)者,也是一個(gè)向交換器發(fā)布消息的客戶端應(yīng)用程序。Exchange
交換器,用來接收生產(chǎn)者發(fā)送的消息并將這些消息路由給服務(wù)器中的隊(duì)列。Binding
綁定,用于消息隊(duì)列和交換器之間的關(guān)聯(lián)。一個(gè)綁定就是基于路由鍵將交換器和消息隊(duì)列連接起來的路由規(guī)則,所以可以將交換器理解成一個(gè)由綁定構(gòu)成的路由表。Queue
消息隊(duì)列,用來保存消息直到發(fā)送給消費(fèi)者。它是消息的容器,也是消息的終點(diǎn)。一個(gè)消息可投入一個(gè)或多個(gè)隊(duì)列。消息一直在隊(duì)列里面,等待消費(fèi)者連接到這個(gè)隊(duì)列將其取走。Connection
網(wǎng)絡(luò)連接,比如一個(gè)TCP連接。無論生產(chǎn)者還是消費(fèi)者,都需要與RabbitMQ建立連接后才可以完成消息的生產(chǎn)和消費(fèi)。Channel
信道,多路復(fù)用連接中的一條獨(dú)立的雙向數(shù)據(jù)流通道。信道是建立在真實(shí)的TCP連接內(nèi)地虛擬連接,AMQP 命令都是通過信道發(fā)出去的,不管是發(fā)布消息、訂閱隊(duì)列還是接收消息,這些動(dòng)作都是通過信道完成。因?yàn)閷τ诓僮飨到y(tǒng)來說建立和銷毀 TCP 都是非常昂貴的開銷,所以引入了信道的概念,以復(fù)用一條 TCP 連接。Consumer
消息的消費(fèi)者,表示一個(gè)從消息隊(duì)列中取得消息的客戶端應(yīng)用程序。Virtual Host
虛擬主機(jī),表示一批交換器、消息隊(duì)列和相關(guān)對象。虛擬主機(jī)是共享相同的身份認(rèn)證和加密環(huán)境的獨(dú)立服務(wù)器域。每個(gè) vhost 本質(zhì)上就是一個(gè) mini 版的 RabbitMQ 服務(wù)器,擁有自己的隊(duì)列、交換器、綁定和權(quán)限機(jī)制。vhost 是 AMQP 概念的基礎(chǔ),必須在連接時(shí)指定,RabbitMQ 默認(rèn)的 vhost 是 / 。Broker
表示消息隊(duì)列服務(wù)器實(shí)體。
?三、簡單消息模型使用
RabbitMQ是一個(gè)消息代理:它接受和轉(zhuǎn)發(fā)消息。 你可以把它想象成一個(gè)郵局:當(dāng)你把郵件放在郵箱里時(shí),你可以確定郵差先生最終會(huì)把郵件發(fā)送給你的收件人。 在這個(gè)比喻中,RabbitMQ是郵政信箱,郵局和郵遞員。
RabbitMQ與郵局的主要區(qū)別是它不處理紙張,而是接受,存儲(chǔ)和轉(zhuǎn)發(fā)數(shù)據(jù)消息的二進(jìn)制數(shù)據(jù)塊。
P(producer/ publisher):生產(chǎn)者,一個(gè)發(fā)送消息的用戶應(yīng)用程序。
C(consumer):消費(fèi)者,消費(fèi)和接收有類似的意思,消費(fèi)者是一個(gè)主要用來等待接收消息的用戶應(yīng)用程序
隊(duì)列(紅色區(qū)域):rabbitmq內(nèi)部類似于郵箱的一個(gè)概念。雖然消息流經(jīng)rabbitmq和你的應(yīng)用程序,但是它們只能存儲(chǔ)在隊(duì)列中。隊(duì)列只受主機(jī)的內(nèi)存和磁盤限制,實(shí)質(zhì)上是一個(gè)大的消息緩沖區(qū)。許多生產(chǎn)者可以發(fā)送消息到一個(gè)隊(duì)列,許多消費(fèi)者可以嘗試從一個(gè)隊(duì)列接收數(shù)據(jù)。
總之:生產(chǎn)者將消息發(fā)送到隊(duì)列,消費(fèi)者從隊(duì)列中獲取消息,隊(duì)列是存儲(chǔ)消息的緩沖區(qū)。
代碼演示
引入依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.6.RELEASE</version>
</dependency>
?獲取連接
package com.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class ConnectionUtil {
/**
* 建立與RabbitMQ的連接
* @return
* @throws Exception
*/
public static Connection getConnection() throws Exception {
//定義連接工廠
ConnectionFactory factory = new ConnectionFactory();
//設(shè)置服務(wù)地址
factory.setHost("192.168.33.88");
//TCP端口
factory.setPort(5672);
//設(shè)置賬號信息,用戶名、密碼、vhost
factory.setVirtualHost("vhost_NetDataGather");
factory.setUsername("admin");
factory.setPassword("123456");
// 通過工程獲取連接
Connection connection = factory.newConnection();
return connection;
}
}
生產(chǎn)者
import com.util.ConnectionUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* 生產(chǎn)者
*/
public class Send {
//聲明隊(duì)列名稱
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 生產(chǎn)者和Broker建立TCP連接。
Connection connection = ConnectionUtil.getConnection();
// 生產(chǎn)者和Broker建立通道。
Channel channel = connection.createChannel();
// 聲明(創(chuàng)建)隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 消息內(nèi)容
String message = "Hello World!";
for (int i = 0; i < 10; i++) {
// 向指定的隊(duì)列中發(fā)送消息
message=message+i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
}
//關(guān)閉通道和連接
channel.close();
connection.close();
}
}
消費(fèi)者
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.util.ConnectionUtil;
/**
* 消費(fèi)者
*/
public class Recv {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建通道
Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
}
};
// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù):是否自動(dòng)進(jìn)行消息確認(rèn)。
channel.basicConsume(QUEUE_NAME, true, consumer);
}
}
上述代碼中:消息一旦被消費(fèi)者接收,隊(duì)列中的消息就會(huì)被刪除。
如果消費(fèi)者領(lǐng)取消息后,還沒執(zhí)行操作就掛掉了呢?或者拋出了異常?消息消費(fèi)失敗,但是RabbitMQ無從得知,這樣消息就丟失了!
因此,RabbitMQ有一個(gè)ACK機(jī)制。當(dāng)消費(fèi)者獲取消息后,會(huì)向RabbitMQ發(fā)送回執(zhí)ACK,告知消息已經(jīng)被接收。不過這種回執(zhí)ACK分兩種情況:
自動(dòng)ACK:消息一旦被接收,消費(fèi)者自動(dòng)發(fā)送ACK。
手動(dòng)ACK:消息接收后,不會(huì)發(fā)送ACK,需要手動(dòng)調(diào)用。文章來源:http://www.zghlxwxcb.cn/news/detail-659799.html
手動(dòng)ACK
import java.io.IOException;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.util.ConnectionUtil;
/**
* 消費(fèi)者,手動(dòng)進(jìn)行ACK
*/
public class Recv2 {
private final static String QUEUE_NAME = "simple_queue";
public static void main(String[] argv) throws Exception {
// 獲取到連接
Connection connection = ConnectionUtil.getConnection();
// 創(chuàng)建通道
final Channel channel = connection.createChannel();
// 聲明隊(duì)列
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// 定義隊(duì)列的消費(fèi)者
DefaultConsumer consumer = new DefaultConsumer(channel) {
// 獲取消息,并且處理,這個(gè)方法類似事件監(jiān)聽,如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用
@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
// body 即消息體
String msg = new String(body);
// 手動(dòng)進(jìn)行ACK
channel.basicAck(envelope.getDeliveryTag(), false);
}
};
// 監(jiān)聽隊(duì)列,第二個(gè)參數(shù)false,手動(dòng)進(jìn)行ACK
// 如果第二個(gè)參數(shù)為true,則會(huì)自動(dòng)進(jìn)行ACK;如果為false,則需要手動(dòng)ACK。方法的聲明:
channel.basicConsume(QUEUE_NAME, false, consumer);
}
}
以上就是RabbitMQ的簡單使用講解,更多詳細(xì)使用內(nèi)容,請?jiān)俣喽鄬W(xué)習(xí)。文章來源地址http://www.zghlxwxcb.cn/news/detail-659799.html
到了這里,關(guān)于Java RabbitMQ消息隊(duì)列簡單使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!