国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【UniApp開發(fā)小程序】私聊功能后端實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】

這篇具有很好參考價(jià)值的文章主要介紹了【UniApp開發(fā)小程序】私聊功能后端實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

聲明

本文提煉于個(gè)人練手項(xiàng)目,其中的實(shí)現(xiàn)邏輯不一定標(biāo)準(zhǔn),實(shí)現(xiàn)思路沒有參考權(quán)威的文檔和教程,僅為個(gè)人思考得出,因此可能存在較多本人未考慮到的情況和漏洞,因此僅供參考,如果大家覺得有問題,懇請(qǐng)大家指出有問題的地方

如果對(duì)客戶端的實(shí)現(xiàn)感興趣,可以轉(zhuǎn)身查看【UniApp開發(fā)小程序】私聊功能uniapp界面實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】

聊天數(shù)據(jù)查詢管理

數(shù)據(jù)庫設(shè)計(jì)

【私信表】
【UniApp開發(fā)小程序】私聊功能后端實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】,小程序開發(fā),小程序,WebSocket,RabbitMQ,延時(shí)任務(wù),SpringBoot

Vo

package com.ruoyi.common.core.domain.vo;

import lombok.Data;

import java.util.Date;

/**
 * @Author dam
 * @create 2023/8/22 21:39
 */
@Data
public class ChatUserVo {
    private Long userId;
    private String userAvatar;
    private String userName;
    private String userNickname;
    /**
     * 最后一條消息的內(nèi)容
     */
    private String lastChatContent;
    /**
     * 最后一次聊天的日期
     */
    private Date lastChatDate;
    /**
     * 未讀消息數(shù)量
     */
    private Integer unReadChatNum;
}

Controller

其中兩個(gè)方法較為重要,介紹如下:

  • listChatUserVo:當(dāng)用戶進(jìn)入消息界面的時(shí)候,需要查詢出最近聊天的用戶,其中還需要展示一些信息,如ChatUserVo的屬性
  • listChat:該方法用于查詢對(duì)方最近和自己的私聊內(nèi)容,當(dāng)用戶查詢了這些私聊內(nèi)容,默認(rèn)用戶已經(jīng)看過了,將這些私聊內(nèi)容設(shè)置為已讀狀態(tài)
package com.shm.controller;

import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import javax.servlet.http.HttpServletResponse;

import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.service.IChatService;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.security.access.prepost.PreAuthorize;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.PutMapping;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import com.ruoyi.common.annotation.Log;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
import com.ruoyi.common.enums.BusinessType;
import com.ruoyi.common.utils.poi.ExcelUtil;
import com.ruoyi.common.core.page.TableDataInfo;

/**
 * 聊天數(shù)據(jù)Controller
 *
 * @author dam
 * @date 2023-08-19
 */
@RestController
@RequestMapping("/market/chat")
@Api
public class ChatController extends BaseController {
    @Autowired
    private IChatService chatService;

