項(xiàng)目介紹
核心功能:統(tǒng)一的接口發(fā)送各種類型消息,對(duì)消息生命周期全鏈路追蹤。
意義:只要公司內(nèi)部有發(fā)送消息的需求,都應(yīng)該要有類似austin
的項(xiàng)目。消息推送平臺(tái)對(duì)各類消息進(jìn)行統(tǒng)一發(fā)送處理,這有利于對(duì)功能的收攏,以及提高業(yè)務(wù)需求開發(fā)的效率。
項(xiàng)目地址:https://github.com/ZhongFuCheng3y/austin
項(xiàng)目拆解
下發(fā)消息接口,分為群發(fā)和單發(fā)。接口參數(shù)主要有模板id(發(fā)送消息的內(nèi)容模板),參數(shù)[用來替換模板參數(shù),接收人],api可以存在多個(gè),但是具體處理的方法只需要批量參數(shù)就可以。
單發(fā)的接口請(qǐng)求實(shí)體類
public class SendRequest {
/**
* 執(zhí)行業(yè)務(wù)類型
* send:發(fā)送消息
* recall:撤回消息
*/
private String code;
/**
* 消息模板Id
* 【必填】
*/
private Long messageTemplateId;
/**
* 消息相關(guān)的參數(shù)
* 當(dāng)業(yè)務(wù)類型為"send",必傳
*/
private MessageParam messageParam;
}
群發(fā)的接口實(shí)體類
public class BatchSendRequest {
/**
* 執(zhí)行業(yè)務(wù)類型
* 必傳,參考 BusinessCode枚舉
*/
private String code;
/**
* 消息模板Id
* 必傳
*/
private Long messageTemplateId;
/**
* 消息相關(guān)的參數(shù)
* 必傳
*/
private List<MessageParam> messageParamList;
}
單個(gè)消息的實(shí)體
public class MessageParam {
/**
* @Description: 接收者
* 多個(gè)用,逗號(hào)號(hào)分隔開
* 【不能大于100個(gè)】
* 必傳
*/
private String receiver;
/**
* @Description: 消息內(nèi)容中的可變部分(占位符替換)
* 可選
*/
private Map<String, String> variables;
/**
* @Description: 擴(kuò)展參數(shù)
* 可選
*/
private Map<String, String> extra;
}
消息模板中定義了發(fā)送渠道,消息渠道決定消息的處理器。
在AssembleAction
轉(zhuǎn)換實(shí)體類。核心邏輯有
- 將模板中的可變參數(shù)轉(zhuǎn)化成文本內(nèi)容
- 根據(jù)消息渠道獲取消息實(shí)體類
- 生成業(yè)務(wù)編碼
AssembleAction#getContentModelValue
,ContentHolderUtil.replacePlaceHolder(originValue, variables);
用來處理可替換變量。
private static ContentModel getContentModelValue(MessageTemplate messageTemplate, MessageParam messageParam) {
// 得到真正的ContentModel 類型
Integer sendChannel = messageTemplate.getSendChannel();
Class<? extends ContentModel> contentModelClass = ChannelType.getChanelModelClassByCode(sendChannel);
// 得到模板的 msgContent 和 入?yún)?/span>
Map<String, String> variables = messageParam.getVariables();
JSONObject jsonObject = JSON.parseObject(messageTemplate.getMsgContent());
// 通過反射 組裝出 contentModel
Field[] fields = ReflectUtil.getFields(contentModelClass);
ContentModel contentModel = ReflectUtil.newInstance(contentModelClass);
for (Field field : fields) {
String originValue = jsonObject.getString(field.getName());
if (StrUtil.isNotBlank(originValue)) {
String resultValue = ContentHolderUtil.replacePlaceHolder(originValue, variables);
Object resultObj = JSONUtil.isJsonObj(resultValue) ? JSONUtil.toBean(resultValue, field.getType()) : resultValue;
ReflectUtil.setFieldValue(contentModel, field, resultObj);
}
}
// 如果 url 字段存在,則在url拼接對(duì)應(yīng)的埋點(diǎn)參數(shù)
String url = (String) ReflectUtil.getFieldValue(contentModel, LINK_NAME);
if (StrUtil.isNotBlank(url)) {
String resultUrl = TaskInfoUtils.generateUrl(url, messageTemplate.getId(), messageTemplate.getTemplateType());
ReflectUtil.setFieldValue(contentModel, LINK_NAME, resultUrl);
}
return contentModel;
}
在SendMqAction
負(fù)責(zé)發(fā)送消息,根據(jù)austin.mq.pipeline
的值獲取不同的消息發(fā)送者,有MQ發(fā)消息,有EventBus,有SpringEvent。
@Slf4j
@Service
public class SendMqAction implements BusinessProcess<SendTaskModel> {
@Autowired
private SendMqService sendMqService;
@Value("${austin.business.topic.name}")
private String sendMessageTopic;
@Value("${austin.business.recall.topic.name}")
private String austinRecall;
@Value("${austin.business.tagId.value}")
private String tagId;
@Value("${austin.mq.pipeline}")
private String mqPipeline;
@Override
public void process(ProcessContext<SendTaskModel> context) {
SendTaskModel sendTaskModel = context.getProcessModel();
try {
if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) {
String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});
sendMqService.send(sendMessageTopic, message, tagId);
} else if (BusinessCode.RECALL.getCode().equals(context.getCode())) {
String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName});
sendMqService.send(austinRecall, message, tagId);
}
} catch (Exception e) {
context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e)
, JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));
}
}
}
ConsumeServiceImpl#consume2Send
,負(fù)責(zé)處理消息
@Override
public void consume2Send(List<TaskInfo> taskInfoLists) {
String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
for (TaskInfo taskInfo : taskInfoLists) {
logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
taskPendingHolder.route(topicGroupId).execute(task);
}
}
Task#run
,負(fù)責(zé)任務(wù)的具體處理
@Override
public void run() {
// 0. 丟棄消息
if (discardMessageService.isDiscard(taskInfo)) {
return;
}
// 1. 屏蔽消息
shieldService.shield(taskInfo);
// 2.平臺(tái)通用去重
if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
deduplicationRuleService.duplication(taskInfo);
}
// 3. 真正發(fā)送消息
if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
handlerHolder.route(taskInfo.getSendChannel()).doHandler(taskInfo);
}
}
EmailHandler#handler
,郵件處理器專門處理郵件的消息
@Override
public boolean handler(TaskInfo taskInfo) {
EmailContentModel emailContentModel = (EmailContentModel) taskInfo.getContentModel();
MailAccount account = getAccountConfig(taskInfo.getSendAccount());
try {
File file = StrUtil.isNotBlank(emailContentModel.getUrl()) ? AustinFileUtils.getRemoteUrl2File(dataPath, emailContentModel.getUrl()) : null;
String result = Objects.isNull(file) ? MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true) :
MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true, file);
} catch (Exception e) {
log.error("EmailHandler#handler fail!{},params:{}", Throwables.getStackTraceAsString(e), taskInfo);
return false;
}
return true;
}
ShieldServiceImpl#shield
,發(fā)送消息判斷是否需要白天屏蔽。將消息存儲(chǔ)到Redis中,開啟xxl-job從redis中獲取數(shù)據(jù)。文章來源:http://www.zghlxwxcb.cn/news/detail-498903.html
@Override
public void shield(TaskInfo taskInfo) {
if (ShieldType.NIGHT_NO_SHIELD.getCode().equals(taskInfo.getShieldType())) {
return;
}
/**
* example:當(dāng)消息下發(fā)至austin平臺(tái)時(shí),已經(jīng)是凌晨1點(diǎn),業(yè)務(wù)希望此類消息在次日的早上9點(diǎn)推送
* (配合 分布式任務(wù)定時(shí)任務(wù)框架搞掂)
*/
if (isNight()) {
if (ShieldType.NIGHT_SHIELD.getCode().equals(taskInfo.getShieldType())) {
logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD.getCode())
.businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
}
if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) {
redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo,
SerializerFeature.WriteClassName),
(DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds());
logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
}
taskInfo.setReceiver(new HashSet<>());
}
}
/**
* 小時(shí) < 8 默認(rèn)就認(rèn)為是凌晨(夜晚)
*
* @return
*/
private boolean isNight() {
return LocalDateTime.now().getHour() < 8;
}
總結(jié)一下
- 下發(fā)消息接口,分為群發(fā)和單發(fā)。接口參數(shù)主要有模板id(發(fā)送消息的內(nèi)容模板),模板中定義了消息渠道(消息類型決定消息的處理器),參數(shù)[用來替換模板參數(shù),接收人],api可以存在多個(gè),但是具體處理的方法只需要批量參數(shù)就可以。
- 根據(jù)模板組裝數(shù)據(jù),替換變量。
- 消息發(fā)送,使用監(jiān)聽器或者消息隊(duì)列,進(jìn)行異步解耦。消息發(fā)送的業(yè)務(wù)和消息接收的業(yè)務(wù)拆解開來。
- 根據(jù)發(fā)送渠道獲取對(duì)應(yīng)的消息處理器(用郵件還是短信),使用策略模式進(jìn)行不同的消息渠道拆分,用具體的消息處理器進(jìn)行處理消息。
文章來源地址http://www.zghlxwxcb.cn/news/detail-498903.html
到了這里,關(guān)于【開源項(xiàng)目】消息推送平臺(tái)austin介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!