国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

消息的發(fā)送與接收

這篇具有很好參考價值的文章主要介紹了消息的發(fā)送與接收。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

消息的發(fā)送與接收

消息的發(fā)送與接收不僅僅是在于聊天功能的實現(xiàn)。其實還有很多種情況也算"消息的發(fā)送與接收"。而且我們還可以通過多種方法去實現(xiàn)。我們可以基于實際情況來選擇。

WebSocket實現(xiàn)

node做后端。找了好多,前端頁面總是用到了jQuery,包括底下的java做后端的前端代碼等。我們先用最簡單的代碼來幫助我們吧!

首先,配好package.json。注意依賴要引入ws。運行項目前先要運行一下:npm i

{
  "name": "chat-server",
  "version": "1.0.0",
  "description": "",
  "main": "client.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1",
    "start": "node server.js"
  },
  "dependencies": {
    "debug": "~2.6.9",
    "ejs": "~2.6.1",
    "express": "~4.16.1",
    "express-session": "^1.17.2",
    "http-errors": "~1.6.3",
    "jsonwebtoken": "^8.5.1",
    "ws": "^8.5.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

創(chuàng)建server.js,這是服務端

const  WebSocket = require("ws")
WebSocketServer = WebSocket.WebSocketServer
const wss = new WebSocketServer({ port: 8080 });
wss.on('connection', function connection(ws) {
    ws.on('message', function message(data, isBinary) {
        wss.clients.forEach(function each(client) {
            if (client !== ws && client.readyState === WebSocket.OPEN) {
                client.send(data, { binary: isBinary });
            }
        });

    });

    ws.send('歡迎加入聊天室');
});

創(chuàng)建client.js,這是客戶端

const  WebSocket = require("ws")
var ws = new WebSocket("ws://localhost:8080")
ws.onopen = ()=>{
    console.log("open")
}
ws.onmessage = (evt)=>{
    console.log(evt.data)
}

創(chuàng)建test.html,此處用vscode插件live-server打開。

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <meta http-equiv="X-UA-Compatible" content="ie=edge">
    <title>Document</title>
</head>

<body>
    <h1>websockets簡單示例</h1><br>
    <div id="message"></div>
    <div>
        <input type="text" id="sendText">
        <button id="connect" onclick="connect()">建立連接</button>
        <button id="sendData" onclick="sendData()">發(fā)送數(shù)據(jù)</button>
        <button id="closeConnect" onclick="closeConnect()">關閉連接</button>
    </div>
</body>
<script type="text/javascript">
    let websockets;
    //創(chuàng)建一個數(shù)組對象用于存放當前的連接的狀態(tài),以便在頁面上實時展示出來當前的狀態(tài)
    let statusArr = [
        { state: 0, value: '正在連接' },
        { state: 1, value: '已建立連接' },
        { state: 2, value: '正在關閉連接' },
        { state: 3, value: '已關閉連接' },
    ]
    /**
    *   建立連接
    *
    */
    function connect() {
        // 1. 創(chuàng)建websockets對象,參數(shù)為服務器websockets地址
        websockets = new WebSocket("ws:127.0.0.1:8080");

        // 2.監(jiān)聽websocket的狀態(tài)變化,接收的信息,關閉時的狀態(tài)

        //監(jiān)聽連接狀態(tài)的變化
        websockets.onopen = (event) => socketChange();

        //監(jiān)聽接收消息的情況
        websockets.onmessage = (res) => {
            document.querySelector("#message").innerHTML += `<p>接收數(shù)據(jù): ${res.data}</p>`
        }

        //監(jiān)聽關閉時的狀態(tài)變化
        websockets.onclose = (event) => socketChange();
    }
    /**
    *   socket狀態(tài)變化
    *
    */
    function socketChange() {
        let state = websockets.readyState;
        let val = statusArr.map((item) => {
            if (item.state == state) {
                return item.value
            }
        });

        //實時顯示狀態(tài)的變化
        document.querySelector("#message").innerHTML += `<p>當前的socket連接狀態(tài)是: ${val}</p>`
    }
    /**
    *   發(fā)送數(shù)據(jù)
    *
    */
    function sendData() {
        //1. 首先獲取輸入的信息,判斷信息是否可以發(fā)送
        let val = document.querySelector("#sendText").value;

        if (val == "" || val == undefined) {
            document.querySelector("#message").innerHTML += "<p>發(fā)送數(shù)據(jù)為空,請?zhí)顚懲瓿珊笤侔l(fā)送!</p>";
            return;
        }

        websockets.send(val);
        document.querySelector("#message").innerHTML += `<p>發(fā)送數(shù)據(jù):${val}</p>`;
    }
    /**
    *   關閉連接
    *
    */
    function closeConnect() {
        websockets.close();
    }
</script>
</html>

在本文件夾內(nèi)的控制臺輸入 node .\server.js node .\client.js 啟動服務器端和客戶端。

我們打開Document兩個窗口,嘗試進行消息交流。

消息的發(fā)送與接收,java

我們發(fā)現(xiàn)消息是能正常發(fā)送出去并能被正常接收到。

Java使用Socket實現(xiàn)

java做后端。此處使用了Spring-websocket: Spring boot整合websocket實現(xiàn)即時通訊 (gitee.com)的代碼。

引入依賴:

 <dependencies>
		<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-websocket</artifactId>
            <version>2.1.6.RELEASE</version>
        </dependency>
        
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-thymeleaf</artifactId>
			<version>2.1.6.RELEASE</version>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
			<version>2.1.6.RELEASE</version>
			<exclusions><!-- 去掉springboot默認配置 -->  
		        <exclusion>  
		            <groupId>org.springframework.boot</groupId>  
		            <artifactId>spring-boot-starter-logging</artifactId>  
		        </exclusion>  
		    </exclusions>  
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<version>2.1.6.RELEASE</version>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
			<version>2.1.6.RELEASE</version>
			<exclusions><!-- 去掉springboot默認配置 -->  
		        <exclusion>  
		            <groupId>org.springframework.boot</groupId>  
		            <artifactId>spring-boot-starter-logging</artifactId>  
		        </exclusion>  
		    </exclusions>  
		</dependency>
	  <dependency>
		  <groupId>mysql</groupId>
		  <artifactId>mysql-connector-java</artifactId>
		  <version>8.0.25</version>
	  </dependency>
        <dependency>
			<groupId>org.mybatis.spring.boot</groupId>
			<artifactId>mybatis-spring-boot-starter</artifactId>
			<version>2.0.1</version>
		</dependency>
		<dependency>
		    <groupId>com.github.pagehelper</groupId>
		    <artifactId>pagehelper</artifactId>
		    <version>4.1.6</version>
		</dependency>
		<dependency>
		    <groupId>org.apache.poi</groupId>
		    <artifactId>poi</artifactId>
		    <version>3.14</version>
		</dependency>

		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>druid</artifactId>
		    <version>1.1.20</version>
		</dependency>
		<dependency>
		   <groupId>org.postgresql</groupId>
		   <artifactId>postgresql</artifactId>
		   <version>42.2.5</version>
		</dependency>
		<dependency>
		    <groupId>com.alibaba</groupId>
		    <artifactId>fastjson</artifactId>
		    <version>1.2.68</version>
		</dependency>
		<dependency>
            <groupId>commons-net</groupId>
            <artifactId>commons-net</artifactId>
            <version>3.1</version>
        </dependency>
        <dependency>
		    <groupId>org.springframework.boot</groupId>  
		    <artifactId>spring-boot-starter-log4j2</artifactId>
		    <version>2.1.6.RELEASE</version>
		</dependency>

核心方法:

webSocketServer類

package boot.spring.service;

import java.io.IOException;
import java.util.Date;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

import org.springframework.stereotype.Component;

import com.alibaba.fastjson.JSON;

import boot.spring.po.Message;

@ServerEndpoint("/webSocket/{username}")
@Component
public class WebSocketServer {
	 //靜態(tài)變量,用來記錄當前在線連接數(shù)。應該把它設計成線程安全的。
    private static AtomicInteger onlineNum = new AtomicInteger();
    //concurrent包的線程安全Set,用來存放每個客戶端對應的WebSocketServer對象。
    private static ConcurrentHashMap<String, Session> sessionPools = new ConcurrentHashMap<>();
    //發(fā)送消息
    public void sendMessage(Session session, String message) throws IOException {
        if(session != null){
            synchronized (session) {
                System.out.println("發(fā)送數(shù)據(jù):" + message);
                session.getBasicRemote().sendText(message);
            }
        }
    }
    //給指定用戶發(fā)送信息
    public void sendInfo(String userName, String message){
        Session session = sessionPools.get(userName);
        try {
            sendMessage(session, message);
        }catch (Exception e){
            e.printStackTrace();
        }
    }
    // 群發(fā)消息
    public void broadcast(String message){
    	for (Session session: sessionPools.values()) {
            try {
                sendMessage(session, message);
            } catch(Exception e){
                e.printStackTrace();
                continue;
            }
        }
    }
    //建立連接成功調(diào)用
    @OnOpen
    public void onOpen(Session session, @PathParam(value = "username") String userName){
        sessionPools.put(userName, session);
        addOnlineCount();
        System.out.println(userName + "加入webSocket!當前人數(shù)為" + onlineNum);
        // 廣播上線消息
        Message msg = new Message();
        msg.setDate(new Date());
        msg.setTo("0");
        msg.setText(userName);
        broadcast(JSON.toJSONString(msg,true));
    }
    //關閉連接時調(diào)用
    @OnClose
    public void onClose(@PathParam(value = "username") String userName){
        sessionPools.remove(userName);
        subOnlineCount();
        System.out.println(userName + "斷開webSocket連接!當前人數(shù)為" + onlineNum);
        // 廣播下線消息
        Message msg = new Message();
        msg.setDate(new Date());
        msg.setTo("-2");
        msg.setText(userName);
        broadcast(JSON.toJSONString(msg,true));
    }

    //收到客戶端信息后,根據(jù)接收人的username把消息推下去或者群發(fā)
    // to=-1群發(fā)消息
    @OnMessage
    public void onMessage(String message) throws IOException{
        System.out.println("server get" + message);
        Message msg=JSON.parseObject(message, Message.class);
		msg.setDate(new Date());
		if (msg.getTo().equals("-1")) {
			broadcast(JSON.toJSONString(msg,true));
		} else {
			sendInfo(msg.getTo(), JSON.toJSONString(msg,true));
		}
    }

    //錯誤時調(diào)用
    @OnError
    public void onError(Session session, Throwable throwable){
        System.out.println("發(fā)生錯誤");
        throwable.printStackTrace();
    }

    public static void addOnlineCount(){
        onlineNum.incrementAndGet();
    }

    public static void subOnlineCount() {
        onlineNum.decrementAndGet();
    }
    
    public static AtomicInteger getOnlineNumber() {
        return onlineNum;
    }
    
    public static ConcurrentHashMap<String, Session> getSessionPools() {
        return sessionPools;
    }
}

controller層

@Controller
public class ChatController {

	@Autowired
	LoginService loginservice;
    
	@RequestMapping("/onlineusers")
	@ResponseBody
	public Set<String> onlineusers(@RequestParam("currentuser") String currentuser) {
		ConcurrentHashMap<String, Session> map = WebSocketServer.getSessionPools();
		Set<String> set = map.keySet();
		Iterator<String> it = set.iterator();
		Set<String> nameset = new HashSet<String>();
		while (it.hasNext()) {
			String entry = it.next();
			if (!entry.equals(currentuser))
				nameset.add(entry);
		}
		return nameset;
	}


	@RequestMapping("getuid")
	@ResponseBody
	public User getuid(@RequestParam("username") String username) {
		Long a = loginservice.getUidbyname(username);
		User u = new User();
		u.setUid(a);
		return u;
	}
}

啟動項目,訪問:http://localhost:8080/login。此前需要先運行項目里的stuff.sql文件,并配好其數(shù)據(jù)庫密碼等,點擊登錄即可。

消息的發(fā)送與接收,java

這個項目包含了:即時通信,查看當前在線的其他用戶,用戶的上線提醒,群發(fā)消息等功能。

消息中間件實現(xiàn)

消息中間件市面上常見的四種:ActiveMQ 、RabbitMQ 、 RocketMQ、Kafka等。

幾種常見MQ的對比

RabbitMQ ActiveMQ RocketMQ Kafka
公司/社區(qū) Rabbit Apache 阿里 Apache
開發(fā)語言 Erlang Java Java Scala&Java
協(xié)議支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定義協(xié)議 自定義協(xié)議
可用性 一般
單機吞吐量 一般 非常高
消息延遲 微秒級 毫秒級 毫秒級 毫秒以內(nèi)
消息可靠性 一般 一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延遲:RabbitMQ、Kafka

綜合看來,我們選擇比較流行的rabbitmq來幫助我們。

部署

在安裝rabbitmq之前需要先安裝erlang,并配置好其環(huán)境變量。(跟java環(huán)境變量配置一樣)

安裝rabbitmq部署并啟動服務:安裝rabbitmq(解壓版教程)_rabbitmq-server 壓縮包版本如何使用-CSDN博客

啟動有可能會報毒,我們選擇允許。

訪問:RabbitMQ Management,顯示出rabbitmq的管理界面就算部署成功!

默認賬號密碼都是guest。

RabbitMQ的簡要概述

RabbitMQ中的一些角色:

  • publisher:生產(chǎn)者
  • consumer:消費者
  • exchange個:交換機,負責消息路由
  • queue:隊列,存儲消息
  • virtualHost:虛擬主機,隔離不同租戶的exchange、queue、消息的隔離

RabbitMQ使用流程總結(jié)

基本消息隊列的消息發(fā)送流程:

  1. 建立connection

  2. 創(chuàng)建channel

  3. 利用channel聲明隊列

  4. 利用channel向隊列發(fā)送消息

基本消息隊列的消息接收流程:

  1. 建立connection

  2. 創(chuàng)建channel

  3. 利用channel聲明隊列

  4. 定義consumer的消費行為handleDelivery()

  5. 利用channel將消費者與隊列綁定

SpringAMQP

SpringAMQP是基于RabbitMQ封裝的一套模板,并且還利用SpringBoot對其實現(xiàn)了自動裝配,使用起來非常方便。

SpringAmqp的官方地址:https://spring.io/projects/spring-amqp

SpringAMQP提供了三個功能:

  • 自動聲明隊列、交換機及其綁定關系
  • 基于注解的監(jiān)聽器模式,異步接收消息
  • 封裝了RabbitTemplate工具,用于發(fā)送消息
  <dependencies>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
        <!--AMQP依賴,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <!--單元測試-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
    </dependencies>

創(chuàng)建2個模塊,publisher和consumer,完善一下項目的啟動類,yaml配置文件等。每個模塊寫一個測試類:

PublisherTest:

public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 1.2.建立連接
        Connection connection = factory.newConnection();

        // 2.創(chuàng)建通道Channel
        Channel channel = connection.createChannel();

        // 3.創(chuàng)建隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.發(fā)送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("發(fā)送消息成功:【" + message + "】");

        // 5.關閉通道和連接
        channel.close();
        connection.close();

    }
}