    /**
     * 查詢聊天數(shù)據(jù)列表
     */
    @PreAuthorize("@ss.hasPermi('market:chat:list')")
    @GetMapping("/list")
    public TableDataInfo list(Chat chat) {
        startPage();
        List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat));
        return getDataTable(list);
    }

    /**
     * 查詢最近和自己聊天的用戶
     */
    @ApiOperation("listChatUserVo")
    @PreAuthorize("@ss.hasPermi('market:chat:list')")
    @GetMapping("/listChatUserVo")
    public TableDataInfo listChatUserVo() {
        startPage();
        String username = getLoginUser().getUsername();
        List<ChatUserVo> list = chatService.listChatUserVo(username);
        return getDataTable(list);
    }

    /**
     * 查詢用戶和自己最近的聊天信息
     */
    @ApiOperation("listUsersChatWithMe")
    @PreAuthorize("@ss.hasPermi('market:chat:list')")
    @GetMapping("/listChat/{toUsername}")
    public TableDataInfo listChat(@PathVariable("toUsername") String toUsername) {
        String curUsername = getLoginUser().getUsername();
        startPage();
        List<Chat> list = chatService.listChat(curUsername, toUsername);
        for (Chat chat : list) {
            System.out.println("chat:"+chat.toString());
        }
        System.out.println();
        // 查出的數(shù)據(jù),如果消息是對(duì)方發(fā)的,且是未讀狀態(tài),重新設(shè)置為已讀
        List<Long> unReadIdList = list.stream().filter(
                        (item1) -> {
                            if (item1.getIsRead() == 0 && item1.getFromWho().equals(toUsername)) {
                                return true;
                            } else {
                                return false;
                            }
                        }
                )
                .map(item2 -> {
                    return item2.getId();
                }).collect(Collectors.toList());
        System.out.println("將"+ unReadIdList.toString()+"設(shè)置為已讀");
        if (unReadIdList.size() > 0) {
            // 批量設(shè)置私聊為已讀狀態(tài)
            chatService.batchRead(unReadIdList);
        }
        return getDataTable(list);
    }

    /**
     * 導(dǎo)出聊天數(shù)據(jù)列表
     */
    @PreAuthorize("@ss.hasPermi('market:chat:export')")
    @Log(title = "聊天數(shù)據(jù)", businessType = BusinessType.EXPORT)
    @PostMapping("/export")
    public void export(HttpServletResponse response, Chat chat) {
        List<Chat> list = chatService.list(new QueryWrapper<Chat>(chat));
        ExcelUtil<Chat> util = new ExcelUtil<Chat>(Chat.class);
        util.exportExcel(response, list, "聊天數(shù)據(jù)數(shù)據(jù)");
    }

    /**
     * 獲取聊天數(shù)據(jù)詳細(xì)信息
     */
    @PreAuthorize("@ss.hasPermi('market:chat:query')")
    @GetMapping(value = "/getInfo/{id}")
    public AjaxResult getInfo(@PathVariable("id") Long id) {
        return success(chatService.getById(id));
    }

    /**
     * 新增聊天數(shù)據(jù)
     */
    @PreAuthorize("@ss.hasPermi('market:chat:add')")
    @Log(title = "聊天數(shù)據(jù)", businessType = BusinessType.INSERT)
    @PostMapping
    public AjaxResult add(@RequestBody Chat chat) {
        return toAjax(chatService.save(chat));
    }

    /**
     * 修改聊天數(shù)據(jù)
     */
    @PreAuthorize("@ss.hasPermi('market:chat:edit')")
    @Log(title = "聊天數(shù)據(jù)", businessType = BusinessType.UPDATE)
    @PutMapping
    public AjaxResult edit(@RequestBody Chat chat) {
        return toAjax(chatService.updateById(chat));
    }

    /**
     * 刪除聊天數(shù)據(jù)
     */
    @PreAuthorize("@ss.hasPermi('market:chat:remove')")
    @Log(title = "聊天數(shù)據(jù)", businessType = BusinessType.DELETE)
    @DeleteMapping("/{ids}")
    public AjaxResult remove(@PathVariable List<Long> ids) {
        return toAjax(chatService.removeByIds(ids));
    }
}

Service

package com.shm.service.impl;

import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.ruoyi.common.core.domain.entity.Chat;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import com.shm.mapper.ChatMapper;
import com.shm.service.IChatService;
import org.springframework.stereotype.Service;

import java.util.List;

/**
 * @author 17526
 * @description 針對(duì)表【chat(聊天數(shù)據(jù)表)】的數(shù)據(jù)庫操作Service實(shí)現(xiàn)
 * @createDate 2023-08-19 21:12:49
 */
