国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【開源項(xiàng)目】消息推送平臺(tái)austin介紹

這篇具有很好參考價(jià)值的文章主要介紹了【開源項(xiàng)目】消息推送平臺(tái)austin介紹。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

項(xiàng)目介紹

核心功能:統(tǒng)一的接口發(fā)送各種類型消息,對(duì)消息生命周期全鏈路追蹤。

意義:只要公司內(nèi)部有發(fā)送消息的需求,都應(yīng)該要有類似austin的項(xiàng)目。消息推送平臺(tái)對(duì)各類消息進(jìn)行統(tǒng)一發(fā)送處理,這有利于對(duì)功能的收攏,以及提高業(yè)務(wù)需求開發(fā)的效率。

項(xiàng)目地址https://github.com/ZhongFuCheng3y/austin

項(xiàng)目拆解

下發(fā)消息接口,分為群發(fā)和單發(fā)。接口參數(shù)主要有模板id(發(fā)送消息的內(nèi)容模板),參數(shù)[用來替換模板參數(shù),接收人],api可以存在多個(gè),但是具體處理的方法只需要批量參數(shù)就可以。

單發(fā)的接口請(qǐng)求實(shí)體類

public class SendRequest {

    /**
     * 執(zhí)行業(yè)務(wù)類型
     * send:發(fā)送消息
     * recall:撤回消息
     */
    private String code;

    /**
     * 消息模板Id
     * 【必填】
     */
    private Long messageTemplateId;


    /**
     * 消息相關(guān)的參數(shù)
     * 當(dāng)業(yè)務(wù)類型為"send",必傳
     */
    private MessageParam messageParam;


}

群發(fā)的接口實(shí)體類

public class BatchSendRequest {

    /**
     * 執(zhí)行業(yè)務(wù)類型
     * 必傳,參考 BusinessCode枚舉
     */
    private String code;


    /**
     * 消息模板Id
     * 必傳
     */
    private Long messageTemplateId;


    /**
     * 消息相關(guān)的參數(shù)
     * 必傳
     */
    private List<MessageParam> messageParamList;

}

單個(gè)消息的實(shí)體

public class MessageParam {

    /**
     * @Description: 接收者
     * 多個(gè)用,逗號(hào)號(hào)分隔開
     * 【不能大于100個(gè)】
     * 必傳
     */
    private String receiver;

    /**
     * @Description: 消息內(nèi)容中的可變部分(占位符替換)
     * 可選
     */
    private Map<String, String> variables;

    /**
     * @Description: 擴(kuò)展參數(shù)
     * 可選
     */
    private Map<String, String> extra;
}

消息模板中定義了發(fā)送渠道,消息渠道決定消息的處理器。

AssembleAction轉(zhuǎn)換實(shí)體類。核心邏輯有

  • 將模板中的可變參數(shù)轉(zhuǎn)化成文本內(nèi)容
  • 根據(jù)消息渠道獲取消息實(shí)體類
  • 生成業(yè)務(wù)編碼

AssembleAction#getContentModelValueContentHolderUtil.replacePlaceHolder(originValue, variables);用來處理可替換變量。

private static ContentModel getContentModelValue(MessageTemplate messageTemplate, MessageParam messageParam) {

    // 得到真正的ContentModel 類型
    Integer sendChannel = messageTemplate.getSendChannel();
    Class<? extends ContentModel> contentModelClass = ChannelType.getChanelModelClassByCode(sendChannel);

    // 得到模板的 msgContent 和 入?yún)?/span>
    Map<String, String> variables = messageParam.getVariables();
    JSONObject jsonObject = JSON.parseObject(messageTemplate.getMsgContent());


    // 通過反射 組裝出 contentModel
    Field[] fields = ReflectUtil.getFields(contentModelClass);
    ContentModel contentModel = ReflectUtil.newInstance(contentModelClass);
    for (Field field : fields) {
        String originValue = jsonObject.getString(field.getName());

        if (StrUtil.isNotBlank(originValue)) {
            String resultValue = ContentHolderUtil.replacePlaceHolder(originValue, variables);
            Object resultObj = JSONUtil.isJsonObj(resultValue) ? JSONUtil.toBean(resultValue, field.getType()) : resultValue;
            ReflectUtil.setFieldValue(contentModel, field, resultObj);
        }
    }

    // 如果 url 字段存在,則在url拼接對(duì)應(yīng)的埋點(diǎn)參數(shù)
    String url = (String) ReflectUtil.getFieldValue(contentModel, LINK_NAME);
    if (StrUtil.isNotBlank(url)) {
        String resultUrl = TaskInfoUtils.generateUrl(url, messageTemplate.getId(), messageTemplate.getTemplateType());
        ReflectUtil.setFieldValue(contentModel, LINK_NAME, resultUrl);
    }
    return contentModel;
}