ConsumerTest:

public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立連接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.設置連接參數(shù),分別是:主機名、端口號、vhost、用戶名、密碼
        factory.setHost("localhost");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("guest");
        factory.setPassword("guest");
        // 1.2.建立連接
        Connection connection = factory.newConnection();

        // 2.創(chuàng)建通道Channel
        Channel channel = connection.createChannel();

        // 3.創(chuàng)建隊列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);

        // 4.訂閱消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.處理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
}

先啟動publisher:

消息的發(fā)送與接收,java

再看看rabbitmq的控制臺:
消息的發(fā)送與接收,java

再啟動consumer
消息的發(fā)送與接收,java

此時再看看rabbitmq控制臺:

消息的發(fā)送與接收,java

消息已經(jīng)被成功消費了!

rabbitmq實現(xiàn)即時通信

具體可以參考:RabbitMQ實現(xiàn)即時通訊_rabbitmq 聊天-CSDN博客

MQTT(Message Queuing Telemetry Transport,消息隊列遙測傳輸協(xié)議),是一種基于發(fā)布/訂閱(publish/subscribe)模式的輕量級通訊協(xié)議,該協(xié)議構(gòu)建于TCP/IP協(xié)議上。MQTT最大優(yōu)點在于,可以以極少的代碼和有限的帶寬,為連接遠程設備提供實時可靠的消息服務。

