聲明
本文提煉于個(gè)人練手項(xiàng)目,其中的實(shí)現(xiàn)邏輯不一定標(biāo)準(zhǔn),實(shí)現(xiàn)思路沒有參考權(quán)威的文檔和教程,僅為個(gè)人思考得出,因此可能存在較多本人未考慮到的情況和漏洞,因此僅供參考,如果大家覺得有問題,懇請(qǐng)大家指出有問題的地方
如果對(duì)客戶端的實(shí)現(xiàn)感興趣,可以轉(zhuǎn)身查看【UniApp開發(fā)小程序】私聊功能uniapp界面實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】
聊天數(shù)據(jù)查詢管理
數(shù)據(jù)庫設(shè)計(jì)
【私信表】
Vo
package com.ruoyi.common.core.domain.vo;
import lombok.Data;
import java.util.Date;
/**
* @Author dam
* @create 2023/8/22 21:39
*/
@Data
public class ChatUserVo {
private Long userId;
private String userAvatar;
private String userName;
private String userNickname;
/**
* 最后一條消息的內(nèi)容
*/
private String lastChatContent;
/**
* 最后一次聊天的日期
*/
private Date lastChatDate;
/**
* 未讀消息數(shù)量
*/
private Integer unReadChatNum;
}
Controller
其中兩個(gè)方法較為重要,介紹如下:
- listChatUserVo:當(dāng)用戶進(jìn)入消息界面的時(shí)候,需要查詢出最近聊天的用戶,其中還需要展示一些信息,如
ChatUserVo
的屬性 - listChat:該方法用于查詢對(duì)方最近和自己的私聊內(nèi)容,當(dāng)用戶查詢了這些私聊內(nèi)容,默認(rèn)用戶已經(jīng)看過了,將這些私聊內(nèi)容設(shè)置為已讀狀態(tài)
package com.shm.controller;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletResponse;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.service.IChatService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;
/**
* 聊天數(shù)據(jù)Controller
*
* @author dam
* @date 2023-08-19
*/
@RestController
@RequestMapping("/market/chat")
@Api
public class ChatController extends BaseController {
@Autowired
private IChatService chatService;
/**
* 查詢聊天數(shù)據(jù)列表
*/
@PreAuthorize("@ss.hasPermi('market:chat:list')")
@GetMapping("/list")
public TableDataInfo list(Chat chat) {
startPage();
List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat));
return getDataTable(list);
}
/**
* 查詢最近和自己聊天的用戶
*/
@ApiOperation("listChatUserVo")
@PreAuthorize("@ss.hasPermi('market:chat:list')")
@GetMapping("/listChatUserVo")
public TableDataInfo listChatUserVo() {
startPage();
String username = getLoginUser().getUsername();
List<ChatUserVo> list = chatService.listChatUserVo(username);
return getDataTable(list);
}
/**
* 查詢用戶和自己最近的聊天信息
*/
@ApiOperation("listUsersChatWithMe")
@PreAuthorize("@ss.hasPermi('market:chat:list')")
@GetMapping("/listChat/{toUsername}")
public TableDataInfo listChat(@PathVariable("toUsername") String toUsername) {
String curUsername = getLoginUser().getUsername();
startPage();
List<Chat> list = chatService.listChat(curUsername, toUsername);
for (Chat chat : list) {
System.out.println("chat:"+chat.toString());
}
System.out.println();
// 查出的數(shù)據(jù),如果消息是對(duì)方發(fā)的,且是未讀狀態(tài),重新設(shè)置為已讀
List<Long> unReadIdList = list.stream().filter(
(item1) -> {
if (item1.getIsRead() == 0 && item1.getFromWho().equals(toUsername)) {
return true;
} else {
return false;
}
}
)
.map(item2 -> {
return item2.getId();
}).collect(Collectors.toList());
System.out.println("將"+ unReadIdList.toString()+"設(shè)置為已讀");
if (unReadIdList.size() > 0) {
// 批量設(shè)置私聊為已讀狀態(tài)
chatService.batchRead(unReadIdList);
}
return getDataTable(list);
}
/**
* 導(dǎo)出聊天數(shù)據(jù)列表
*/
@PreAuthorize("@ss.hasPermi('market:chat:export')")
@Log(title = "聊天數(shù)據(jù)", businessType = BusinessType.EXPORT)
@PostMapping("/export")
public void export(HttpServletResponse response, Chat chat) {
List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat));
ExcelUtil<Chat> util = new ExcelUtil<Chat>(Chat.class);
util.exportExcel(response, list, "聊天數(shù)據(jù)數(shù)據(jù)");
}
/**
* 獲取聊天數(shù)據(jù)詳細(xì)信息
*/
@PreAuthorize("@ss.hasPermi('market:chat:query')")
@GetMapping(value = "/getInfo/{id}")
public AjaxResult getInfo(@PathVariable("id") Long id) {
return success(chatService.getById(id));
}
/**
* 新增聊天數(shù)據(jù)
*/
@PreAuthorize("@ss.hasPermi('market:chat:add')")
@Log(title = "聊天數(shù)據(jù)", businessType = BusinessType.INSERT)
@PostMapping
public AjaxResult add(@RequestBody Chat chat) {
return toAjax(chatService.save(chat));
}
/**
* 修改聊天數(shù)據(jù)
*/
@PreAuthorize("@ss.hasPermi('market:chat:edit')")
@Log(title = "聊天數(shù)據(jù)", businessType = BusinessType.UPDATE)
@PutMapping
public AjaxResult edit(@RequestBody Chat chat) {
return toAjax(chatService.updateById(chat));
}
/**
* 刪除聊天數(shù)據(jù)
*/
@PreAuthorize("@ss.hasPermi('market:chat:remove')")
@Log(title = "聊天數(shù)據(jù)", businessType = BusinessType.DELETE)
@DeleteMapping("/{ids}")
public AjaxResult remove(@PathVariable List<Long> ids) {
return toAjax(chatService.removeByIds(ids));
}
}
Service
package com.shm.service.impl;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.mapper.ChatMapper;
import com.shm.service.IChatService;
import org.springframework.stereotype.Service;
import java.util.List;
/**
* @author 17526
* @description 針對(duì)表【chat(聊天數(shù)據(jù)表)】的數(shù)據(jù)庫操作Service實(shí)現(xiàn)
* @createDate 2023-08-19 21:12:49
*/
@Service
public class IChatServiceImpl extends ServiceImpl<ChatMapper, Chat>
implements IChatService {
/**
* 查詢最近和自己聊天的用戶
*
* @return
*/
@Override
public List<ChatUserVo> listChatUserVo(String username) {
return baseMapper.listChatUserVo(username);
}
/**
* 查詢用戶和自己最近的聊天信息
*
* @param curUsername
* @param toUsername
* @return
*/
@Override
public List<Chat> listChat(String curUsername, String toUsername) {
return baseMapper.listChat(curUsername, toUsername);
}
@Override
public void batchRead(List<Long> unReadIdList) {
baseMapper.batchRead(unReadIdList);
}
}
Mapper
package com.shm.mapper;
import com.ruoyi.common.core.domain.entity.Chat;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import org.apache.ibatis.annotations.Param;
import java.util.List;
/**
* @author 17526
* @description 針對(duì)表【chat(聊天數(shù)據(jù)表)】的數(shù)據(jù)庫操作Mapper
* @createDate 2023-08-19 21:12:49
* @Entity com.ruoyi.common.core.domain.entity.Chat
*/
public interface ChatMapper extends BaseMapper<Chat> {
List<ChatUserVo> listChatUserVo(@Param("username") String username);
List<Chat> listChat(@Param("curUsername") String curUsername, @Param("toUsername") String toUsername);
void batchRead(@Param("unReadIdList") List<Long> unReadIdList);
}
【xml文件】
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.shm.mapper.ChatMapper">
<resultMap id="BaseResultMap" type="com.ruoyi.common.core.domain.entity.Chat">
<id property="id" column="id" jdbcType="BIGINT"/>
<result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
<result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
<result property="isDeleted" column="is_deleted" jdbcType="TINYINT"/>
<result property="fromWho" column="from_who" jdbcType="BIGINT"/>
<result property="toWho" column="to_who" jdbcType="BIGINT"/>
<result property="content" column="content" jdbcType="VARCHAR"/>
<result property="picUrl" column="pic_url" jdbcType="VARCHAR"/>
</resultMap>
<sql id="Base_Column_List">
id,create_time,update_time,
is_deleted,from,to,
content,pic_url
</sql>
<update id="batchRead">
update chat set is_read = 1 where id in
<foreach collection="unReadIdList" item="chatId" separator="," open="(" close=")">
#{chatId}
</foreach>
</update>
<select id="listChatUserVo" resultType="com.ruoyi.common.core.domain.vo.ChatUserVo">
SELECT
(CASE WHEN c.from_who=#{username} THEN c.to_who ELSE c.from_who END) AS `userName`,
c.content AS `lastChatContent`,
c.create_time AS lastChatDate,
u.user_id AS userId,
u.avatar AS userAvatar,
u.nick_name AS userNickname,
ur.unReadNum as unReadChatNum
FROM
(SELECT
MAX(`id`) AS chatId,
CASE
WHEN `from_who` = #{username} THEN `to_who`
ELSE `from_who`
END AS uname
FROM `chat`
WHERE `from_who` = #{username} OR `to_who` = #{username}
GROUP BY uname) AS t
INNER JOIN `chat` c ON c.id = t.chatId
LEFT JOIN `sys_user` u ON t.uname = u.user_name
LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = #{username} GROUP BY from_who) ur ON ur.from_who = t.uname
ORDER BY c.create_time DESC
</select>
<select id="listChat" resultType="com.ruoyi.common.core.domain.entity.Chat">
SELECT
*
FROM
chat
WHERE
( from_who = #{curUsername} AND to_who = #{toUsername} )
OR ( to_who = #{curUsername} AND from_who = #{toUsername} )
ORDER BY
create_time DESC
</select>
</mapper>
【查詢最近聊天的用戶的用戶名和那條消息的id】
因?yàn)閕d是自增的,所以最新的那條消息的id肯定最大,因此可以使用MAX(id)
來獲取最近的消息
SELECT
MAX(`id`) AS chatId,
CASE
WHEN `from_who` = 'admin' THEN `to_who`
ELSE `from_who`
END AS uname
FROM `chat`
WHERE `from_who` = 'admin' OR `to_who` = 'admin'
GROUP BY uname
【內(nèi)連接私信表獲取消息的其他信息】
INNER JOIN `chat` c ON c.id = t.chatId
【左連接用戶表獲取用戶的相關(guān)信息】
LEFT JOIN `sys_user` u ON t.uname = u.user_name
【左聯(lián)接私信表獲取未讀對(duì)方消息的數(shù)量】CASE WHEN is_read=1 THEN 0 ELSE 1 END
如果已讀,說明未讀數(shù)量為0;否則為1
LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = 'admin' GROUP BY from_who) ur ON ur.from_who = t.uname
【最后按照用戶和自己最后聊天的時(shí)間來降序排序】
ORDER BY c.create_time DESC
WebSocket引入
為什么使用WebSocket
WebSocket不僅支持客戶端向服務(wù)端發(fā)送消息,同時(shí)也支持服務(wù)端向客戶端發(fā)送消息,這樣才能完成私聊的功能。即用戶1-->服務(wù)端-->用戶2
依賴
<!-- websocket -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
配置類
package com.shm.config;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;
@Configuration
public class WebSocketConfig {
/**
* 注入一個(gè)ServerEndpointExporter,
* 該Bean會(huì)自動(dòng)注冊(cè)使用@ServerEndpoint注解 聲明的websocket endpoint
*/
@Bean
public ServerEndpointExporter serverEndpointExporter() {
return new ServerEndpointExporter();
}
}
WebSocket服務(wù)
需要注意的是,Websocket是多例模式,無法直接使用@Autowired
注解來注入rabbitTemplate,需要使用下面的方式,其中rabbitTemplate為靜態(tài)變量
private static RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
WebSocketServer.rabbitTemplate = rabbitTemplate;
}
package com.shm.component;
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.component.delay.DelayQueueManager;
import com.shm.component.delay.DelayTask;
import com.shm.constant.RabbitMqConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* @author websocket服務(wù)
*/
@ServerEndpoint(value = "/websocket/{username}")
@Component//將WebSocketServer注冊(cè)為spring的一個(gè)bean
public class WebSocketServer {
private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);
/**
* 記錄當(dāng)前在線連接的客戶端的session
*/
public static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>();
/**
* 記錄正在進(jìn)行的聊天的發(fā)出者和接收者
*/
public static final Map<String, Integer> fromToMap = new ConcurrentHashMap<>();
/**
* 用戶Session保留時(shí)間,如果超過該時(shí)間,用戶還沒有給服務(wù)端發(fā)送消息,認(rèn)為用戶下線,刪除其Session
* 注意:該時(shí)間需要比客戶端的心跳時(shí)間更長(zhǎng)
*/
private static final long expire = 6000;
// websocket為多例模式,無法直接注入,需要換成下面的方式
// @Autowired
// RabbitTemplate rabbitTemplate;
private static RabbitTemplate rabbitTemplate;
@Autowired
public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
WebSocketServer.rabbitTemplate = rabbitTemplate;
}
@Autowired
private static DelayQueueManager delayQueueManager;
@Autowired
public void setDelayQueueManager(DelayQueueManager delayQueueManager) {
WebSocketServer.delayQueueManager = delayQueueManager;
}
/**
* 瀏覽器和服務(wù)端連接建立成功之后會(huì)調(diào)用這個(gè)方法
*/
@OnOpen
public void onOpen(Session session, @PathParam("username") String username) {
usernameAndSessionMap.put(username, session);
// 建立延時(shí)任務(wù),如果到expire時(shí)間,客戶端還是沒有和服務(wù)器有任何交互的話,就刪除該用戶的session,表示該用戶下線
delayQueueManager.put(new DelayTask(username, expire));
log.info("有新用戶加入,username={}, 當(dāng)前在線人數(shù)為:{}", username, usernameAndSessionMap.size());
}
/**
* 連接關(guān)閉調(diào)用的方法
*/
@OnClose
public void onClose(Session session, @PathParam("username") String username) {
usernameAndSessionMap.remove(username);
log.info("有一連接關(guān)閉,移除username={}的用戶session, 當(dāng)前在線人數(shù)為:{}", username, usernameAndSessionMap.size());
}
/**
* 發(fā)生錯(cuò)誤的時(shí)候會(huì)調(diào)用這個(gè)方法
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("發(fā)生錯(cuò)誤");
error.printStackTrace();
}
/**
* 服務(wù)端發(fā)送消息給客戶端
*/
public void sendMessage(String message, Session toSession) {
try {
log.info("服務(wù)端給客戶端[{}]發(fā)送消息{}", toSession.getId(), message);
toSession.getBasicRemote().sendText(message);
} catch (Exception e) {
log.error("服務(wù)端發(fā)送消息給客戶端失敗", e);
}
}
/**
* onMessage方法是一個(gè)消息的中轉(zhuǎn)站
* 1、首先接受瀏覽器端socket.send發(fā)送過來的json數(shù)據(jù)
* 2、然后解析其數(shù)據(jù),找到消息要發(fā)送給誰
* 3、最后將數(shù)據(jù)發(fā)送給相應(yīng)的人
*
* @param message 客戶端發(fā)送過來的消息 數(shù)據(jù)格式:{"from":"user1","to":"admin","text":"你好呀"}
*/
@OnMessage
public void onMessage(String message, Session session, @PathParam("username") String username) {
// log.info("服務(wù)端接收到 {} 的消息,消息內(nèi)容是:{}", username, message);
// 收到用戶的信息,刪除之前的延時(shí)任務(wù),創(chuàng)建新的延時(shí)任務(wù)
delayQueueManager.put(new DelayTask(username, expire));
if (!usernameAndSessionMap.containsKey(username)) {
// 可能用戶掛機(jī)了一段時(shí)間,被下線了,后面又重新回來發(fā)信息了,需要重新將用戶和session添加字典中
usernameAndSessionMap.put(username, session);
}
// 將json字符串轉(zhuǎn)化為json對(duì)象
JSONObject obj = JSON.parseObject(message);
String status = (String) obj.get("status");
// 獲取消息的內(nèi)容
String text = (String) obj.get("text");
// 查看消息要發(fā)送給哪個(gè)用戶
String to = (String) obj.get("to");
String fromToKey = username + "-" + to;
String toFromKey = to + "-" + username;
if (status != null) {
if (status.equals("start")) {
fromToMap.put(fromToKey, 1);
} else if (status.equals("end")) {
System.out.println("移除銷毀的fromToKey:" + fromToKey);
fromToMap.remove(fromToKey);
} else if (status.equals("ping")) {
// 更新用戶對(duì)應(yīng)的時(shí)間戳
// usernameAndTimeStampMap.put(username, System.currentTimeMillis());
}
} else {
// 封裝數(shù)據(jù)發(fā)送給消息隊(duì)列
Chat chat = new Chat();
chat.setFromWho(username);
chat.setToWho(to);
chat.setContent(text);
chat.setIsRead(0);
// chat.setPicUrl("");
// 根據(jù)to來獲取相應(yīng)的session,然后通過session將消息內(nèi)容轉(zhuǎn)發(fā)給相應(yīng)的用戶
Session toSession = usernameAndSessionMap.get(to);
if (toSession != null) {
JSONObject jsonObject = new JSONObject();
// 設(shè)置消息來源的用戶名
jsonObject.put("from", username);
// 設(shè)置消息內(nèi)容
jsonObject.put("text", text);
// 服務(wù)端發(fā)送消息給目標(biāo)客戶端
this.sendMessage(jsonObject.toString(), toSession);
log.info("發(fā)送消息給用戶 {} ,消息內(nèi)容是:{} ", toSession, jsonObject.toString());
if (fromToMap.containsKey(toFromKey)) {
chat.setIsRead(1);
}
} else {
log.info("發(fā)送失敗,未找到用戶 {} 的session", to);
}
rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY, chat);
}
}
}
RabbitMQ引入
為什么使用消息隊(duì)列
在用戶之間進(jìn)行聊天的時(shí)候,需要將用戶的聊天數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)庫中,但是如果大量用戶同時(shí)在線的話,可能同一時(shí)間發(fā)送的消息數(shù)量太多,如果同時(shí)將這些消息存儲(chǔ)到數(shù)據(jù)庫中,會(huì)給數(shù)據(jù)庫帶來較大的壓力,使用RabbitMQ可以先把要存儲(chǔ)的數(shù)據(jù)放到消息隊(duì)列,然后數(shù)據(jù)庫服務(wù)器壓力沒這么大的時(shí)候,就會(huì)從消息隊(duì)列中獲取數(shù)據(jù)來存儲(chǔ),這樣可以分散數(shù)據(jù)庫的壓力。但是如果用戶是直接從數(shù)據(jù)庫獲取消息的話,消息可能有一定的延遲,如果用戶之間正在聊天的話,消息則不會(huì)延遲,因?yàn)榱奶靸?nèi)容會(huì)立刻通過WebSocket發(fā)送給對(duì)方。
依賴
<!-- rabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
啟動(dòng)類添加注解
在啟動(dòng)類的上方添加@EnableRabbit
注解
常量類
因?yàn)橛卸嗵帟?huì)使用到隊(duì)列命名等信息,創(chuàng)建一個(gè)常量類來保存相關(guān)信息
package com.shm.constant;
public class RabbitMqConstant {
public static final String CHAT_STORAGE_QUEUE = "shm.chat-storage.queue";
public static final String CHAT_STORAGE_EXCHANGE = "shm.chat-storage-event-exchange";
public static final String CHAT_STORAGE_ROUTER_KEY = "shm.chat-storage.register";
}
使用配置類創(chuàng)建隊(duì)列、交換機(jī)、綁定關(guān)系
package com.shm.config;
import com.shm.constant.RabbitMqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class MyRabbitConfig {
/**
* 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
* @return
*/
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
/**
* 私信存儲(chǔ)隊(duì)列
*
* @return
*/
@Bean
public Queue chatStorageQueue() {
Queue queue = new Queue(RabbitMqConstant.CHAT_STORAGE_QUEUE, true, false, false);
return queue;
}
/**
* 私信存儲(chǔ)交換機(jī)
* 創(chuàng)建交換機(jī),由于只需要一個(gè)隊(duì)列,創(chuàng)建direct交換機(jī)
*
* @return
*/
@Bean
public Exchange chatStorageExchange() {
//durable:持久化
return new DirectExchange(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, true, false);
}
/**
* 創(chuàng)建私信存儲(chǔ) 交換機(jī)和隊(duì)列的綁定關(guān)系
*
* @return
*/
@Bean
public Binding chatStorageBinding() {
return new Binding(RabbitMqConstant.CHAT_STORAGE_QUEUE,
Binding.DestinationType.QUEUE,
RabbitMqConstant.CHAT_STORAGE_EXCHANGE,
RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,
null);
}
}
消息監(jiān)聽器
創(chuàng)建一個(gè)消息監(jiān)聽類來監(jiān)聽隊(duì)列的消息,然后調(diào)用相關(guān)的邏輯來處理信息,本文主要的處理是將私信內(nèi)容存儲(chǔ)到數(shù)據(jù)庫中
package com.shm.listener;
import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.constant.RabbitMqConstant;
import com.shm.service.IChatService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
/**
* 注意,類上面需要RabbitListener注解
*/
@RabbitListener(queues = RabbitMqConstant.CHAT_STORAGE_QUEUE)
public class ChatStorageListener {
@Autowired
private IChatService chatService;
@RabbitHandler
public void handleStockLockedRelease(Chat chat, Message message, Channel channel) throws IOException {
try {
boolean save = chatService.save(chat);
//解鎖成功,手動(dòng)確認(rèn),消息才從MQ中刪除
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
//只要有異常,拒絕消息,讓消息重新返回隊(duì)列,讓別的消費(fèi)者繼續(xù)解鎖
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}
發(fā)送消息到消息隊(duì)列
WebSocketServer
為Websocket后端服務(wù)代碼,其中的onMessage方法會(huì)接受客戶端發(fā)送過來的消息,當(dāng)接收到消息的時(shí)候,將消息發(fā)送給消息隊(duì)列
// 封裝數(shù)據(jù)發(fā)送給消息隊(duì)列
Chat chat = new Chat();
chat.setFromWho(username);
chat.setToWho(to);
chat.setContent(text);
chat.setPicUrl("");
rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE,RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,chat);
延時(shí)任務(wù)
為什么使用延時(shí)任務(wù)
為了更好地感知用戶的在線狀態(tài),在用戶連接了WebSocket或者發(fā)送消息之后,建立一個(gè)延時(shí)任務(wù),如果到達(dá)了所設(shè)定的延時(shí)時(shí)間,就刪除用戶的Session,認(rèn)為用戶已經(jīng)下線;如果在延時(shí)期間之內(nèi),用戶發(fā)送了新消息,或者發(fā)送了心跳信號(hào),證明該用戶還處于在線狀態(tài),刪除前面的延時(shí)任務(wù),并創(chuàng)建新的延時(shí)任務(wù)文章來源:http://www.zghlxwxcb.cn/news/detail-673083.html
延時(shí)任務(wù)類
package com.shm.component.delay;
import lombok.Data;
import lombok.Getter;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
/**
* @Author dam
* @create 2023/8/25 15:12
*/
@Getter
public class DelayTask implements Delayed {
/**
* 用戶名
*/
private final String userName;
/**
* 任務(wù)的真正執(zhí)行時(shí)間
*/
private final long executeTime;
/**
* 任務(wù)延時(shí)多久執(zhí)行
*/
private final long expire;
/**
* @param expire 任務(wù)需要延時(shí)的時(shí)間
*/
public DelayTask(String userName, long expire) {
this.userName = userName;
this.executeTime = expire + System.currentTimeMillis();
this.expire = expire;
}
/**
* 根據(jù)給定的時(shí)間單位,返回與此對(duì)象關(guān)聯(lián)的剩余延遲時(shí)間
*
* @param unit the time unit 時(shí)間單位
* @return 返回剩余延遲,零值或負(fù)值表示延遲已經(jīng)過去
*/
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(this.executeTime - System.currentTimeMillis(), unit);
}
@Override
public int compareTo(Delayed o) {
return 0;
}
}
延時(shí)任務(wù)管理
package com.shm.component.delay;
import com.shm.component.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;
/**
* @Author dam
* @create 2023/8/25 15:12
*/
@Component
@Slf4j
public class DelayQueueManager implements CommandLineRunner {
private final DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
private final Map<String, DelayTask> usernameAndDelayTaskMap = new ConcurrentHashMap<>();
/**
* 加入到延時(shí)隊(duì)列中
*
* @param task
*/
public void put(DelayTask task) {
// 因?yàn)橐粋€(gè)用戶只能對(duì)應(yīng)一個(gè)延時(shí)任務(wù),所以如果已經(jīng)存在了延時(shí)任務(wù),將其進(jìn)行刪除
if (usernameAndDelayTaskMap.containsKey(task.getUserName())) {
this.remove(task.getUserName());
}
delayQueue.put(task);
usernameAndDelayTaskMap.put(task.getUserName(), task);
}
/**
* 取消延時(shí)任務(wù)
*
* @param username 要?jiǎng)h除的任務(wù)的用戶名
* @return
*/
public boolean remove(String username) {
DelayTask remove = usernameAndDelayTaskMap.remove(username);
return delayQueue.remove(remove);
}
@Override
public void run(String... args) throws Exception {
this.executeThread();
}
/**
* 延時(shí)任務(wù)執(zhí)行線程
*/
private void executeThread() {
while (true) {
try {
DelayTask task = delayQueue.take();
//執(zhí)行任務(wù)
processTask(task);
} catch (InterruptedException e) {
break;
}
}
}
/**
* 執(zhí)行延時(shí)任務(wù)
*
* @param task
*/
private void processTask(DelayTask task) {
// 刪除該用戶的session,表示用戶下線
WebSocketServer.usernameAndSessionMap.remove(task.getUserName());
log.error("執(zhí)行定時(shí)任務(wù):{}下線", task.getUserName());
}
}
同項(xiàng)目其他文章
該項(xiàng)目的其他文章請(qǐng)查看【易售小程序項(xiàng)目】項(xiàng)目介紹、小程序頁面展示與系列文章集合文章來源地址http://www.zghlxwxcb.cn/news/detail-673083.html
到了這里,關(guān)于【UniApp開發(fā)小程序】私聊功能后端實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!