@Service
public class IChatServiceImpl extends ServiceImpl<ChatMapper, Chat>
        implements IChatService {

    /**
     * 查詢最近和自己聊天的用戶
     *
     * @return
     */
    @Override
    public List<ChatUserVo> listChatUserVo(String username) {
        return baseMapper.listChatUserVo(username);
    }

    /**
     * 查詢用戶和自己最近的聊天信息
     *
     * @param curUsername
     * @param toUsername
     * @return
     */
    @Override
    public List<Chat> listChat(String curUsername, String toUsername) {
        return baseMapper.listChat(curUsername, toUsername);
    }

    @Override
    public void batchRead(List<Long> unReadIdList) {
        baseMapper.batchRead(unReadIdList);
    }
}

Mapper

package com.shm.mapper;

import com.ruoyi.common.core.domain.entity.Chat;
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
import com.ruoyi.common.core.domain.vo.ChatUserVo;
import org.apache.ibatis.annotations.Param;

import java.util.List;

/**
* @author 17526
* @description 針對(duì)表【chat(聊天數(shù)據(jù)表)】的數(shù)據(jù)庫操作Mapper
* @createDate 2023-08-19 21:12:49
* @Entity com.ruoyi.common.core.domain.entity.Chat
*/
public interface ChatMapper extends BaseMapper<Chat> {

    List<ChatUserVo> listChatUserVo(@Param("username") String username);

    List<Chat> listChat(@Param("curUsername") String curUsername, @Param("toUsername") String toUsername);

    void batchRead(@Param("unReadIdList") List<Long> unReadIdList);
}

【xml文件】