RabbitMQ啟用MQTT功能,需要先安裝然RabbitMQ然后再啟用MQTT插件。

接下來就是啟用RabbitMQ的MQTT插件了,默認是不啟用的,使用命令開啟即可;

需要進到目錄\rabbitmq_server-3.9.13\sbin里,執(zhí)行如下命令:

.\rabbitmq-plugins.bat enable rabbitmq_mqtt

消息的發(fā)送與接收,java

MQTTX

我們可以使用MQTT客戶端來測試MQTT的即時通訊功能,這里使用的是MQTTX這個客戶端工具。下載地址:MQTTX: Your All-in-one MQTT Client Toolbox

點擊新建連接按鈕或者左邊的加號來創(chuàng)建一個MQTT客戶端;

接下來對MQTT客戶端進行配置,主要是配置好協(xié)議端口、連接用戶名密碼和QoS即可。

再配置一個訂閱者,訂閱者訂閱testTopicA這個主題,我們會向這個主題發(fā)送消息;

消息的發(fā)送與接收,java

發(fā)布者向主題中發(fā)布消息,訂閱者可以實時接收到。

消息的發(fā)送與接收,java

前端直接實現(xiàn)即時通訊

既然MQTTBox客戶端可以直接通過RabbitMQ實現(xiàn)即時通訊,那我們是不是直接使用前端技術(shù)也可以實現(xiàn)即時通訊?答案是肯定的!下面我們將通過html+javascript實現(xiàn)一個簡單的聊天功能,真正不寫一行后端代碼實現(xiàn)即時通訊!

  • 由于RabbitMQ與Web端交互底層使用的是WebSocket,所以我們需要開啟RabbitMQ的MQTT WEB支持,使用如下命令開啟即可;
