前言
官方定義消息體默認大小為 4MB,普通順序消息類型。事務(wù)、定時、延時類消息默認大小為64KB。如果超過限制則會拋出異常!
但實際工作中,需要使用到MQ進行異步解耦,傳輸?shù)臉I(yè)務(wù)消息偶爾會遇到超過4MB,尤其在業(yè)務(wù)復(fù)雜的系統(tǒng)中,那么我們應(yīng)該如何處理呢?
在我工作實際應(yīng)用中,有以下幾種解決方案。
解決方案
方案一:消息壓縮
通常我們都是傳遞json消息數(shù)據(jù),然后底層使用字節(jié)流進行傳輸。如果此時json數(shù)據(jù)超過4MB,則可以考慮進行消息壓縮。
原理其實很好理解,比如我們經(jīng)常使用的壓縮包,可以把大文件進行壓縮,依次減小文件大小。
那么我們這里需要使用到的就是字符壓縮,把json字符串進行壓縮,然后進行傳輸,原理圖如下:
經(jīng)過測試:我們原來5MB的數(shù)據(jù)可以壓縮到230KB,1MB都不到,當然效果和數(shù)據(jù)以及壓縮算法有關(guān)。如:大量重復(fù)字符則壓縮效率就更高。
壓縮解壓代碼如下:
@Slf4j
public class StringCompressUtils {
/**
* 使用gzip壓縮字符串
*
* @param str 要壓縮的字符串
* @return 壓縮結(jié)果字符
*/
public static String compress(String str) {
if (str == null || str.length() == 0) {
return str;
}
ByteArrayOutputStream out = new ByteArrayOutputStream();
GZIPOutputStream gzip = null;
try {
gzip = new GZIPOutputStream(out);
gzip.write(str.getBytes());
} catch (IOException e) {
log.error("字符串壓縮異常!", e);
e.printStackTrace();
} finally {
IoUtil.close(gzip);
}
return new sun.misc.BASE64Encoder().encode(out.toByteArray());
}
/**
* 使用gzip解壓縮
*
* @param compressedStr 壓縮字符串
* @return 解壓縮字符串
*/
public static String uncompress(String compressedStr) {
if (compressedStr == null) {
return null;
}
byte[] compressed = null;
String decompressed = null;
GZIPInputStream ginzip = null;
ByteArrayInputStream in = null;
ByteArrayOutputStream out = new ByteArrayOutputStream();
try {
// 先解碼
compressed = new sun.misc.BASE64Decoder().decodeBuffer(compressedStr);
in = new ByteArrayInputStream(compressed);
ginzip = new GZIPInputStream(in);
byte[] buffer = new byte[1024];
int offset = -1;
while ((offset = ginzip.read(buffer)) != -1) {
out.write(buffer, 0, offset);
}
decompressed = out.toString();
} catch (IOException e) {
log.error("字符串解壓縮異常!", e);
e.printStackTrace();
} finally {
// 關(guān)流
IoUtil.close(ginzip);
IoUtil.close(in);
IoUtil.close(out);
}
return decompressed;
}
壓縮流程:原始字符 -> 壓縮 -> 壓縮字符 -> 編碼
解壓流程:壓縮字符 -> 解碼 -> 解壓 -> 原始字符
方案二:消息分割
方案一基本可以解決遇到的99%消息體過大的問題,如果不行則可以使用消息分割方案。
簡而言之,就是把一個大消息體,進行分割成多個小消息體進行傳輸。運用了化整為零的思想,至于實現(xiàn)方案,有很多,我簡單舉個例子。
- 一個大消息分割成多個小消息
- 多個小消息擁有相同的消息標識,如UUID
- 分割后小消息需要有一些元數(shù)據(jù)來標識自己,如 消息標識、一共分割了多少個、自己是第幾個。
- 傳輸后,消費者消費,然后根據(jù)元數(shù)據(jù)進行數(shù)據(jù)聚合還原。
- 將還原后的消息走正常消費流程即可
方案三:OSS存儲
方案一方案二幾乎已經(jīng)解決99.99%的場景了,如果還是不夠,那就要實施核打擊了。終極方案,OSS存儲!此方法絕對可以解決100%的場景!
大消息 -> 寫入到文件 -> 上傳到文件服務(wù)器 -> 拿到URL -> 傳輸 -> 消費
生產(chǎn)者:大消息,寫入文件,上傳文件,拿到訪問連接,發(fā)送訪問連接給MQ
消費者:消費,拿到訪問鏈接,讀取文件,拿到消息,執(zhí)行業(yè)務(wù)邏輯文章來源:http://www.zghlxwxcb.cn/news/detail-794907.html
快準狠!短平快!就是增加了中間件,其他一點毛病沒有,效率更高!文章來源地址http://www.zghlxwxcb.cn/news/detail-794907.html
到了這里,關(guān)于實際生產(chǎn)環(huán)境Apache RocketMQ消息體過大的解決方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!