<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE mapper
        PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
        "http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper namespace="com.shm.mapper.ChatMapper">

    <resultMap id="BaseResultMap" type="com.ruoyi.common.core.domain.entity.Chat">
            <id property="id" column="id" jdbcType="BIGINT"/>
            <result property="createTime" column="create_time" jdbcType="TIMESTAMP"/>
            <result property="updateTime" column="update_time" jdbcType="TIMESTAMP"/>
            <result property="isDeleted" column="is_deleted" jdbcType="TINYINT"/>
            <result property="fromWho" column="from_who" jdbcType="BIGINT"/>
            <result property="toWho" column="to_who" jdbcType="BIGINT"/>
            <result property="content" column="content" jdbcType="VARCHAR"/>
            <result property="picUrl" column="pic_url" jdbcType="VARCHAR"/>
    </resultMap>

    <sql id="Base_Column_List">
        id,create_time,update_time,
        is_deleted,from,to,
        content,pic_url
    </sql>
    <update id="batchRead">
        update chat set is_read = 1 where id in
        <foreach collection="unReadIdList" item="chatId" separator="," open="(" close=")">
            #{chatId}
        </foreach>
    </update>
    <select id="listChatUserVo" resultType="com.ruoyi.common.core.domain.vo.ChatUserVo">
        SELECT
            (CASE WHEN c.from_who=#{username} THEN c.to_who ELSE c.from_who END) AS `userName`,
            c.content AS `lastChatContent`,
            c.create_time AS lastChatDate,
            u.user_id AS userId,
            u.avatar AS userAvatar,
            u.nick_name AS userNickname,
            ur.unReadNum as unReadChatNum
        FROM
            (SELECT
                 MAX(`id`) AS chatId,
                 CASE
                     WHEN `from_who` = #{username} THEN `to_who`
                     ELSE `from_who`
                     END AS uname
             FROM `chat`
             WHERE `from_who` = #{username} OR `to_who` = #{username}
             GROUP BY uname) AS t
                INNER JOIN `chat` c ON c.id = t.chatId
                LEFT JOIN `sys_user` u ON t.uname = u.user_name
                LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = #{username} GROUP BY from_who) ur ON ur.from_who = t.uname
        ORDER BY c.create_time DESC
    </select>
    <select id="listChat" resultType="com.ruoyi.common.core.domain.entity.Chat">
        SELECT
            *
        FROM
            chat
        WHERE
            ( from_who = #{curUsername} AND to_who = #{toUsername} )
           OR ( to_who = #{curUsername} AND from_who = #{toUsername} )
        ORDER BY
            create_time DESC
    </select>
</mapper>

【查詢最近聊天的用戶的用戶名和那條消息的id】
因?yàn)閕d是自增的,所以最新的那條消息的id肯定最大,因此可以使用MAX(id)來獲取最近的消息

SELECT 
        MAX(`id`) AS chatId,
				 CASE 
            WHEN `from_who` = 'admin' THEN `to_who`
            ELSE `from_who`
        END AS uname
    FROM `chat`
    WHERE `from_who` = 'admin' OR `to_who` = 'admin'
    GROUP BY uname

【UniApp開發(fā)小程序】私聊功能后端實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】,小程序開發(fā),小程序,WebSocket,RabbitMQ,延時(shí)任務(wù),SpringBoot
【內(nèi)連接私信表獲取消息的其他信息】

INNER JOIN `chat` c ON c.id = t.chatId 

【左連接用戶表獲取用戶的相關(guān)信息】

LEFT JOIN `sys_user` u ON t.uname = u.user_name

【左聯(lián)接私信表獲取未讀對(duì)方消息的數(shù)量】
CASE WHEN is_read=1 THEN 0 ELSE 1 END 如果已讀,說明未讀數(shù)量為0;否則為1

LEFT JOIN (SELECT from_who, SUM(CASE WHEN is_read=1 THEN 0 ELSE 1 END) AS unReadNum FROM chat WHERE is_deleted=0 AND to_who = 'admin' GROUP BY from_who) ur ON ur.from_who = t.uname

【最后按照用戶和自己最后聊天的時(shí)間來降序排序】

ORDER BY c.create_time DESC

WebSocket引入

為什么使用WebSocket

WebSocket不僅支持客戶端向服務(wù)端發(fā)送消息,同時(shí)也支持服務(wù)端向客戶端發(fā)送消息,這樣才能完成私聊的功能。即
用戶1-->服務(wù)端-->用戶2

依賴

<!-- websocket -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置類

package com.shm.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;

@Configuration
public class WebSocketConfig {

    /**
     * 注入一個(gè)ServerEndpointExporter,
     * 該Bean會(huì)自動(dòng)注冊(cè)使用@ServerEndpoint注解 聲明的websocket endpoint
     */
    @Bean
    public ServerEndpointExporter serverEndpointExporter() {
        return new ServerEndpointExporter();
    }

}

WebSocket服務(wù)

需要注意的是,Websocket是多例模式,無法直接使用@Autowired注解來注入rabbitTemplate,需要使用下面的方式,其中rabbitTemplate為靜態(tài)變量

private static RabbitTemplate rabbitTemplate;

 @Autowired
 public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
     WebSocketServer.rabbitTemplate = rabbitTemplate;
 }
package com.shm.component;

import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.component.delay.DelayQueueManager;
import com.shm.component.delay.DelayTask;
import com.shm.constant.RabbitMqConstant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;


/**
 * @author websocket服務(wù)
 */
@ServerEndpoint(value = "/websocket/{username}")
@Component//將WebSocketServer注冊(cè)為spring的一個(gè)bean
public class WebSocketServer {

    private static final Logger log = LoggerFactory.getLogger(WebSocketServer.class);

    /**
     * 記錄當(dāng)前在線連接的客戶端的session
     */
    public static final Map<String, Session> usernameAndSessionMap = new ConcurrentHashMap<>();
    /**
     * 記錄正在進(jìn)行的聊天的發(fā)出者和接收者
     */
    public static final Map<String, Integer> fromToMap = new ConcurrentHashMap<>();
    /**
     * 用戶Session保留時(shí)間,如果超過該時(shí)間,用戶還沒有給服務(wù)端發(fā)送消息,認(rèn)為用戶下線,刪除其Session
     * 注意:該時(shí)間需要比客戶端的心跳時(shí)間更長(zhǎng)
     */
    private static final long expire = 6000;

    // websocket為多例模式,無法直接注入,需要換成下面的方式
//    @Autowired
//    RabbitTemplate rabbitTemplate;

    private static RabbitTemplate rabbitTemplate;

    @Autowired
    public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
        WebSocketServer.rabbitTemplate = rabbitTemplate;
    }

    @Autowired
    private static DelayQueueManager delayQueueManager;

    @Autowired
    public void setDelayQueueManager(DelayQueueManager delayQueueManager) {
        WebSocketServer.delayQueueManager = delayQueueManager;
    }

    /**
     * 瀏覽器和服務(wù)端連接建立成功之后會(huì)調(diào)用這個(gè)方法
     */
    @OnOpen
    public void onOpen(Session session, @PathParam("username") String username) {
        usernameAndSessionMap.put(username, session);
        // 建立延時(shí)任務(wù),如果到expire時(shí)間,客戶端還是沒有和服務(wù)器有任何交互的話,就刪除該用戶的session,表示該用戶下線
        delayQueueManager.put(new DelayTask(username, expire));
        log.info("有新用戶加入,username={}, 當(dāng)前在線人數(shù)為:{}", username, usernameAndSessionMap.size());
    }

    /**
     * 連接關(guān)閉調(diào)用的方法
     */
    @OnClose
    public void onClose(Session session, @PathParam("username") String username) {
        usernameAndSessionMap.remove(username);
        log.info("有一連接關(guān)閉,移除username={}的用戶session, 當(dāng)前在線人數(shù)為:{}", username, usernameAndSessionMap.size());
    }

    /**
     * 發(fā)生錯(cuò)誤的時(shí)候會(huì)調(diào)用這個(gè)方法
     */
    @OnError
    public void onError(Session session, Throwable error) {
        log.error("發(fā)生錯(cuò)誤");
        error.printStackTrace();
    }

    /**
     * 服務(wù)端發(fā)送消息給客戶端
     */
    public void sendMessage(String message, Session toSession) {
        try {
            log.info("服務(wù)端給客戶端[{}]發(fā)送消息{}", toSession.getId(), message);
            toSession.getBasicRemote().sendText(message);
        } catch (Exception e) {
            log.error("服務(wù)端發(fā)送消息給客戶端失敗", e);
        }
    }


    /**
     * onMessage方法是一個(gè)消息的中轉(zhuǎn)站
     * 1、首先接受瀏覽器端socket.send發(fā)送過來的json數(shù)據(jù)
     * 2、然后解析其數(shù)據(jù),找到消息要發(fā)送給誰
     * 3、最后將數(shù)據(jù)發(fā)送給相應(yīng)的人
     *
     * @param message 客戶端發(fā)送過來的消息 數(shù)據(jù)格式:{"from":"user1","to":"admin","text":"你好呀"}
     */
    @OnMessage
    public void onMessage(String message, Session session, @PathParam("username") String username) {
//        log.info("服務(wù)端接收到 {} 的消息,消息內(nèi)容是:{}", username, message);

        // 收到用戶的信息,刪除之前的延時(shí)任務(wù),創(chuàng)建新的延時(shí)任務(wù)
        delayQueueManager.put(new DelayTask(username, expire));
        if (!usernameAndSessionMap.containsKey(username)) {
            // 可能用戶掛機(jī)了一段時(shí)間,被下線了,后面又重新回來發(fā)信息了,需要重新將用戶和session添加字典中
            usernameAndSessionMap.put(username, session);
        }

        // 將json字符串轉(zhuǎn)化為json對(duì)象
        JSONObject obj = JSON.parseObject(message);
        String status = (String) obj.get("status");
        // 獲取消息的內(nèi)容
        String text = (String) obj.get("text");
        // 查看消息要發(fā)送給哪個(gè)用戶
        String to = (String) obj.get("to");
        String fromToKey = username + "-" + to;
        String toFromKey = to + "-" + username;
        if (status != null) {
            if (status.equals("start")) {
                fromToMap.put(fromToKey, 1);
            } else if (status.equals("end")) {
                System.out.println("移除銷毀的fromToKey:" + fromToKey);
                fromToMap.remove(fromToKey);
            } else if (status.equals("ping")) {
                // 更新用戶對(duì)應(yīng)的時(shí)間戳
//                usernameAndTimeStampMap.put(username, System.currentTimeMillis());
            }
        } else {
            // 封裝數(shù)據(jù)發(fā)送給消息隊(duì)列
            Chat chat = new Chat();
            chat.setFromWho(username);
            chat.setToWho(to);
            chat.setContent(text);
            chat.setIsRead(0);
            //        chat.setPicUrl("");

            // 根據(jù)to來獲取相應(yīng)的session,然后通過session將消息內(nèi)容轉(zhuǎn)發(fā)給相應(yīng)的用戶
            Session toSession = usernameAndSessionMap.get(to);
            if (toSession != null) {
                JSONObject jsonObject = new JSONObject();
                // 設(shè)置消息來源的用戶名
                jsonObject.put("from", username);
                // 設(shè)置消息內(nèi)容
                jsonObject.put("text", text);
                // 服務(wù)端發(fā)送消息給目標(biāo)客戶端
                this.sendMessage(jsonObject.toString(), toSession);
                log.info("發(fā)送消息給用戶 {} ,消息內(nèi)容是:{} ", toSession, jsonObject.toString());
                if (fromToMap.containsKey(toFromKey)) {
                    chat.setIsRead(1);
                }
            } else {
                log.info("發(fā)送失敗,未找到用戶 {} 的session", to);
            }

            rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY, chat);
        }
    }

}