rabbitmq-plugins enable rabbitmq_web_mqtt
  • 開啟成功后,查看管理控制臺,我們可以發(fā)現(xiàn)MQTT的WEB服務運行在15675端口上了。

WEB端與MQTT服務進行通訊需要使用一個叫MQTT.js的庫,項目地址:https://github.com/mqttjs/MQTT.js

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
</head>
<body>
<div>
    <label>目標Topic:<input id="targetTopicInput" type="text"></label><br>
    <label>發(fā)送消息:<input id="messageInput" type="text"></label><br>
    <button onclick="sendMessage()">發(fā)送</button>
    <button onclick="clearMessage()">清空</button>
    <div id="messageDiv"></div>
</div>
</body>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script>
    //RabbitMQ的web-mqtt連接地址
    const url = 'ws://localhost:15675/ws';
    //獲取訂閱的topic
    const topic = getQueryString("topic");
    //連接到消息隊列
    let client = mqtt.connect(url);
    client.on('connect', function () {
        //連接成功后訂閱topic
        client.subscribe(topic, function (err) {
            if (!err) {
                showMessage("訂閱topic:" + topic + "成功!");
            }
        });
    });
    //獲取訂閱topic中的消息
    client.on('message', function (topic, message) {
        showMessage("收到消息:" + message.toString());
    });
 
    //發(fā)送消息
    function sendMessage() {
        let targetTopic = document.getElementById("targetTopicInput").value;
        let message = document.getElementById("messageInput").value;
        //向目標topic中發(fā)送消息
        client.publish(targetTopic, message);
        showMessage("發(fā)送消息給" + targetTopic + "的消息:" + message);
    }
 
    //從URL中獲取參數(shù)
    function getQueryString(name) {
        let reg = new RegExp("(^|&)" + name + "=([^&]*)(&|$)", "i");
        let r = window.location.search.substr(1).match(reg);
        if (r != null) {
            return decodeURIComponent(r[2]);
        }
        return null;
    }
 
    //在消息列表中展示消息
    function showMessage(message) {
        let messageDiv = document.getElementById("messageDiv");
        let messageEle = document.createElement("div");
        messageEle.innerText = message;
        messageDiv.appendChild(messageEle);
    }
 
    //清空消息列表
    function clearMessage() {
        let messageDiv = document.getElementById("messageDiv");
        messageDiv.innerHTML = "";
    }
</script>
</html>

之后在界面輸入對應的topic地址,發(fā)送消息,發(fā)現(xiàn)是可以實現(xiàn)的!

在SpringBoot中使用

沒有特殊業(yè)務需求的時候,前端可以直接和RabbitMQ對接實現(xiàn)即時通訊。但是有時候我們需要通過服務端去通知前端,此時就需要在應用中集成MQTT了

此處項目源碼地址:https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt

首先,項目需要引入mqtt依賴

<!--Spring集成MQTT-->
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>

在application.yml添加配置:

rabbitmq:
  mqtt:
    url: tcp://localhost:1883
    username: guest
    password: guest
    defaultTopic: testTopic

編寫一個Java配置類從配置文件中讀取配置便于使用;

/**
 * MQTT相關配置
 */
@Data
@EqualsAndHashCode(callSuper = false)
@Component
@ConfigurationProperties(prefix = "rabbitmq.mqtt")
public class MqttConfig {
    /**
     * RabbitMQ連接用戶名
     */
    private String username;
    /**
     * RabbitMQ連接密碼
     */
    private String password;
    /**
     * RabbitMQ的MQTT默認topic
     */
    private String defaultTopic;
    /**
     * RabbitMQ的MQTT連接地址
     */
    private String url;
}

添加MQTT消息訂閱者相關配置,使用@ServiceActivator注解聲明一個服務激活器,通過MessageHandler來處理訂閱消息;

/**
 * MQTT消息訂閱者相關配置
 */
@Slf4j
@Configuration
public class MqttInboundConfig {
    @Autowired
    private MqttConfig mqttConfig;
    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }
    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(mqttConfig.getUrl(), "subscriberClient",
                        mqttConfig.getDefaultTopic());
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        //設置消息質(zhì)量:0->至多一次;1->至少一次;2->只有一次
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
    @Bean
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public MessageHandler handler() {
        return new MessageHandler() {
 
            @Override
            public void handleMessage(Message<?> message) throws MessagingException {
                //處理訂閱消息
                log.info("handleMessage : {}",message.getPayload());
            }
 
        };
    }
}

