消息
消息的發(fā)送方:生產(chǎn)者
消息的接收方:消費(fèi)者
同步消息:發(fā)送方發(fā)送消息到接收方,接收方有所回應(yīng)后才能夠進(jìn)行下一次的消息發(fā)送
異步消息:不需要接收方回應(yīng)就可以進(jìn)行下一步的發(fā)送
消息隊(duì)列
什么是消息隊(duì)列?
當(dāng)此時(shí)有很多個(gè)用戶同時(shí)訪問(wèn)服務(wù)器,需要服務(wù)器進(jìn)行操作,但此時(shí)由于操作太多服務(wù)器運(yùn)轉(zhuǎn)不過(guò)來(lái),這時(shí)將非常多的操作轉(zhuǎn)換成消息的格式儲(chǔ)存器來(lái),所有的子服務(wù)器從中獲取到消息進(jìn)行操作分擔(dān)主服務(wù)器的壓力,而這個(gè)中間存儲(chǔ)消息的容器我們一般稱為消息隊(duì)列
- 企業(yè)級(jí)應(yīng)用中廣泛使用的三種異步消息傳遞技術(shù)(實(shí)現(xiàn)高并發(fā)的有效處理):
- JMS
- AMQP
- MQTT
JMS
(java Message Service):一個(gè)規(guī)范,等同于JDBC規(guī)范,提供了與消息服務(wù)相關(guān)的API接口
- JMS消息模型
- peer-2-peer: 點(diǎn)對(duì)點(diǎn)模型,消息發(fā)送到一個(gè)隊(duì)列中,隊(duì)列保存信息,隊(duì)列的消息只能被一個(gè)消費(fèi)者消費(fèi),或超時(shí)
- publish-subscribe:發(fā)布訂閱模式,消息可以被多個(gè)消費(fèi)者消費(fèi),生產(chǎn)者和消費(fèi)者完全獨(dú)立,不需要感知對(duì)方存在
- JMS消息種類
TextMessage,MapMessage, BytesMessage,StreamMessage,ObjectMessage,Message(Message只有消息頭和屬性)
- 實(shí)現(xiàn)JMS規(guī)范的MQ
ActiveMQ,Redis,HornetMQ,RabbitMQ,RocketMQ(RocketMQ并未完全遵守JMS規(guī)范)
AMQP
AMQP(advanced message queuing protocol):一種協(xié)議(高級(jí)隊(duì)列協(xié)議,消息代理規(guī)范),規(guī)范了網(wǎng)絡(luò)交換的數(shù)據(jù)格式,兼容JMS
JMS存在一定的問(wèn)題,JMS規(guī)范對(duì)對(duì)應(yīng)的語(yǔ)言進(jìn)行了規(guī)范,但若是我使用不是規(guī)范語(yǔ)言進(jìn)行操作的時(shí)候就會(huì)出現(xiàn)問(wèn)題,這時(shí)我們推出AMQP,這更像是一種協(xié)議,規(guī)范消息的格式,就是無(wú)論用什么語(yǔ)言什么環(huán)境都無(wú)所謂,它只人消息的格式
優(yōu)點(diǎn):跨平臺(tái)性,服務(wù)器供應(yīng)商,生產(chǎn)者,消費(fèi)者可以使用不同的語(yǔ)言來(lái)實(shí)現(xiàn)
- AMQP的消息模型
direct exchange,fanout exchange,topic exchange,headers exchange,system exchange
AMQP的消息種類:byte[]
- 實(shí)現(xiàn)AMQP的MQ:
RabbitMQ,StormMQ,RocketMQ
MQTT
(Message Queueing Telemetry Transport)消息隊(duì)列遙測(cè)傳輸,專為小設(shè)備設(shè)計(jì),是物聯(lián)網(wǎng)(IOT)生態(tài)系統(tǒng)中主要成分之一
Kafka
kafka,一種高吞吐量的分布式訂閱消息系統(tǒng),提供實(shí)時(shí)消息功能
Spring整合消息隊(duì)列
模擬消息隊(duì)列的工作流程
模擬消息隊(duì)列的處理過(guò)程
import java.util.ArrayList;
@Service
public class Messageservice implements MessageService {
private ArrayList<String> megList=new ArrayList<String>();
@Override
public void sendMessage(String id) {
System.out.println("將待發(fā)送的消息訂單納入到處理隊(duì)列.id:"+id);
megList.add(id);
}
@Override
public String doMessage() {
String remove = megList.remove(0);
System.out.println("已完成短信業(yè)務(wù)的發(fā)送,id:"+remove);
return remove;
}
}
模擬將消息導(dǎo)入到消息隊(duì)列
@Service
public class orderserviceimpl implements orderService {
@Autowired
private MessageService messageService;
@Override
public void order(String id) {
//發(fā)送消息隊(duì)列
messageService.sendMessage(id);
}
}
Spring整合ActiveMQ
首先下載activeMQ
下載地址:https://activemq.apache.org/components/classic/download/
下載之后進(jìn)行解壓縮
- 啟動(dòng)服務(wù)
打開(kāi)x64的bin目錄下執(zhí)行activemq.bat命令啟動(dòng)服務(wù)
啟動(dòng)成功 ,其中給出其web控制臺(tái)的訪問(wèn)地址:
進(jìn)入其管理界面:
默認(rèn)用戶名&密碼:admin
- SpringBoot進(jìn)行整合activemq
添加依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
添加配置:
配置Spirng連接的地址,以及后邊消息存入的位置
server:
port: 80
spring:
activemq:
# 說(shuō)明spring連接的active的端口地址
broker-url: tcp://localhost:61616
進(jìn)行消息隊(duì)列的操作:
@Service
public class Messageservice implements MessageService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Override
public void sendMessage(String id) {
System.out.println("將待發(fā)送的消息訂單納入到處理隊(duì)列.id:"+id);
jmsMessagingTemplate.convertAndSend(id);
}
@Override
public String doMessage() {
//將消息隊(duì)列中的類型轉(zhuǎn)移出來(lái),并在參數(shù)中規(guī)定轉(zhuǎn)移出來(lái)的消息類型
String s = jmsMessagingTemplate.receiveAndConvert(String.class);
System.out.println("已完成短信業(yè)務(wù)的發(fā)送,id:"+s);
return s;
}
}
在發(fā)送和獲取期間也可以規(guī)定名稱
jmsMessagingTemplate.convertAndSend("order.shishi.id",id);
String s = jmsMessagingTemplate.receiveAndConvert("order.shishi.id",String.class);
上述之中也有一個(gè)小問(wèn)題,就是在并不是每次消費(fèi)都需要進(jìn)行訪問(wèn),而是當(dāng)消息隊(duì)列中有消息就開(kāi)始消費(fèi)我們可以創(chuàng)建一個(gè)Listener
@Component
public class MessageListener {
@JmsListener(destination = "order.shishi.id")
public void receive(String id){
System.out.println("已完成的短信業(yè)務(wù):id:"+id);
}
}
這樣就自動(dòng)監(jiān)聽(tīng)指定位置下的消息,一有消息就自動(dòng)開(kāi)始消費(fèi),從服務(wù)開(kāi)始就一直存在
還有一個(gè)消息轉(zhuǎn)發(fā)的操作:
@Component
public class MessageListener {
@JmsListener(destination = "order.shishi.id")
@SendTo("order.bushi.id")
public void receive(String id){
System.out.println("已完成的短信業(yè)務(wù):id:"+id);
}
}
注解 @SendTo的作用是將監(jiān)聽(tīng)到的消息消費(fèi)之后將返回值返回到對(duì)應(yīng)的消息中去
上述使用的都是點(diǎn)對(duì)點(diǎn)的模型,如果要使用發(fā)布訂閱的模型,可以在配置文件中進(jìn)行配置:
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
template:
default-destination: shishi
pub-sub-domain: true
Spring整合RabbitMQ
rabbitMQ基于Erlang語(yǔ)言編寫,需要安裝Erlang
首先需要下載Erlang:
下載地址:https://www.erlang.org/downloads
下載完成之需要重啟一下操作系統(tǒng)(重啟電腦)
配置環(huán)境變量
添加path:
安裝完成后下載RabbitMQ
下載地址:https://rabbitmq.com/install-windows.html
- 啟動(dòng)rabbitMQ
注意:要啟動(dòng)rabbitMQ服務(wù)需要命令行進(jìn)入到管理員身份運(yùn)行
rabbitMQ的控制臺(tái)界面(需要手動(dòng)配置插件):
在sbin目錄下找到:rabbitmq:plugins.bat命令
執(zhí)行命令展示其插件列表,通過(guò)命令開(kāi)啟插件
這樣就可以訪問(wèn)它的控制臺(tái)界面,端口號(hào)是15672,地址:http://localhost:15672
輸入默認(rèn)密碼:guest
Spring進(jìn)行整合rabbitMQ首先添加依賴:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
在配置文件中進(jìn)行rabbits的配置:
spring:
activemq:
broker-url: tcp://localhost:61616
jms:
template:
default-destination: shishi
pub-sub-domain: true
rabbitmq:
host: localhost
port: 5672
直連交換機(jī)模式
使用直連模式的交換機(jī)進(jìn)行消息隊(duì)列的開(kāi)發(fā):
首先需要在配置類中進(jìn)行直連交換機(jī)與消息隊(duì)列的綁定
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ConfigQM {
@Bean
public Queue directQueue(){
//第一個(gè)是消息隊(duì)列的名稱,第一個(gè)true表示消息持久化,第二個(gè)表示當(dāng)前的消息隊(duì)列是否是連接專用(連接一關(guān)消息隊(duì)列就關(guān)閉),第三個(gè)參數(shù)是是否刪除(當(dāng)消費(fèi)者生產(chǎn)者都不使用就刪除)
return new Queue("direct_queue",true,true,true);
}
//我們需要一個(gè)交換機(jī)去綁定消息隊(duì)列,此處設(shè)置一個(gè)交換機(jī)
@Bean
public DirectExchange directExchange(){
return new DirectExchange("directexchange");
}
@Bean
public Binding binding(){
//將消息隊(duì)列與交換機(jī)進(jìn)行綁定
return BindingBuilder.bind(directQueue()).to(directExchange()).with("direct");
}
}
綁定之后通過(guò)直連交換機(jī)進(jìn)行消息的存儲(chǔ)
@Service
public class amqpservice implements MessageService{
@Autowired
private AmqpTemplate amqpTemplate;
@Override
public void sendMessage(String id) {
//使用直連交換機(jī)
amqpTemplate.convertAndSend("directExchange","direct",id);
}
@Override
public String doMessage() {
return null;
}
}
然后從消息隊(duì)列中讀取消息寫在rabbitMQ監(jiān)聽(tīng)器下面:
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class RabbitMQListener {
@RabbitListener(queues = "direct_queue")
public void reveive(String id){
System.out.println("已完成短信發(fā)送業(yè)務(wù) id:"+id);
}
}
主題交換機(jī)模式
主題交換機(jī)可以模糊設(shè)置交換機(jī)綁定的名稱來(lái)達(dá)到分發(fā)的目的
例如:
@Bean
public Binding binding(){
//將消息隊(duì)列與交換機(jī)進(jìn)行綁定
return BindingBuilder.bind(directQueue()).to(directExchange()).with("topic_*_id");
}
在消息進(jìn)入消息隊(duì)列的時(shí)候:
amqpTemplate.convertAndSend("directExchange","topic_ni_id",id);
amqpTemplate.convertAndSend("directExchange","topic_bu_id",id);
這個(gè)兩種消息都可以進(jìn)入到消息隊(duì)列中去,而且通過(guò)這種方式也可以使消息進(jìn)入到不同的消息隊(duì)列中去
- 綁定案件的規(guī)則:
*(星號(hào)):用來(lái)表示一個(gè)單詞,且該單詞必須出現(xiàn)
#(井號(hào)):用來(lái)表示任意數(shù)量
Spring整合RocketMQ
下載地址:https://rocketmq.apache.org/
默認(rèn)服務(wù)端口:9876
配置環(huán)境變量:ROCKETMQ_HOME,PATH,NAMESER_ADDR(建議):127.0.0.1:9876
- 命名服務(wù)器與broker
當(dāng)后期的業(yè)務(wù)服務(wù)器增多時(shí),就需要不停的進(jìn)行服務(wù)器之間的連接,會(huì)變得非常繁瑣,但是如果我們有一臺(tái)服務(wù)器將所有的業(yè)務(wù)服務(wù)器注冊(cè)進(jìn)行,消費(fèi)者與生產(chǎn)者只需要連接命名服務(wù)器即可
- 首先啟動(dòng)命名服務(wù)器
雙擊文件啟動(dòng)命名服務(wù)器
然后雙擊mqbroker文件啟動(dòng)服務(wù)器:
如何測(cè)試服務(wù)器是否正常啟動(dòng):
在bin目錄下啟動(dòng)cmd:
首先使用第一個(gè)命名生成對(duì)應(yīng)的消息:
再使用第二個(gè)命令對(duì)生成的消息進(jìn)行消費(fèi):
進(jìn)行整合:
首先導(dǎo)入依賴坐標(biāo):
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.1</version>
</dependency>
在配置文件中配置其命名服務(wù)器:
rocketmq是與spring在同一層次下
rocketmq:
name-server: localhost:9876
producer:
group: group_rocketmq
進(jìn)行消息隊(duì)列的相關(guān)操作:
@Service
public class MessageRocketmqimpl implements MessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void sendMessage(String id) {
rocketMQTemplate.convertAndSend("sdasda",id);
}
@Override
public String doMessage() {
return null;
}
}
消費(fèi)者監(jiān)聽(tīng)器:
@Component
@RocketMQMessageListener(topic = "sdasda",consumerGroup = "group_rocketmq")
public class MessageRocketmqListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("id:"+s);
}
}
使用異步方式進(jìn)行發(fā)送:
@Service
public class MessageRocketmqimpl implements MessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;
@Override
public void sendMessage(String id) {
SendCallback callback=new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("消息發(fā)送成功");
}
@Override
public void onException(Throwable throwable) {
System.out.println("消息發(fā)送失敗");
}
}
// rocketMQTemplate.convertAndSend("sdasda",id);
rocketMQTemplate.asyncSend("sdasda",id,callback);
}
- 同步發(fā)送與異步發(fā)送的區(qū)別:
同步發(fā)送和異步發(fā)送是兩種不同的消息發(fā)送方式。在同步發(fā)送中,發(fā)送線程會(huì)等待消息發(fā)送完成并收到發(fā)送結(jié)果后繼續(xù)執(zhí)行,而在異步發(fā)送中,發(fā)送線程不會(huì)阻塞,可以立即執(zhí)行后續(xù)邏輯。選擇哪種方式取決于業(yè)務(wù)需求和對(duì)消息發(fā)送結(jié)果的要求。
Spring整合kafka
下載地址:https://kafka.apache.org/downloads
下載之后進(jìn)行解壓縮文件
解壓之后首先需要運(yùn)行:zookeeper-server-start.bat文件
這個(gè)文件相當(dāng)于一個(gè)注冊(cè)中心,需要先進(jìn)行注冊(cè)才能夠啟動(dòng)kafka服務(wù)器,作用相當(dāng)于RocketMQ中的命名服務(wù)器,需要在對(duì)應(yīng)目錄下cmd命令攜帶參數(shù)進(jìn)行啟動(dòng):
啟動(dòng)注冊(cè)服務(wù)器后,然后開(kāi)啟kafka服務(wù)器:
spring進(jìn)行整合kafka:
導(dǎo)入依賴坐標(biāo):
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
在配置文件中進(jìn)行配置,配置注冊(cè)服務(wù)器的地址:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-859848.html
kafka:
bootstrap-servers: localhost:9092
consumer:
group-id: order
@Service
public class kafka implements MessageService {
@Autowired
private KafkaTemplate<String,String> kafkaTemplate;
@Override
public void sendMessage(String id) {
kafkaTemplate.send("adad",id);
}
@Override
public String doMessage() {
return null;
}
}
創(chuàng)建消費(fèi)者監(jiān)聽(tīng)器:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-859848.html
@Component
public class kafkaListener {
@KafkaListener(topics = "adad")
public void onMessage(ConsumerRecord<String,String> consumerRecord){
System.out.println("id:"+consumerRecord.value());
}
}
到了這里,關(guān)于SpringBoot整合消息中間件(ActiveMQ,RabbitMQ,RocketMQ,Kafka)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!