RabbitMQ引入

為什么使用消息隊(duì)列

在用戶之間進(jìn)行聊天的時(shí)候,需要將用戶的聊天數(shù)據(jù)存儲(chǔ)到數(shù)據(jù)庫中,但是如果大量用戶同時(shí)在線的話,可能同一時(shí)間發(fā)送的消息數(shù)量太多,如果同時(shí)將這些消息存儲(chǔ)到數(shù)據(jù)庫中,會(huì)給數(shù)據(jù)庫帶來較大的壓力,使用RabbitMQ可以先把要存儲(chǔ)的數(shù)據(jù)放到消息隊(duì)列,然后數(shù)據(jù)庫服務(wù)器壓力沒這么大的時(shí)候,就會(huì)從消息隊(duì)列中獲取數(shù)據(jù)來存儲(chǔ),這樣可以分散數(shù)據(jù)庫的壓力。但是如果用戶是直接從數(shù)據(jù)庫獲取消息的話,消息可能有一定的延遲,如果用戶之間正在聊天的話,消息則不會(huì)延遲,因?yàn)榱奶靸?nèi)容會(huì)立刻通過WebSocket發(fā)送給對(duì)方。

依賴

<!-- rabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

啟動(dòng)類添加注解

在啟動(dòng)類的上方添加@EnableRabbit注解
【UniApp開發(fā)小程序】私聊功能后端實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】,小程序開發(fā),小程序,WebSocket,RabbitMQ,延時(shí)任務(wù),SpringBoot