注意:messageHandler導包路徑:import org.springframework.messaging.MessageHandler;

添加MQTT消息發(fā)布者相關配置;

/**
 * MQTT消息發(fā)布者相關配置
 */
@Configuration
public class MqttOutboundConfig {
 
    @Autowired
    private MqttConfig mqttConfig;
 
    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { mqttConfig.getUrl()});
        options.setUserName(mqttConfig.getUsername());
        options.setPassword(mqttConfig.getPassword().toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }
 
    @Bean
    @ServiceActivator(inputChannel = "mqttOutboundChannel")
    public MessageHandler mqttOutbound() {
        MqttPahoMessageHandler messageHandler =
                new MqttPahoMessageHandler("publisherClient", mqttClientFactory());
        messageHandler.setAsync(true);
        messageHandler.setDefaultTopic(mqttConfig.getDefaultTopic());
        return messageHandler;
    }
 
    @Bean
    public MessageChannel mqttOutboundChannel() {
        return new DirectChannel();
    }
}

添加MQTT網(wǎng)關,用于向主題中發(fā)送消息;

/**
 * MQTT網(wǎng)關,通過接口將數(shù)據(jù)傳遞到集成流
 */
@Component
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {
    /**
     * 發(fā)送消息到默認topic
     */
    void sendToMqtt(String payload);
 
    /**
     * 發(fā)送消息到指定topic
     */
    void sendToMqtt(String payload, @Header(MqttHeaders.TOPIC) String topic);
 
    /**
     * 發(fā)送消息到指定topic并設置QOS
     */
    void sendToMqtt(@Header(MqttHeaders.TOPIC) String topic, @Header(MqttHeaders.QOS) int qos, String payload);
}

添加MQTT測試接口,使用MQTT網(wǎng)關向特定主題中發(fā)送消息;

/**
 * MQTT測試接口
 */
@RestController
@RequestMapping("/mqtt")
public class MqttController {
    @Autowired
    private MqttGateway mqttGateway;
    @PostMapping("/sendToDefaultTopic")
    public CommonResult sendToDefaultTopic(String payload) {
        mqttGateway.sendToMqtt(payload);
        return CommonResult.success(null);
    }
    @PostMapping("/sendToTopic")
    public CommonResult sendToTopic(String payload, String topic) {
        mqttGateway.sendToMqtt(payload, topic);
        return CommonResult.success(null);
    }
}

別忘聲明一個返回類

package com.itcast.mq.model;

import lombok.Data;

import java.io.Serializable;

@Data
public class CommonResult<T> {
    /*返回體*/
    private  Integer code;
    private String msg;
    private T data;

    /*成功,且返回體有數(shù)據(jù)*/
    public static CommonResult success(Object object) {
        CommonResult r = new CommonResult();
        r.setCode(200);
        r.setMsg("成功");
        r.setData(object);
        return r;
    }
    //成功,但返回體沒數(shù)據(jù)
    public static CommonResult success(){
        return success(null);
    }
    //失敗返回信息
    public static CommonResult Err(Integer code,String msg){
        CommonResult r = new CommonResult();
        r.setCode(code);
        r.setMsg(msg);
        return r;
    }

}

配置一下啟動類

@SpringBootApplication
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

啟動該項目,打開postman輸入鏈接:http://localhost:8080/mqtt//sendToTopic?payload=&topic=。填好輸入內(nèi)容和目標topic的id即可!
消息的發(fā)送與接收,java

回到mqtt客戶端看一下

消息發(fā)送成功!

netty實現(xiàn)

Netty是 一個異步事件驅(qū)動的網(wǎng)絡應用程序框架,用于快速開發(fā)可維護的高性能協(xié)議服務器和客戶端

超詳細Netty入門,看這篇就夠了! - 知乎 (zhihu.com)

具體實現(xiàn)可以參考:https://blog.csdn.net/weixin_44814270/article/details/132947704

依賴:

 <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
            <version>2.1.6.RELEASE</version>
            <exclusions><!-- 去掉springboot默認配置 -->
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
            <version>2.1.6.RELEASE</version>
            <exclusions><!-- 去掉springboot默認配置 -->
                <exclusion>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-logging</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--我這里使用的是jfinal-enjoy模板引擎-->
        <dependency>
            <groupId>com.jfinal</groupId>
            <artifactId>enjoy</artifactId>
            <version>5.1.2</version>
        </dependency>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.65.Final</version> <!-- 使用最新版本 -->
        </dependency>
    </dependencies>

配置NettyChatServer:

public class NettyChatServer {

