消息的發(fā)送與接收
消息的發(fā)送與接收不僅僅是在于聊天功能的實現(xiàn)。其實還有很多種情況也算"消息的發(fā)送與接收"。而且我們還可以通過多種方法去實現(xiàn)。我們可以基于實際情況來選擇。
WebSocket實現(xiàn)
node做后端。找了好多,前端頁面總是用到了jQuery,包括底下的java做后端的前端代碼等。我們先用最簡單的代碼來幫助我們吧!
首先,配好package.json。注意依賴要引入ws。運行項目前先要運行一下:npm i
{
"name": "chat-server",
"version": "1.0.0",
"description": "",
"main": "client.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node server.js"
},
"dependencies": {
"debug": "~2.6.9",
"ejs": "~2.6.1",
"express": "~4.16.1",
"express-session": "^1.17.2",
"http-errors": "~1.6.3",
"jsonwebtoken": "^8.5.1",
"ws": "^8.5.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
創(chuàng)建server.js,這是服務端
const WebSocket = require("ws")
WebSocketServer = WebSocket.WebSocketServer
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', function connection(ws) {
ws.on('message', function message(data, isBinary) {
wss.clients.forEach(function each(client) {
if (client !== ws && client.readyState === WebSocket.OPEN) {
client.send(data, { binary: isBinary });
}
});
});
ws.send('歡迎加入聊天室');
});
創(chuàng)建client.js,這是客戶端
const WebSocket = require("ws")
var ws = new WebSocket("ws://localhost:8080")
ws.onopen = ()=>{
console.log("open")
}
ws.onmessage = (evt)=>{
console.log(evt.data)
}
創(chuàng)建test.html,此處用vscode插件live-server打開。
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<meta http-equiv="X-UA-Compatible" content="ie=edge">
<title>Document</title>
</head>
<body>
<h1>websockets簡單示例</h1><br>
<div id="message"></div>
<div>
<input type="text" id="sendText">
<button id="connect" onclick="connect()">建立連接</button>
<button id="sendData" onclick="sendData()">發(fā)送數(shù)據(jù)</button>
<button id="closeConnect" onclick="closeConnect()">關閉連接</button>
</div>
</body>
<script type="text/javascript">
let websockets;
//創(chuàng)建一個數(shù)組對象用于存放當前的連接的狀態(tài),以便在頁面上實時展示出來當前的狀態(tài)
let statusArr = [
{ state: 0, value: '正在連接' },
{ state: 1, value: '已建立連接' },
{ state: 2, value: '正在關閉連接' },
{ state: 3, value: '已關閉連接' },
]
/**
* 建立連接
*
*/
function connect() {
// 1. 創(chuàng)建websockets對象,參數(shù)為服務器websockets地址
websockets = new WebSocket("ws:127.0.0.1:8080");
// 2.監(jiān)聽websocket的狀態(tài)變化,接收的信息,關閉時的狀態(tài)
//監(jiān)聽連接狀態(tài)的變化
websockets.onopen = (event) => socketChange();
//監(jiān)聽接收消息的情況
websockets.onmessage = (res) => {
document.querySelector("#message").innerHTML += `<p>接收數(shù)據(jù): ${res.data}</p>`
}
//監(jiān)聽關閉時的狀態(tài)變化
websockets.onclose = (event) => socketChange();
}
/**
* socket狀態(tài)變化
*
*/
function socketChange() {
let state = websockets.readyState;
let val = statusArr.map((item) => {
if (item.state == state) {
return item.value
}
});
//實時顯示狀態(tài)的變化
document.querySelector("#message").innerHTML += `<p>當前的socket連接狀態(tài)是: ${val}</p>`
}
/**
* 發(fā)送數(shù)據(jù)
*
*/
function sendData() {
//1. 首先獲取輸入的信息,判斷信息是否可以發(fā)送
let val = document.querySelector("#sendText").value;
if (val == "" || val == undefined) {
document.querySelector("#message").innerHTML += "<p>發(fā)送數(shù)據(jù)為空,請?zhí)顚懲瓿珊笤侔l(fā)送!</p>";
return;
}
websockets.send(val);
document.querySelector("#message").innerHTML += `<p>發(fā)送數(shù)據(jù):${val}</p>`;
}
/**
* 關閉連接
*
*/
function closeConnect() {
websockets.close();
}
</script>
</html>
在本文件夾內(nèi)的控制臺輸入 node .\server.js node .\client.js 啟動服務器端和客戶端。
我們打開Document兩個窗口,嘗試進行消息交流。
我們發(fā)現(xiàn)消息是能正常發(fā)送出去并能被正常接收到。
Java使用Socket實現(xiàn)
java做后端。此處使用了Spring-websocket: Spring boot整合websocket實現(xiàn)即時通訊 (gitee.com)的代碼。
引入依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-thymeleaf</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.1.6.RELEASE</version>
<exclusions><!-- 去掉springboot默認配置 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<version>2.1.6.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.6.RELEASE</version>
<exclusions><!-- 去掉springboot默認配置 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>com.github.pagehelper</groupId>
<artifactId>pagehelper</artifactId>
<version>4.1.6</version>
</dependency>
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>3.14</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.20</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.68</version>
</dependency>
<dependency>
<groupId>commons-net</groupId>
<artifactId>commons-net</artifactId>
<version>3.1</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-log4j2</artifactId>
<version>2.1.6.RELEASE</version>
</dependency>
核心方法:
webSocketServer類
package boot.spring.service;
import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.stereotype.Component;
import com.alibaba.fastjson.JSON;
import boot.spring.po.Message;
@ServerEndpoint("/webSocket/{username}")
@Component
public class WebSocketServer {
//靜態(tài)變量,用來記錄當前在線連接數(shù)。應該把它設計成線程安全的。
private static AtomicInteger onlineNum = new AtomicInteger();
//concurrent包的線程安全Set,用來存放每個客戶端對應的WebSocketServer對象。
private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
//發(fā)送消息
public void sendMessage(Session session, String message) throws IOException {
if(session != null){
synchronized (session) {
System.out.println("發(fā)送數(shù)據(jù):" + message);
session.getBasicRemote().sendText(message);
}
}
}
//給指定用戶發(fā)送信息
public void sendInfo(String userName, String message){
Session session = sessionPools.get(userName);
try {
sendMessage(session, message);
}catch (Exception e){
e.printStackTrace();
}
}
// 群發(fā)消息
public void broadcast(String message){
for (Session session: sessionPools.values()) {
try {
sendMessage(session, message);
} catch(Exception e){
e.printStackTrace();
continue;
}
}
}
//建立連接成功調(diào)用
@OnOpen
public void onOpen(Session session, @PathParam(value = "username") String userName){
sessionPools.put(userName, session);
addOnlineCount();
System.out.println(userName + "加入webSocket!當前人數(shù)為" + onlineNum);
// 廣播上線消息
Message msg = new Message();
msg.setDate(new Date());
msg.setTo("0");
msg.setText(userName);
broadcast(JSON.toJSONString(msg,true));
}
//關閉連接時調(diào)用
@OnClose
public void onClose(@PathParam(value = "username") String userName){
sessionPools.remove(userName);
subOnlineCount();
System.out.println(userName + "斷開webSocket連接!當前人數(shù)為" + onlineNum);
// 廣播下線消息
Message msg = new Message();
msg.setDate(new Date());
msg.setTo("-2");
msg.setText(userName);
broadcast(JSON.toJSONString(msg,true));
}
//收到客戶端信息后,根據(jù)接收人的username把消息推下去或者群發(fā)
// to=-1群發(fā)消息
@OnMessage
public void onMessage(String message) throws IOException{
System.out.println("server get" + message);
Message msg=JSON.parseObject(message, Message.class);
msg.setDate(new Date());
if (msg.getTo().equals("-1")) {
broadcast(JSON.toJSONString(msg,true));
} else {
sendInfo(msg.getTo(), JSON.toJSONString(msg,true));
}
}
//錯誤時調(diào)用
@OnError
public void onError(Session session, Throwable throwable){
System.out.println("發(fā)生錯誤");
throwable.printStackTrace();
}
public static void addOnlineCount(){
onlineNum.incrementAndGet();
}
public static void subOnlineCount() {
onlineNum.decrementAndGet();
}
public static AtomicInteger getOnlineNumber() {
return onlineNum;
}
public static ConcurrentHashMap<String, Session> getSessionPools() {
return sessionPools;
}
}
controller層
@Controller
public class ChatController {
@Autowired
LoginService loginservice;
@RequestMapping("/onlineusers")
@ResponseBody
public Set<String> onlineusers(@RequestParam("currentuser") String currentuser) {
ConcurrentHashMap<String, Session> map = WebSocketServer.getSessionPools();
Set<String> set = map.keySet();
Iterator<String> it = set.iterator();
Set<String> nameset = new HashSet<String>();
while (it.hasNext()) {
String entry = it.next();
if (!entry.equals(currentuser))
nameset.add(entry);
}
return nameset;
}
@RequestMapping("getuid")
@ResponseBody
public User getuid(@RequestParam("username") String username) {
Long a = loginservice.getUidbyname(username);
User u = new User();
u.setUid(a);
return u;
}
}
啟動項目,訪問:http://localhost:8080/login。此前需要先運行項目里的stuff.sql文件,并配好其數(shù)據(jù)庫密碼等,點擊登錄即可。
這個項目包含了:即時通信,查看當前在線的其他用戶,用戶的上線提醒,群發(fā)消息等功能。
消息中間件實現(xiàn)
消息中間件市面上常見的四種:ActiveMQ 、RabbitMQ 、 RocketMQ、Kafka等。
幾種常見MQ的對比
RabbitMQ | ActiveMQ | RocketMQ | Kafka | |
---|---|---|---|---|
公司/社區(qū) | Rabbit | Apache | 阿里 | Apache |
開發(fā)語言 | Erlang | Java | Java | Scala&Java |
協(xié)議支持 | AMQP,XMPP,SMTP,STOMP | OpenWire,STOMP,REST,XMPP,AMQP | 自定義協(xié)議 | 自定義協(xié)議 |
可用性 | 高 | 一般 | 高 | 高 |
單機吞吐量 | 一般 | 差 | 高 | 非常高 |
消息延遲 | 微秒級 | 毫秒級 | 毫秒級 | 毫秒以內(nèi) |
消息可靠性 | 高 | 一般 | 高 | 一般 |
追求可用性:Kafka、 RocketMQ 、RabbitMQ
追求可靠性:RabbitMQ、RocketMQ
追求吞吐能力:RocketMQ、Kafka
追求消息低延遲:RabbitMQ、Kafka
綜合看來,我們選擇比較流行的rabbitmq來幫助我們。
部署
在安裝rabbitmq之前需要先安裝erlang,并配置好其環(huán)境變量。(跟java環(huán)境變量配置一樣)
安裝rabbitmq部署并啟動服務:安裝rabbitmq(解壓版教程)_rabbitmq-server 壓縮包版本如何使用-CSDN博客
啟動有可能會報毒,我們選擇允許。
訪問:RabbitMQ Management,顯示出rabbitmq的管理界面就算部署成功!
默認賬號密碼都是guest。
RabbitMQ的簡要概述
RabbitMQ中的一些角色:
- publisher:生產(chǎn)者
- consumer:消費者
- exchange個:交換機,負責消息路由
- queue:隊列,存儲消息
- virtualHost:虛擬主機,隔離不同租戶的exchange、queue、消息的隔離
RabbitMQ使用流程總結(jié)
基本消息隊列的消息發(fā)送流程:
-
建立connection
-
創(chuàng)建channel
-
利用channel聲明隊列
-
利用channel向隊列發(fā)送消息
基本消息隊列的消息接收流程:
-
建立connection
-
創(chuàng)建channel
-
利用channel聲明隊列
-
定義consumer的消費行為handleDelivery()
-
利用channel將消費者與隊列綁定
SpringAMQP
SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實現(xiàn)了自動裝配,使用起來非常方便。
SpringAmqp的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三個功能:
- 自動聲明隊列、交換機及其綁定關系
- 基于注解的監(jiān)聽器模式,異步接收消息
- 封裝了RabbitTemplate工具,用于發(fā)送消息
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依賴,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--單元測試-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
</dependencies>
創(chuàng)建2個模塊,publisher和consumer,完善一下項目的啟動類,yaml配置文件等。每個模塊寫一個測試類:
PublisherTest:
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創(chuàng)建通道Channel
Channel channel = connection.createChannel();
// 3.創(chuàng)建隊列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.發(fā)送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("發(fā)送消息成功:【" + message + "】");
// 5.關閉通道和連接
channel.close();
connection.close();
}
}
ConsumerTest:
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立連接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("guest");
factory.setPassword("guest");
// 1.2.建立連接
Connection connection = factory.newConnection();
// 2.創(chuàng)建通道Channel
Channel channel = connection.createChannel();
// 3.創(chuàng)建隊列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.訂閱消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.處理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
先啟動publisher:
再看看rabbitmq的控制臺:
再啟動consumer
此時再看看rabbitmq控制臺:
消息已經(jīng)被成功消費了!
rabbitmq實現(xiàn)即時通信
具體可以參考:RabbitMQ實現(xiàn)即時通訊_rabbitmq 聊天-CSDN博客
MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的輕量級
通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP
協(xié)議上。MQTT最大優(yōu)點在于,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。
RabbitMQ啟用MQTT功能,需要先安裝然RabbitMQ然后再啟用MQTT插件。
接下來就是啟用RabbitMQ的MQTT插件了,默認是不啟用的,使用命令開啟即可;
需要進到目錄\rabbitmq_server-3.9.13\sbin里,執(zhí)行如下命令:
.\rabbitmq-plugins.bat enable rabbitmq_mqtt
MQTTX
我們可以使用MQTT客戶端來測試MQTT的即時通訊功能,這里使用的是MQTTX
這個客戶端工具。下載地址:MQTTX: Your All-in-one MQTT Client Toolbox
點擊新建連接按鈕或者左邊的加號來創(chuàng)建一個MQTT客戶端;
接下來對MQTT客戶端進行配置,主要是配置好協(xié)議端口、連接用戶名密碼和QoS即可。
再配置一個訂閱者,訂閱者訂閱testTopicA
這個主題,我們會向這個主題發(fā)送消息;
發(fā)布者向主題中發(fā)布消息,訂閱者可以實時接收到。
前端直接實現(xiàn)即時通訊
既然
MQTTBox
客戶端可以直接通過RabbitMQ實現(xiàn)即時通訊,那我們是不是直接使用前端技術(shù)也可以實現(xiàn)即時通訊?答案是肯定的!下面我們將通過html+javascript
實現(xiàn)一個簡單的聊天功能,真正不寫一行后端代碼實現(xiàn)即時通訊!
- 由于RabbitMQ與Web端交互底層使用的是WebSocket,所以我們需要開啟RabbitMQ的MQTT WEB支持,使用如下命令開啟即可;
rabbitmq-plugins enable rabbitmq_web_mqtt
- 開啟成功后,查看管理控制臺,我們可以發(fā)現(xiàn)MQTT的WEB服務運行在
15675
端口上了。
WEB端與MQTT服務進行通訊需要使用一個叫MQTT.js
的庫,項目地址:https://github.com/mqttjs/MQTT.js
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<div>
<label>目標Topic:<input id="targetTopicInput" type="text"></label><br>
<label>發(fā)送消息:<input id="messageInput" type="text"></label><br>
<button onclick="sendMessage()">發(fā)送</button>
<button onclick="clearMessage()">清空</button>
<div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
//RabbitMQ的web-mqtt連接地址
const url = 'ws://localhost:15675/ws';
//獲取訂閱的topic
const topic = getQueryString("topic");
//連接到消息隊列
let client = mqtt.connect(url);
client.on('connect', function () {
//連接成功后訂閱topic
client.subscribe(topic, function (err) {
if (!err) {
showMessage("訂閱topic:" + topic + "成功!");
}
});
});
//獲取訂閱topic中的消息
client.on('message', function (topic, message) {
showMessage("收到消息:" + message.toString());
});
//發(fā)送消息
function sendMessage() {
let targetTopic = document.getElementById("targetTopicInput").value;
let message = document.getElementById("messageInput").value;
//向目標topic中發(fā)送消息
client.publish(targetTopic, message);
showMessage("發(fā)送消息給" + targetTopic + "的消息:" + message);
}
//從URL中獲取參數(shù)
function getQueryString(name) {
let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
let r = window.location.search.substr(1).match(reg);
if (r != null) {
return decodeURIComponent(r[2]);
}
return null;
}
//在消息列表中展示消息
function showMessage(message) {
let messageDiv = document.getElementById("messageDiv");
let messageEle = document.createElement("div");
messageEle.innerText = message;
messageDiv.appendChild(messageEle);
}
//清空消息列表
function clearMessage() {
let messageDiv = document.getElementById("messageDiv");
messageDiv.innerHTML = "";
}
</script>
</html>
之后在界面輸入對應的topic地址,發(fā)送消息,發(fā)現(xiàn)是可以實現(xiàn)的!
在SpringBoot中使用
沒有特殊業(yè)務需求的時候,前端可以直接和RabbitMQ對接實現(xiàn)即時通訊。但是有時候我們需要通過服務端去通知前端,此時就需要在應用中集成MQTT了
此處項目源碼地址:https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt
首先,項目需要引入mqtt依賴
<!--Spring集成MQTT-->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>
在application.yml添加配置:
rabbitmq:
mqtt:
url: tcp://localhost:1883
username: guest
password: guest
defaultTopic: testTopic
編寫一個Java配置類從配置文件中讀取配置便于使用;
/**
* MQTT相關配置
*/
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
/**
* RabbitMQ連接用戶名
*/
private String username;
/**
* RabbitMQ連接密碼
*/
private String password;
/**
* RabbitMQ的MQTT默認topic
*/
private String defaultTopic;
/**
* RabbitMQ的MQTT連接地址
*/
private String url;
}
添加MQTT消息訂閱者相關配置,使用@ServiceActivator
注解聲明一個服務激活器,通過MessageHandler
來處理訂閱消息;
/**
* MQTT消息訂閱者相關配置
*/
@Slf4j
@Configuration
public class MqttInboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
mqttConfig.getDefaultTopic());
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
//設置消息質(zhì)量:0->至多一次;1->至少一次;2->只有一次
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
@Bean
@ServiceActivator(inputChannel = "mqttInputChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message<?> message) throws MessagingException {
//處理訂閱消息
log.info("handleMessage : {}",message.getPayload());
}
};
}
}
注意:messageHandler導包路徑:import org.springframework.messaging.MessageHandler;
添加MQTT消息發(fā)布者相關配置;
/**
* MQTT消息發(fā)布者相關配置
*/
@Configuration
public class MqttOutboundConfig {
@Autowired
private MqttConfig mqttConfig;
@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setServerURIs(new String[] { mqttConfig.getUrl()});
options.setUserName(mqttConfig.getUsername());
options.setPassword(mqttConfig.getPassword().toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel() {
return new DirectChannel();
}
}
添加MQTT網(wǎng)關,用于向主題中發(fā)送消息;
/**
* MQTT網(wǎng)關,通過接口將數(shù)據(jù)傳遞到集成流
*/
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
/**
* 發(fā)送消息到默認topic
*/
void sendToMqtt(String payload);
/**
* 發(fā)送消息到指定topic
*/
void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
/**
* 發(fā)送消息到指定topic并設置QOS
*/
void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}
添加MQTT測試接口,使用MQTT網(wǎng)關向特定主題中發(fā)送消息;
/**
* MQTT測試接口
*/
@RestController
@RequestMapping("/mqtt")
public class MqttController {
@Autowired
private MqttGateway mqttGateway;
@PostMapping("/sendToDefaultTopic")
public CommonResult sendToDefaultTopic(String payload) {
mqttGateway.sendToMqtt(payload);
return CommonResult.success(null);
}
@PostMapping("/sendToTopic")
public CommonResult sendToTopic(String payload, String topic) {
mqttGateway.sendToMqtt(payload, topic);
return CommonResult.success(null);
}
}
別忘聲明一個返回類
package com.itcast.mq.model;
import lombok.Data;
import java.io.Serializable;
@Data
public class CommonResult<T> {
/*返回體*/
private Integer code;
private String msg;
private T data;
/*成功,且返回體有數(shù)據(jù)*/
public static CommonResult success(Object object) {
CommonResult r = new CommonResult();
r.setCode(200);
r.setMsg("成功");
r.setData(object);
return r;
}
//成功,但返回體沒數(shù)據(jù)
public static CommonResult success(){
return success(null);
}
//失敗返回信息
public static CommonResult Err(Integer code,String msg){
CommonResult r = new CommonResult();
r.setCode(code);
r.setMsg(msg);
return r;
}
}
配置一下啟動類
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
啟動該項目,打開postman輸入鏈接:http://localhost:8080/mqtt//sendToTopic?payload=&topic=。填好輸入內(nèi)容和目標topic的id即可!
回到mqtt客戶端看一下
消息發(fā)送成功!
netty實現(xiàn)
Netty是 一個異步事件驅(qū)動的網(wǎng)絡應用程序框架,用于快速開發(fā)可維護的高性能協(xié)議服務器和客戶端。
超詳細Netty入門,看這篇就夠了! - 知乎 (zhihu.com)
具體實現(xiàn)可以參考:https://blog.csdn.net/weixin_44814270/article/details/132947704
依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
<version>2.1.6.RELEASE</version>
<exclusions><!-- 去掉springboot默認配置 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<version>2.1.6.RELEASE</version>
<exclusions><!-- 去掉springboot默認配置 -->
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-logging</artifactId>
</exclusion>
</exclusions>
</dependency>
<!--我這里使用的是jfinal-enjoy模板引擎-->
<dependency>
<groupId>com.jfinal</groupId>
<artifactId>enjoy</artifactId>
<version>5.1.2</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.65.Final</version> <!-- 使用最新版本 -->
</dependency>
</dependencies>
配置NettyChatServer:
public class NettyChatServer {
private final int port;
private final EventExecutorGroup eventExecutorGroup;
private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public NettyChatServer(int port) {
this.port = port;
this.eventExecutorGroup = new DefaultEventExecutorGroup(4); // 用于在handler中處理耗時任務
}
public void start() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// 字符串編解碼器,用于將消息編碼成字符串和解碼成字符串
pipeline.addLast(new HttpServerCodec());
pipeline.addLast(new HttpObjectAggregator(65536));
pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
pipeline.addLast(eventExecutorGroup, new ChatServerHandler(channelGroup));
// 添加自定義的聊天處理器
// pipeline.addLast(eventExecutorGroup, new ChatServerHandler(channelGroup));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
System.out.println("Chat Server started on port " + port);
channelFuture.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
public void stop() {
// 停止服務器
}
public static void main(String[] args) {
int port = 8888;
NettyChatServer chatServer = new NettyChatServer(port);
try {
chatServer.start();
} catch (Exception e) {
e.printStackTrace();
}
}
配置ChatServerHandler
public class ChatServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {
private final ChannelGroup channelGroup;
public ChatServerHandler(ChannelGroup channelGroup) {
this.channelGroup = channelGroup;
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
// 處理WebSocket消息
if (frame instanceof TextWebSocketFrame) {
TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
String message = textFrame.text();
// 在服務器控制臺上輸出消息
System.out.println("Received message: " + message);
// 將消息廣播給所有連接的客戶端
channelGroup.writeAndFlush(new TextWebSocketFrame(message));
}
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
// 新客戶端連接時添加到ChannelGroup
channelGroup.add(ctx.channel());
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
// 客戶端斷開連接時從ChannelGroup中移除
channelGroup.remove(ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// 異常處理
cause.printStackTrace();
ctx.close();
}
因為項目用到了,所以我們需要jfinal-enjoy模板引擎文件:
package com.zd.config;
import com.jfinal.template.Engine;
import com.jfinal.template.ext.spring.JFinalViewResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class EnjoyConfig {
@Bean(name = "jfinalViewResolver")
public JFinalViewResolver getJFinalViewResolver() {
// 創(chuàng)建用于整合 spring boot 的 ViewResolver 擴展對象
JFinalViewResolver jfr = new JFinalViewResolver();
// 對 spring boot 進行配置
jfr.setSuffix(".html");
jfr.setContentType("text/html;charset=UTF-8");
jfr.setOrder(0);
// 設置在模板中可通過 #(session.value) 訪問 session 中的數(shù)據(jù)
jfr.setSessionInView(true);
// 獲取 engine 對象,對 enjoy 模板引擎進行配置,配置方式與前面章節(jié)完全一樣
Engine engine = JFinalViewResolver.engine;
// 熱加載配置能對后續(xù)配置產(chǎn)生影響,需要放在最前面
engine.setDevMode(true);
// 使用 ClassPathSourceFactory 從 class path 與 jar 包中加載模板文件
engine.setToClassPathSourceFactory();
// 在使用 ClassPathSourceFactory 時要使用 setBaseTemplatePath
// 設置靜態(tài)資源路徑在 /static 下
engine.setBaseTemplatePath("/static/");
return jfr;
}
}
配置一下controller層:
@Controller
public class ChatController {
@RequestMapping("/chat")
public String main() {
return "main";
}
}
配置html、css、js
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>簡單聊天室</title>
<!-- 引入Bootstrap CSS文件 -->
<link href="./bootstarp/css/bootstrap.min.css" rel="stylesheet">
<link href="main.css" rel="stylesheet">
</head>
<body>
<div class="container">
<div class="row">
<div class="col-md-8 offset-md-2">
<div id="chat-container">
<div id="chat-header">
<h2>簡單聊天室</h2>
</div>
<div id="chat-box">
<!-- 示例聊天消息 -->
<!-- <div class="message">
<div class="avatar">A</div>
<div class="message-content">
<div class="sender-name">User 1</div>
<div class="message-text">Hello, how are you?</div>
</div>
</div>-->
<!-- 示例聊天消息結(jié)束 -->
</div>
<div id="message-buttons">
<input type="text" id="message-input" placeholder="輸入消息...">
<input type="file" id="file-input">
<button class="btn btn-primary" onclick="sendMessage()">發(fā)送文本</button>
<button class="btn btn-primary" onclick="sendFile()">發(fā)送文件</button>
<button class="btn btn-danger" id="clear-button" onclick="clearChat()">清空聊天</button>
</div>
</div>
</div>
</div>
</div>
<!-- 引入jQuery -->
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<!-- 引入Bootstrap JS文件 -->
<script src="./bootstarp/js/bootstrap.min.js"></script>
<script src="main.js"></script>
</body>
</html>
body {
background-color: #f2f2f2;
font-family: Arial, Helvetica, sans-serif;
margin: 0;
padding: 0;
}
#chat-container {
max-width: 600px;
margin: 20px auto;
background-color: #fff;
border-radius: 5px;
box-shadow: 0px 0px 10px rgba(0, 0, 0, 0.1);
overflow: hidden;
}
#chat-header {
background-color: #007BFF;
color: #fff;
padding: 10px;
text-align: center;
border-top-left-radius: 5px;
border-top-right-radius: 5px;
}
#chat-box {
max-height: 100vh;
width: 100%;
overflow-y: scroll;
height: 600px;
}
.message {
display: flex;
margin: 10px;
padding: 10px;
border-bottom: 1px solid #ccc;
}
.avatar {
width: 50px;
height: 50px;
background-color: #007BFF;
color: #fff;
border-radius: 50%;
text-align: center;
line-height: 50px;
margin-right: 10px;
}
.message-content {
flex-grow: 3;
}
.sender-name {
font-weight: bold;
margin-bottom: 5px;
}
.message-text {
word-wrap: break-word;
}
#message-input, #file-input {
width: 100%;
padding: 10px;
border: 1px solid #ccc;
}
#message-buttons {
padding: 10px;
text-align: center;
}
button {
padding: 10px 20px;
background-color: #007BFF;
color: #fff;
border: none;
cursor: pointer;
margin-right: 10px;
}
button:hover {
background-color: #0056b3;
}
#clear-button {
background-color: #dc3545;
}
@media (max-width: 768px) {
#chat-container {
margin-top: 10px;
}
#chat-box {
max-height: 200px;
}
}
// WebSocket連接
const socket = new WebSocket('ws://localhost:8888/websocket'); // 請將 your_netty_server_address 替換為實際的Netty WebSocket服務器地址
socket.addEventListener('open', (event) => {
console.log('WebSocket連接已建立');
});
socket.addEventListener('message', (event) => {
// 解析消息
console.log(event)
const data = JSON.parse(event.data);
const chatBox = document.getElementById('chat-box');
if (data.type === 'text') {
// 接收到文本消息
const messageDiv = document.createElement('div');
messageDiv.classList.add('message');
const avatarDiv = document.createElement('div');
avatarDiv.classList.add('avatar');
avatarDiv.textContent = data.sender.charAt(0); // 使用發(fā)送者的首字母作為頭像內(nèi)容
const messageContentDiv = document.createElement('div');
messageContentDiv.classList.add('message-content');
const senderNameDiv = document.createElement('div');
senderNameDiv.classList.add('sender-name');
senderNameDiv.textContent = data.sender;
const messageTextDiv = document.createElement('div');
messageTextDiv.classList.add('message-text');
messageTextDiv.textContent = data.message;
messageContentDiv.appendChild(senderNameDiv);
messageContentDiv.appendChild(messageTextDiv);
messageDiv.appendChild(avatarDiv);
messageDiv.appendChild(messageContentDiv);
chatBox.appendChild(messageDiv);
} else if (data.type === 'file') {
// 接收到文件消息
const fileURL = URL.createObjectURL(data.file);
const messageDiv = document.createElement('div');
messageDiv.classList.add('message');
const avatarDiv = document.createElement('div');
avatarDiv.classList.add('avatar');
avatarDiv.textContent = data.sender.charAt(0); // 使用發(fā)送者的首字母作為頭像內(nèi)容
const messageContentDiv = document.createElement('div');
messageContentDiv.classList.add('message-content');
const senderNameDiv = document.createElement('div');
senderNameDiv.classList.add('sender-name');
senderNameDiv.textContent = data.sender;
const fileLink = document.createElement('a');
fileLink.href = fileURL;
fileLink.textContent = '下載文件';
fileLink.download = data.fileName;
messageContentDiv.appendChild(senderNameDiv);
messageContentDiv.appendChild(fileLink);
messageDiv.appendChild(avatarDiv);
messageDiv.appendChild(messageContentDiv);
chatBox.appendChild(messageDiv);
}
// 滾動到最新消息
chatBox.scrollTop = chatBox.scrollHeight;
});
function sendMessage() {
const messageInput = document.getElementById('message-input');
const message = messageInput.value.trim();
if (message !== '') {
// 發(fā)送文本消息到服務器
const data = {
sender: 'YSK',
type: 'text',
message: message
};
socket.send(JSON.stringify(data));
// 清空輸入框
messageInput.value = '';
}
}
function sendFile() {
const fileInput = document.getElementById('file-input');
const file = fileInput.files[0];
if (file) {
// 發(fā)送文件到服務器
const reader = new FileReader();
reader.onload = function (event) {
const data = {
type: 'file',
fileName: file.name,
file: event.target.result
};
socket.send(JSON.stringify(data));
};
reader.readAsArrayBuffer(file);
// 清空文件選擇框
fileInput.value = '';
}
}
function clearChat() {
const chatBox = document.getElementById('chat-box');
chatBox.innerHTML = '';
}
// 監(jiān)聽Enter鍵,發(fā)送文本消息
const messageInput = document.getElementById('message-input');
messageInput.addEventListener('keyup', function (event) {
if (event.key === 'Enter') {
sendMessage();
}
});
先啟動nettyServer 再啟動Application,訪問:簡單聊天室
我們似乎能基本實現(xiàn)聊天。(發(fā)送文件需要服務器存儲未實現(xiàn))。但是還是缺了太多東西。這里netty我們還是接觸了一點點罷了。
netty更多內(nèi)容可以參考官方文檔Netty: Home或者黑馬的教程:https://www.bilibili.com/video/BV1py4y1E7oA
第三方平臺實現(xiàn)
環(huán)信 - 中國IM即時通訊云服務開創(chuàng)者! (easemob.com)
每個免費用戶最多可以注冊100個能通訊的用戶。
基于項目的實現(xiàn)
簡便且功能要求不太多的:websocket
搭建難度高,但是穩(wěn)定且專業(yè)的:netty
哪都沾一點的:rabbitmq
直接上手但是有限制需要花錢的:第三方
參考文檔
Spring-websocket: Spring boot整合websocket實現(xiàn)即時通訊 (gitee.com)
RabbitMQ實現(xiàn)即時通訊_rabbitmq 聊天-CSDN博客文章來源:http://www.zghlxwxcb.cn/news/detail-802660.html
https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt文章來源地址http://www.zghlxwxcb.cn/news/detail-802660.html
到了這里,關于消息的發(fā)送與接收的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!