SendMqAction負(fù)責(zé)發(fā)送消息,根據(jù)austin.mq.pipeline的值獲取不同的消息發(fā)送者,有MQ發(fā)消息,有EventBus,有SpringEvent。

@Slf4j
@Service
public class SendMqAction implements BusinessProcess<SendTaskModel> {


    @Autowired
    private SendMqService sendMqService;

    @Value("${austin.business.topic.name}")
    private String sendMessageTopic;

    @Value("${austin.business.recall.topic.name}")
    private String austinRecall;
    @Value("${austin.business.tagId.value}")
    private String tagId;

    @Value("${austin.mq.pipeline}")
    private String mqPipeline;


    @Override
    public void process(ProcessContext<SendTaskModel> context) {
        SendTaskModel sendTaskModel = context.getProcessModel();
        try {
            if (BusinessCode.COMMON_SEND.getCode().equals(context.getCode())) {
                String message = JSON.toJSONString(sendTaskModel.getTaskInfo(), new SerializerFeature[]{SerializerFeature.WriteClassName});
                sendMqService.send(sendMessageTopic, message, tagId);
            } else if (BusinessCode.RECALL.getCode().equals(context.getCode())) {
                String message = JSON.toJSONString(sendTaskModel.getMessageTemplate(), new SerializerFeature[]{SerializerFeature.WriteClassName});
                sendMqService.send(austinRecall, message, tagId);
            }
        } catch (Exception e) {
            context.setNeedBreak(true).setResponse(BasicResultVO.fail(RespStatusEnum.SERVICE_ERROR));
            log.error("send {} fail! e:{},params:{}", mqPipeline, Throwables.getStackTraceAsString(e)
                    , JSON.toJSONString(CollUtil.getFirst(sendTaskModel.getTaskInfo().listIterator())));
        }
    }

}

ConsumeServiceImpl#consume2Send,負(fù)責(zé)處理消息

    @Override
    public void consume2Send(List<TaskInfo> taskInfoLists) {
        String topicGroupId = GroupIdMappingUtils.getGroupIdByTaskInfo(CollUtil.getFirst(taskInfoLists.iterator()));
        for (TaskInfo taskInfo : taskInfoLists) {
            logUtils.print(LogParam.builder().bizType(LOG_BIZ_TYPE).object(taskInfo).build(), AnchorInfo.builder().ids(taskInfo.getReceiver()).businessId(taskInfo.getBusinessId()).state(AnchorState.RECEIVE.getCode()).build());
            Task task = context.getBean(Task.class).setTaskInfo(taskInfo);
            taskPendingHolder.route(topicGroupId).execute(task);
        }
    }

Task#run,負(fù)責(zé)任務(wù)的具體處理

    @Override
    public void run() {

        // 0. 丟棄消息
        if (discardMessageService.isDiscard(taskInfo)) {
            return;
        }
        // 1. 屏蔽消息
        shieldService.shield(taskInfo);

        // 2.平臺(tái)通用去重
        if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
            deduplicationRuleService.duplication(taskInfo);
        }

        // 3. 真正發(fā)送消息
        if (CollUtil.isNotEmpty(taskInfo.getReceiver())) {
            handlerHolder.route(taskInfo.getSendChannel()).doHandler(taskInfo);
        }

    }