    private final int port;
    private final EventExecutorGroup eventExecutorGroup;
    private final ChannelGroup channelGroup = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);

    public NettyChatServer(int port) {
        this.port = port;
        this.eventExecutorGroup = new DefaultEventExecutorGroup(4); // 用于在handler中處理耗時任務
    }

    public void start() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel ch) throws Exception {
                            ChannelPipeline pipeline = ch.pipeline();
                            // 字符串編解碼器,用于將消息編碼成字符串和解碼成字符串
                            pipeline.addLast(new HttpServerCodec());
                            pipeline.addLast(new HttpObjectAggregator(65536));
                            pipeline.addLast(new WebSocketServerProtocolHandler("/websocket"));
                            pipeline.addLast(eventExecutorGroup, new ChatServerHandler(channelGroup));
                            // 添加自定義的聊天處理器
                            //  pipeline.addLast(eventExecutorGroup, new ChatServerHandler(channelGroup));
                        }
                    });

            ChannelFuture channelFuture = serverBootstrap.bind(port).sync();
            System.out.println("Chat Server started on port " + port);

            channelFuture.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }

    public void stop() {
        // 停止服務器
    }

    public static void main(String[] args) {
        int port = 8888;
        NettyChatServer chatServer = new NettyChatServer(port);
        try {
            chatServer.start();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

配置ChatServerHandler

public class ChatServerHandler extends SimpleChannelInboundHandler<WebSocketFrame> {

    private final ChannelGroup channelGroup;

    public ChatServerHandler(ChannelGroup channelGroup) {
        this.channelGroup = channelGroup;
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, WebSocketFrame frame) throws Exception {
        // 處理WebSocket消息
        if (frame instanceof TextWebSocketFrame) {
            TextWebSocketFrame textFrame = (TextWebSocketFrame) frame;
            String message = textFrame.text();

            // 在服務器控制臺上輸出消息
            System.out.println("Received message: " + message);

            // 將消息廣播給所有連接的客戶端
            channelGroup.writeAndFlush(new TextWebSocketFrame(message));
        }
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        // 新客戶端連接時添加到ChannelGroup
        channelGroup.add(ctx.channel());
    }

    @Override
    public void handlerRemoved(ChannelHandlerContext ctx) {
        // 客戶端斷開連接時從ChannelGroup中移除
        channelGroup.remove(ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // 異常處理
        cause.printStackTrace();
        ctx.close();
    }

因為項目用到了,所以我們需要jfinal-enjoy模板引擎文件:

package com.zd.config;

import com.jfinal.template.Engine;
import com.jfinal.template.ext.spring.JFinalViewResolver;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class EnjoyConfig {


    @Bean(name = "jfinalViewResolver")
    public JFinalViewResolver getJFinalViewResolver() {

        // 創(chuàng)建用于整合 spring boot 的 ViewResolver 擴展對象
        JFinalViewResolver jfr = new JFinalViewResolver();

        // 對 spring boot 進行配置
        jfr.setSuffix(".html");
        jfr.setContentType("text/html;charset=UTF-8");
        jfr.setOrder(0);

        // 設置在模板中可通過 #(session.value) 訪問 session 中的數(shù)據(jù)
        jfr.setSessionInView(true);

        // 獲取 engine 對象,對 enjoy 模板引擎進行配置,配置方式與前面章節(jié)完全一樣
        Engine engine = JFinalViewResolver.engine;

        // 熱加載配置能對后續(xù)配置產(chǎn)生影響,需要放在最前面
        engine.setDevMode(true);

        // 使用 ClassPathSourceFactory 從 class path 與 jar 包中加載模板文件
        engine.setToClassPathSourceFactory();

        // 在使用 ClassPathSourceFactory 時要使用 setBaseTemplatePath
        // 設置靜態(tài)資源路徑在 /static 下
        engine.setBaseTemplatePath("/static/");

        return jfr;
    }
}

配置一下controller層:

@Controller
public class ChatController {

    @RequestMapping("/chat")
    public String main() {
        return "main";
    }
}

配置html、css、js

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>簡單聊天室</title>
    <!-- 引入Bootstrap CSS文件 -->
    <link href="./bootstarp/css/bootstrap.min.css" rel="stylesheet">
    <link href="main.css" rel="stylesheet">
</head>
<body>
<div class="container">
    <div class="row">
        <div class="col-md-8 offset-md-2">
            <div id="chat-container">
                <div id="chat-header">
                    <h2>簡單聊天室</h2>
                </div>
                <div id="chat-box">
                    <!-- 示例聊天消息 -->
                    <!--  <div class="message">
                          <div class="avatar">A</div>
                          <div class="message-content">
                              <div class="sender-name">User 1</div>
                              <div class="message-text">Hello, how are you?</div>
                          </div>
                      </div>-->
                    <!-- 示例聊天消息結(jié)束 -->
                </div>
                <div id="message-buttons">
                    <input type="text" id="message-input" placeholder="輸入消息...">
                    <input type="file" id="file-input">
                    <button class="btn btn-primary" onclick="sendMessage()">發(fā)送文本</button>
                    <button class="btn btn-primary" onclick="sendFile()">發(fā)送文件</button>
                    <button class="btn btn-danger" id="clear-button" onclick="clearChat()">清空聊天</button>
                </div>
            </div>
        </div>
    </div>
</div>

<!-- 引入jQuery -->
<script src="https://code.jquery.com/jquery-3.6.0.min.js"></script>
<!-- 引入Bootstrap JS文件 -->
<script src="./bootstarp/js/bootstrap.min.js"></script>
<script src="main.js"></script>
</body>
</html>

body {
    background-color: #f2f2f2;
    font-family: Arial, Helvetica, sans-serif;
    margin: 0;
    padding: 0;
}

#chat-container {
    max-width: 600px;
    margin: 20px auto;
    background-color: #fff;
    border-radius: 5px;
    box-shadow: 0px 0px 10px rgba(0, 0, 0, 0.1);
    overflow: hidden;
}

#chat-header {
    background-color: #007BFF;
    color: #fff;
    padding: 10px;
    text-align: center;
    border-top-left-radius: 5px;
    border-top-right-radius: 5px;
}

#chat-box {
    max-height: 100vh;
    width: 100%;
    overflow-y: scroll;
    height: 600px;
}

.message {
    display: flex;
    margin: 10px;
    padding: 10px;
    border-bottom: 1px solid #ccc;
}

.avatar {
    width: 50px;
    height: 50px;
    background-color: #007BFF;
    color: #fff;
    border-radius: 50%;
    text-align: center;
    line-height: 50px;
    margin-right: 10px;
}

.message-content {
    flex-grow: 3;
}

.sender-name {
    font-weight: bold;
    margin-bottom: 5px;
}

.message-text {
    word-wrap: break-word;
}

#message-input, #file-input {
    width: 100%;
    padding: 10px;
    border: 1px solid #ccc;
}