常量類

因?yàn)橛卸嗵帟?huì)使用到隊(duì)列命名等信息,創(chuàng)建一個(gè)常量類來保存相關(guān)信息

package com.shm.constant;

public class RabbitMqConstant {
    public static final String CHAT_STORAGE_QUEUE = "shm.chat-storage.queue";
    public static final String CHAT_STORAGE_EXCHANGE = "shm.chat-storage-event-exchange";
    public static final String CHAT_STORAGE_ROUTER_KEY = "shm.chat-storage.register";
}

使用配置類創(chuàng)建隊(duì)列、交換機(jī)、綁定關(guān)系

package com.shm.config;

import com.shm.constant.RabbitMqConstant;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MyRabbitConfig {

    /**
     * 使用JSON序列化機(jī)制,進(jìn)行消息轉(zhuǎn)換
     * @return
     */
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }

    /**
     * 私信存儲(chǔ)隊(duì)列
     *
     * @return
     */
    @Bean
    public Queue chatStorageQueue() {
        Queue queue = new Queue(RabbitMqConstant.CHAT_STORAGE_QUEUE, true, false, false);
        return queue;
    }

    /**
     * 私信存儲(chǔ)交換機(jī)
     * 創(chuàng)建交換機(jī),由于只需要一個(gè)隊(duì)列,創(chuàng)建direct交換機(jī)
     *
     * @return
     */
    @Bean
    public Exchange chatStorageExchange() {
        //durable:持久化
        return new DirectExchange(RabbitMqConstant.CHAT_STORAGE_EXCHANGE, true, false);
    }

    /**
     * 創(chuàng)建私信存儲(chǔ) 交換機(jī)和隊(duì)列的綁定關(guān)系
     *
     * @return
     */
    @Bean
    public Binding chatStorageBinding() {
        return new Binding(RabbitMqConstant.CHAT_STORAGE_QUEUE,
                Binding.DestinationType.QUEUE,
                RabbitMqConstant.CHAT_STORAGE_EXCHANGE,
                RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,
                null);
    }

}