EmailHandler#handler,郵件處理器專門處理郵件的消息

    @Override
    public boolean handler(TaskInfo taskInfo) {
        EmailContentModel emailContentModel = (EmailContentModel) taskInfo.getContentModel();
        MailAccount account = getAccountConfig(taskInfo.getSendAccount());
        try {
            File file = StrUtil.isNotBlank(emailContentModel.getUrl()) ? AustinFileUtils.getRemoteUrl2File(dataPath, emailContentModel.getUrl()) : null;
            String result = Objects.isNull(file) ? MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true) :
                    MailUtil.send(account, taskInfo.getReceiver(), emailContentModel.getTitle(), emailContentModel.getContent(), true, file);
        } catch (Exception e) {
            log.error("EmailHandler#handler fail!{},params:{}", Throwables.getStackTraceAsString(e), taskInfo);
            return false;
        }
        return true;
    }

ShieldServiceImpl#shield,發(fā)送消息判斷是否需要白天屏蔽。將消息存儲(chǔ)到Redis中,開啟xxl-job從redis中獲取數(shù)據(jù)。

    @Override
    public void shield(TaskInfo taskInfo) {

        if (ShieldType.NIGHT_NO_SHIELD.getCode().equals(taskInfo.getShieldType())) {
            return;
        }

        /**
         * example:當(dāng)消息下發(fā)至austin平臺(tái)時(shí),已經(jīng)是凌晨1點(diǎn),業(yè)務(wù)希望此類消息在次日的早上9點(diǎn)推送
         * (配合 分布式任務(wù)定時(shí)任務(wù)框架搞掂)
         */
        if (isNight()) {
            if (ShieldType.NIGHT_SHIELD.getCode().equals(taskInfo.getShieldType())) {
                logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD.getCode())
                        .businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
            }
            if (ShieldType.NIGHT_SHIELD_BUT_NEXT_DAY_SEND.getCode().equals(taskInfo.getShieldType())) {
                redisUtils.lPush(NIGHT_SHIELD_BUT_NEXT_DAY_SEND_KEY, JSON.toJSONString(taskInfo,
                                SerializerFeature.WriteClassName),
                        (DateUtil.offsetDay(new Date(), 1).getTime() / 1000) - DateUtil.currentSeconds());
                logUtils.print(AnchorInfo.builder().state(AnchorState.NIGHT_SHIELD_NEXT_SEND.getCode()).businessId(taskInfo.getBusinessId()).ids(taskInfo.getReceiver()).build());
            }
            taskInfo.setReceiver(new HashSet<>());
        }
    }

    /**
     * 小時(shí) < 8 默認(rèn)就認(rèn)為是凌晨(夜晚)
     *
     * @return
     */
    private boolean isNight() {
        return LocalDateTime.now().getHour() < 8;

    }

總結(jié)一下

  1. 下發(fā)消息接口,分為群發(fā)和單發(fā)。接口參數(shù)主要有模板id(發(fā)送消息的內(nèi)容模板),模板中定義了消息渠道(消息類型決定消息的處理器),參數(shù)[用來替換模板參數(shù),接收人],api可以存在多個(gè),但是具體處理的方法只需要批量參數(shù)就可以。
  2. 根據(jù)模板組裝數(shù)據(jù),替換變量。
  3. 消息發(fā)送,使用監(jiān)聽器或者消息隊(duì)列,進(jìn)行異步解耦。消息發(fā)送的業(yè)務(wù)和消息接收的業(yè)務(wù)拆解開來。
  4. 根據(jù)發(fā)送渠道獲取對(duì)應(yīng)的消息處理器(用郵件還是短信),使用策略模式進(jìn)行不同的消息渠道拆分,用具體的消息處理器進(jìn)行處理消息。

【開源項(xiàng)目】消息推送平臺(tái)austin介紹文章來源地址http://www.zghlxwxcb.cn/news/detail-498903.html

到了這里,關(guān)于【開源項(xiàng)目】消息推送平臺(tái)austin介紹的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包