#message-buttons {
    padding: 10px;
    text-align: center;
}

button {
    padding: 10px 20px;
    background-color: #007BFF;
    color: #fff;
    border: none;
    cursor: pointer;
    margin-right: 10px;
}

button:hover {
    background-color: #0056b3;
}

#clear-button {
    background-color: #dc3545;
}

@media (max-width: 768px) {
    #chat-container {
        margin-top: 10px;
    }

    #chat-box {
        max-height: 200px;
    }
}

// WebSocket連接
const socket = new WebSocket('ws://localhost:8888/websocket'); // 請將 your_netty_server_address 替換為實際的Netty WebSocket服務器地址

socket.addEventListener('open', (event) => {
    console.log('WebSocket連接已建立');
});

socket.addEventListener('message', (event) => {
    // 解析消息
    console.log(event)
    const data = JSON.parse(event.data);

    const chatBox = document.getElementById('chat-box');

    if (data.type === 'text') {
        // 接收到文本消息
        const messageDiv = document.createElement('div');
        messageDiv.classList.add('message');

        const avatarDiv = document.createElement('div');
        avatarDiv.classList.add('avatar');
        avatarDiv.textContent = data.sender.charAt(0); // 使用發(fā)送者的首字母作為頭像內(nèi)容

        const messageContentDiv = document.createElement('div');
        messageContentDiv.classList.add('message-content');

        const senderNameDiv = document.createElement('div');
        senderNameDiv.classList.add('sender-name');
        senderNameDiv.textContent = data.sender;

        const messageTextDiv = document.createElement('div');
        messageTextDiv.classList.add('message-text');
        messageTextDiv.textContent = data.message;

        messageContentDiv.appendChild(senderNameDiv);
        messageContentDiv.appendChild(messageTextDiv);

        messageDiv.appendChild(avatarDiv);
        messageDiv.appendChild(messageContentDiv);

        chatBox.appendChild(messageDiv);
    } else if (data.type === 'file') {
        // 接收到文件消息
        const fileURL = URL.createObjectURL(data.file);
        const messageDiv = document.createElement('div');
        messageDiv.classList.add('message');

        const avatarDiv = document.createElement('div');
        avatarDiv.classList.add('avatar');
        avatarDiv.textContent = data.sender.charAt(0); // 使用發(fā)送者的首字母作為頭像內(nèi)容

        const messageContentDiv = document.createElement('div');
        messageContentDiv.classList.add('message-content');

        const senderNameDiv = document.createElement('div');
        senderNameDiv.classList.add('sender-name');
        senderNameDiv.textContent = data.sender;

        const fileLink = document.createElement('a');
        fileLink.href = fileURL;
        fileLink.textContent = '下載文件';
        fileLink.download = data.fileName;

        messageContentDiv.appendChild(senderNameDiv);
        messageContentDiv.appendChild(fileLink);

        messageDiv.appendChild(avatarDiv);
        messageDiv.appendChild(messageContentDiv);

        chatBox.appendChild(messageDiv);
    }

    // 滾動到最新消息
    chatBox.scrollTop = chatBox.scrollHeight;
});

function sendMessage() {
    const messageInput = document.getElementById('message-input');
    const message = messageInput.value.trim();

    if (message !== '') {
        // 發(fā)送文本消息到服務器
        const data = {
            sender: 'YSK',
            type: 'text',
            message: message
        };
        socket.send(JSON.stringify(data));
        // 清空輸入框
        messageInput.value = '';
    }
}

function sendFile() {
    const fileInput = document.getElementById('file-input');
    const file = fileInput.files[0];

    if (file) {
        // 發(fā)送文件到服務器
        const reader = new FileReader();

        reader.onload = function (event) {
            const data = {
                type: 'file',
                fileName: file.name,
                file: event.target.result
            };
            socket.send(JSON.stringify(data));
        };

        reader.readAsArrayBuffer(file);

        // 清空文件選擇框
        fileInput.value = '';
    }
}

function clearChat() {
    const chatBox = document.getElementById('chat-box');
    chatBox.innerHTML = '';
}

// 監(jiān)聽Enter鍵,發(fā)送文本消息
const messageInput = document.getElementById('message-input');
messageInput.addEventListener('keyup', function (event) {
    if (event.key === 'Enter') {
        sendMessage();
    }
});

先啟動nettyServer 再啟動Application,訪問:簡單聊天室

我們似乎能基本實現(xiàn)聊天。(發(fā)送文件需要服務器存儲未實現(xiàn))。但是還是缺了太多東西。這里netty我們還是接觸了一點點罷了。

netty更多內(nèi)容可以參考官方文檔Netty: Home或者黑馬的教程:https://www.bilibili.com/video/BV1py4y1E7oA

第三方平臺實現(xiàn)

環(huán)信 - 中國IM即時通訊云服務開創(chuàng)者! (easemob.com)

每個免費用戶最多可以注冊100個能通訊的用戶。

基于項目的實現(xiàn)

簡便且功能要求不太多的:websocket

搭建難度高,但是穩(wěn)定且專業(yè)的:netty

哪都沾一點的:rabbitmq

直接上手但是有限制需要花錢的:第三方

參考文檔

Spring-websocket: Spring boot整合websocket實現(xiàn)即時通訊 (gitee.com)