消息監(jiān)聽器

創(chuàng)建一個(gè)消息監(jiān)聽類來監(jiān)聽隊(duì)列的消息,然后調(diào)用相關(guān)的邏輯來處理信息,本文主要的處理是將私信內(nèi)容存儲(chǔ)到數(shù)據(jù)庫中

package com.shm.listener;

import com.rabbitmq.client.Channel;
import com.ruoyi.common.core.domain.entity.Chat;
import com.shm.constant.RabbitMqConstant;
import com.shm.service.IChatService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
/**
 * 注意,類上面需要RabbitListener注解
 */
@RabbitListener(queues = RabbitMqConstant.CHAT_STORAGE_QUEUE)
public class ChatStorageListener {
    @Autowired
    private IChatService chatService;

    @RabbitHandler
    public void handleStockLockedRelease(Chat chat, Message message, Channel channel) throws IOException {
        try {
            boolean save = chatService.save(chat);
            //解鎖成功,手動(dòng)確認(rèn),消息才從MQ中刪除
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (Exception e) {
            //只要有異常,拒絕消息,讓消息重新返回隊(duì)列,讓別的消費(fèi)者繼續(xù)解鎖
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
        }

    }

}

發(fā)送消息到消息隊(duì)列

WebSocketServer為Websocket后端服務(wù)代碼,其中的onMessage方法會(huì)接受客戶端發(fā)送過來的消息,當(dāng)接收到消息的時(shí)候,將消息發(fā)送給消息隊(duì)列

// 封裝數(shù)據(jù)發(fā)送給消息隊(duì)列
Chat chat = new Chat();
chat.setFromWho(username);
chat.setToWho(to);
chat.setContent(text);
chat.setPicUrl("");
rabbitTemplate.convertAndSend(RabbitMqConstant.CHAT_STORAGE_EXCHANGE,RabbitMqConstant.CHAT_STORAGE_ROUTER_KEY,chat);

延時(shí)任務(wù)

為什么使用延時(shí)任務(wù)

為了更好地感知用戶的在線狀態(tài),在用戶連接了WebSocket或者發(fā)送消息之后,建立一個(gè)延時(shí)任務(wù),如果到達(dá)了所設(shè)定的延時(shí)時(shí)間,就刪除用戶的Session,認(rèn)為用戶已經(jīng)下線;如果在延時(shí)期間之內(nèi),用戶發(fā)送了新消息,或者發(fā)送了心跳信號(hào),證明該用戶還處于在線狀態(tài),刪除前面的延時(shí)任務(wù),并創(chuàng)建新的延時(shí)任務(wù)

延時(shí)任務(wù)類

package com.shm.component.delay;

import lombok.Data;
import lombok.Getter;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/**
 * @Author dam
 * @create 2023/8/25 15:12
 */
@Getter
public class DelayTask implements Delayed {

