優(yōu)點:
批量發(fā)送消息可以提高rocketmq的生產(chǎn)者性能和吞吐量。
使用場景:
- 發(fā)送大量小型消息時;
- 需要降低消息發(fā)送延遲時;
- 需要提高生產(chǎn)者性能時;
注意事項:
- 消息列表的大小不能超過broker設(shè)置的最大消息大小;
- 消息列表的大小不能超過生產(chǎn)證設(shè)置的maxMessageSize 參數(shù),此參數(shù)默認為 4MB;
- 批量發(fā)送消息不支持消息事務;
- 如果代碼在發(fā)送消息列表時發(fā)生異常,則可能會發(fā)生部分消息發(fā)送成功,部分消息發(fā)送失敗的情況。如果要確保所有消息都已成功發(fā)送,則需要增加錯誤處理邏輯和消息重試機制;
批量發(fā)送消息為什么要限制maxMessageSize?
消息列表的大小不能超過生產(chǎn)者設(shè)置的maxMessageSize參數(shù),主要是為了避免消息發(fā)送延遲和消息過大導致broker出現(xiàn)性能問題。如果嘗試發(fā)送大于maxMessageSize的消息,RocketMQ會拋出MessageTooLargeException異常,并且消息不會被發(fā)送到broker。文章來源:http://www.zghlxwxcb.cn/news/detail-773690.html
如果開發(fā)者在開發(fā)時遇到了消息列表大小超過maxMessageSize的情況,可以考慮以下幾種處理方式:文章來源地址http://www.zghlxwxcb.cn/news/detail-773690.html
-
- 提升maxMessageSize參數(shù)的大小,這樣可以容納更大的消息列表。但是,需要注意在提升參數(shù)大小時,要考慮到RocketMQ broker的性能和網(wǎng)絡帶寬等因素。
- 考慮將消息列表進行拆分,然后分批發(fā)送。這樣可以避免一次發(fā)送過多的消息。
- 計算消息的大小并進行壓縮??梢允褂靡恍嚎s算法,如 LZ4、Snappy 等,對消息進行壓縮,以減小消息的大小。
- 對超過 maxMessageSize 的消息進行過濾或其他處理。可以通過業(yè)務邏輯對消息進行分組或分類,對超過 maxMessageSize 的消息進行過濾或其他處理,以避免發(fā)送超出限制的消息。
代碼實現(xiàn)
package com.resource.sync.rocketmq;
import java.util.Iterator;
import java.util.List;
/**
* @description:消息分割,在rocketmq中,一次性發(fā)送消息的長度不可超過4mb,此時我們需要進行切割,確保消息長度小于4mb
**/
public class ListSplitter<T> implements Iterator<List<T>> {
/**
* 分割數(shù)據(jù)大小
*/
private int sizeLimit;
/**
* 分割數(shù)據(jù)列表
*/
private final List<T> messages;
/**
* 分割索引
*/
private int currIndex;
public ListSplitter(int sizeLimit, List<T> messages) {
this.sizeLimit = sizeLimit;
this.messages = messages;
}
@Override
public boolean hasNext() {
return currIndex < messages.size();
}
@Override
public List<T> next() {
int nextIndex = currIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++) {
T t = messages.get(nextIndex);
totalSize = totalSize + t.toString().length();
if (totalSize > sizeLimit) {
break;
}
}
List<T> subList = messages.subList(currIndex, nextIndex);
currIndex = nextIndex;
return subList;
}
}
private final int maxMessageSize = 1024 * 1024 * 4;
/**
* 消息分割(批量發(fā)送)
*/
private void bulkSendMsg(List<Message<String>> messageList) {
// 限制數(shù)據(jù)大小
ListSplitter splitter = new ListSplitter(maxMessageSize, messageList);
while (splitter.hasNext()) {
List<Message> nextList = splitter.next();
syncBulkSendMessage("topic", nextList);
}
}
/**
* @param topic
* @param list
* @description:發(fā)送實時消息(批量)
*/
public void syncBulkSendMessage(String topic, List<Message> list) {
SendResult sendResult = null;
try {
sendResult = rocketMQTemplate.syncSend(topic, list);
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR.RESULT_STATUS:{},MSG_ID:{}", sendResult.getSendStatus(), sendResult.getMsgId());
}
if (sendResult.getSendStatus() == SendStatus.SEND_OK) {
log.info("BULK_SEND_MSG_SUCCESS.MSG_ID:{}", sendResult.getMsgId());
}
} catch (Exception e) {
log.error("BULK_ROCKET_MQ_DISTRIBUTION_ERROR:{}", e);
}
}
到了這里,關(guān)于RocketMQ如何安全的批量發(fā)送消息?的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!