RabbitMQ實現(xiàn)即時通訊_rabbitmq 聊天-CSDN博客

https://github.com/macrozheng/mall-learning/tree/master/mall-tiny-mqtt文章來源地址http://www.zghlxwxcb.cn/news/detail-802660.html

到了這里,關于消息的發(fā)送與接收的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Java 構(gòu)建websocket客戶端,構(gòu)建wss客戶端,使用wss連接,并發(fā)送數(shù)據(jù)到服務器端,接收服務器端消息

    Java 構(gòu)建websocket客戶端,構(gòu)建wss客戶端,使用wss連接,并發(fā)送數(shù)據(jù)到服務器端,接收服務器端消息 回調(diào)函數(shù)處理

    2024年02月13日
    瀏覽(33)
  • 消息的發(fā)送與接收

    消息的發(fā)送與接收

    消息的發(fā)送與接收不僅僅是在于聊天功能的實現(xiàn)。其實還有很多種情況也算\\\"消息的發(fā)送與接收\\\"。而且我們還可以通過多種方法去實現(xiàn)。我們可以基于實際情況來選擇。 node做后端。找了好多,前端頁面總是用到了jQuery,包括底下的java做后端的前端代碼等。我們先用最簡單的

    2024年01月18日
    瀏覽(17)
  • Kafka消息隊列實現(xiàn)消息的發(fā)送和接收

    Kafka消息隊列實現(xiàn)消息的發(fā)送和接收

    消息在Kafka消息隊列中發(fā)送和接收過程如下圖所示: 消息生產(chǎn)者Producer產(chǎn)生消息數(shù)據(jù),發(fā)送到Kafka消息隊列中,一臺Kafka節(jié)點只有一個Broker,消息會存儲在Kafka的Topic(主題中),不同類型的消息數(shù)據(jù)會存儲在不同的Topic中,可以利用Topic實現(xiàn)消息的分類,消息消費者Consumer會訂閱

    2024年02月11日
    瀏覽(21)
  • 如何使用RabbitMQ發(fā)送和接收消息

    本文介紹了如何使用RabbitMQ的Python客戶端庫pika來發(fā)送和接收消息,并提供了示例代碼。讀者可以根據(jù)自己的需求修改代碼,例如修改隊列名稱、發(fā)送不同的消息等。 RabbitMQ 是一個開源的消息隊列軟件,可以用于在應用程序之間傳遞消息。下面是一個使用 RabbitMQ 的流程和代碼

    2024年02月15日
    瀏覽(23)
  • SpringAMQP中AmqpTemplate發(fā)送接收消息

    SpringAMQP中AmqpTemplate發(fā)送接收消息

    前言: 最近沒事瀏覽Spring官網(wǎng),簡單寫一些相關的筆記,這篇文章整理Spring AMQP相關內(nèi)容。文章并不包含所有技術(shù)點,只是記錄有收獲 ? 目錄 1.AmqpTemplate 介紹 2.發(fā)送消息(Sending Message) 2.1發(fā)送Message消息 2.2發(fā)送POJO對象 2.3默認交換器與默認路由 2.5構(gòu)建消息方法 3.接收消息(

    2023年04月08日
    瀏覽(23)
  • 微信小程序消息推送、接收消息事件、發(fā)送客服消息

    微信小程序消息推送、接收消息事件、發(fā)送客服消息

    文檔地址消息推送 | 微信開放文檔 接收消息和事件 | 微信開放文檔 發(fā)送客服消息 | 微信開放文檔 代碼參考

    2024年02月12日
    瀏覽(22)
  • RabbitMQ如何保證消息的發(fā)送和接收

    一、RabbitMQ如何保證消息的發(fā)送和接收 1.ConfirmCallback方法 ConfirmCallback是一個回調(diào)接口,消息發(fā)送到broker后觸發(fā)回調(diào),確認消息是否到達broker服務器,也就是只確認消息是否正確到達Exchange交換機中。 2.ReturnCallback方法 通過實現(xiàn)ReturnCallback接口,啟動消息失敗返回,此接口是在交

    2024年02月15日
    瀏覽(23)
  • qt websocket 通訊實現(xiàn)消息發(fā)送接收

    websocket 是基于 TCP socket 之上的應用層, 解決 HTML 輪詢連接的問題,實現(xiàn)客戶端與服務端長連接, 實現(xiàn)消息互相發(fā)送,全雙工。 服務端, 使用 QT 教程demo chatserver.h chatserver.cpp main.cpp 客戶端 clientwidget.h clientwidget.cpp websocketclient.h websocketclient.cpp

    2024年02月15日
    瀏覽(25)
  • 使用C#和RabbitMQ發(fā)送和接收消息

    通過NuGet安裝 RabbitMQ.Client 以下是一個簡單的示例代碼,演示如何使用 C# 和 RabbitMQ 客戶端庫來發(fā)送和接收消息: durable持久化 durable 參數(shù)用于指定隊列是否是持久化的。 當 durable 參數(shù)設置為 true 時,表示隊列是持久化的。持久化的隊列會在RabbitMQ服務器重啟后仍然存在,確保

    2024年02月11日
    瀏覽(25)
  • 如何使用 RabbitMQ 進行消息的發(fā)送和接收

    1、創(chuàng)建連接工廠: 2、創(chuàng)建交換器和隊列: 3、發(fā)送消息: 4、接收消息: 在上述示例中,我們創(chuàng)建了一個連接工廠,并通過它建立與 RabbitMQ 服務器的連接和通道。然后,我們聲明了一個直連型交換器和一個隊列,并將它們綁定在一起。接下來,我們使用basicPublish方法發(fā)送消

    2024年04月22日
    瀏覽(32)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包