    /**
     * 用戶名
     */
    private final String userName;
    /**
     * 任務(wù)的真正執(zhí)行時(shí)間
     */
    private final long executeTime;
    /**
     * 任務(wù)延時(shí)多久執(zhí)行
     */
    private final long expire;

    /**
     * @param expire 任務(wù)需要延時(shí)的時(shí)間
     */
    public DelayTask(String userName, long expire) {
        this.userName = userName;
        this.executeTime = expire + System.currentTimeMillis();
        this.expire = expire;
    }

    /**
     * 根據(jù)給定的時(shí)間單位,返回與此對(duì)象關(guān)聯(lián)的剩余延遲時(shí)間
     * 
     * @param unit the time unit 時(shí)間單位
     * @return 返回剩余延遲,零值或負(fù)值表示延遲已經(jīng)過去
     */
    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.executeTime - System.currentTimeMillis(), unit);
    }

    @Override
    public int compareTo(Delayed o) {
        return 0;
    }
}

延時(shí)任務(wù)管理

package com.shm.component.delay;

import com.shm.component.WebSocketServer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Executors;

/**
 * @Author dam
 * @create 2023/8/25 15:12
 */
@Component
@Slf4j
public class DelayQueueManager implements CommandLineRunner {
    private final DelayQueue<DelayTask> delayQueue = new DelayQueue<>();
    private final Map<String, DelayTask> usernameAndDelayTaskMap = new ConcurrentHashMap<>();

    /**
     * 加入到延時(shí)隊(duì)列中
     *
     * @param task
     */
    public void put(DelayTask task) {
        // 因?yàn)橐粋€(gè)用戶只能對(duì)應(yīng)一個(gè)延時(shí)任務(wù),所以如果已經(jīng)存在了延時(shí)任務(wù),將其進(jìn)行刪除
        if (usernameAndDelayTaskMap.containsKey(task.getUserName())) {
            this.remove(task.getUserName());
        }
        delayQueue.put(task);
        usernameAndDelayTaskMap.put(task.getUserName(), task);
    }

    /**
     * 取消延時(shí)任務(wù)
     *
     * @param username 要?jiǎng)h除的任務(wù)的用戶名
     * @return
     */
    public boolean remove(String username) {
        DelayTask remove = usernameAndDelayTaskMap.remove(username);
        return delayQueue.remove(remove);
    }

    @Override
    public void run(String... args) throws Exception {
        this.executeThread();
    }

    /**
     * 延時(shí)任務(wù)執(zhí)行線程
     */
    private void executeThread() {
        while (true) {
            try {
                DelayTask task = delayQueue.take();
                //執(zhí)行任務(wù)
                processTask(task);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    /**
     * 執(zhí)行延時(shí)任務(wù)
     *
     * @param task
     */
    private void processTask(DelayTask task) {
        // 刪除該用戶的session,表示用戶下線
        WebSocketServer.usernameAndSessionMap.remove(task.getUserName());
        log.error("執(zhí)行定時(shí)任務(wù):{}下線", task.getUserName());
    }
    
}

同項(xiàng)目其他文章

該項(xiàng)目的其他文章請(qǐng)查看【易售小程序項(xiàng)目】項(xiàng)目介紹、小程序頁面展示與系列文章集合文章來源地址http://www.zghlxwxcb.cn/news/detail-673083.html

到了這里,關(guān)于【UniApp開發(fā)小程序】私聊功能后端實(shí)現(xiàn) (買家、賣家 溝通商品信息)【后端基于若依管理系統(tǒng)開發(fā)】的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包