国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式重試服務(wù)平臺(tái) Easy-Retry

這篇具有很好參考價(jià)值的文章主要介紹了分布式重試服務(wù)平臺(tái) Easy-Retry。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1.簡(jiǎn)介

??在介紹這款開(kāi)源產(chǎn)品前先給大家介紹一個(gè)開(kāi)源組織:aizuda–愛(ài)組搭

分布式重試服務(wù)平臺(tái) Easy-Retry

1.1愛(ài)組搭官網(wǎng)

http://aizuda.com/

分布式重試服務(wù)平臺(tái) Easy-Retry

??可以看到Easy-Retry就是愛(ài)組搭的開(kāi)源項(xiàng)目之一。

1.2介紹

??在分布式系統(tǒng)大行其道的當(dāng)前,系統(tǒng)數(shù)據(jù)的準(zhǔn)確性和正確性是重大的挑戰(zhàn),基于CAP理論,采用柔性事務(wù),保障系統(tǒng)可用性以及數(shù)據(jù)的最終一致性成為技術(shù)共識(shí) 為了保障分布式服務(wù)的可用性,服務(wù)容錯(cuò)性,服務(wù)數(shù)據(jù)一致性 以及服務(wù)間調(diào)用的網(wǎng)絡(luò)問(wèn)題。依據(jù)"墨菲定律",增加核心流程重試, 數(shù)據(jù)核對(duì)校驗(yàn)成為提高系統(tǒng)魯棒性常用的技術(shù)方案。

特性

  • 易用性 業(yè)務(wù)接入成本小。避免依賴(lài)研發(fā)人員的技術(shù)水平,保障重試的穩(wěn)定性

  • 靈活性 能夠動(dòng)態(tài)調(diào)整配置,啟動(dòng)/停止任務(wù),以及終止運(yùn)行中的重試數(shù)據(jù)

  • 操作簡(jiǎn)單 分鐘上手,支持WEB頁(yè)面對(duì)重試數(shù)據(jù)CRUD操作。

  • 數(shù)據(jù)大盤(pán) 實(shí)時(shí)管控系統(tǒng)重試數(shù)據(jù)

  • 多樣化退避策略 Cron、固定間隔、等級(jí)觸發(fā)、隨機(jī)時(shí)間觸發(fā)

  • 容器化部署 服務(wù)端支持docker容器部署

  • 高性能調(diào)度平臺(tái) 支持服務(wù)端節(jié)點(diǎn)動(dòng)態(tài)擴(kuò)容和縮容

  • 多樣化重試類(lèi)型 支持ONLY_LOCAL、ONLY_REMOTE、LOCAL_REMOTE多種重試類(lèi)型

  • 重試數(shù)據(jù)管理 可以做到重試數(shù)據(jù)不丟失、重試數(shù)據(jù)一鍵回放

  • 支持多樣化的告警方式 郵箱、企業(yè)微信、釘釘、飛書(shū)

1.3 相關(guān)地址

easy-retry官方文檔地址

https://www.easyretry.com/

項(xiàng)目地址

https://toscode.mulanos.cn/aizuda/easy-retry

gitHub地址

https://github.com/aizuda/easy-retry

字節(jié)跳動(dòng): 如何優(yōu)雅地重試

https://juejin.cn/post/6914091859463634951

java優(yōu)雅重試機(jī)制spring-retry

https://mp.weixin.qq.com/s/vqmON5EOT17YDVLo-1JLNQ

2.架構(gòu)

2.1系統(tǒng)架構(gòu)圖

分布式重試服務(wù)平臺(tái) Easy-Retry

2.2 客戶(hù)端與服務(wù)端數(shù)據(jù)交互圖

分布式重試服務(wù)平臺(tái) Easy-Retry

3.業(yè)內(nèi)成熟重試組件對(duì)比

區(qū)別 SpringRetry GuavaRetry EasyRetry
編程語(yǔ)言 Java Java Java
退避策略 支持多種策略 支持多種策略 支持多種策略
依賴(lài)生態(tài) Spring 框架 不依賴(lài)任何框架 Spring框架、GuavaRetry
重試類(lèi)型 內(nèi)存重試 內(nèi)存重試 多種策略 內(nèi)存重試+服務(wù)端重試
存儲(chǔ)介質(zhì) 內(nèi)存 內(nèi)存 內(nèi)存+數(shù)據(jù)庫(kù)
是否管控重試流量 支持多維度管控(單機(jī)重試管控、鏈路重試管控、重試流速管控等)
數(shù)據(jù)安全 會(huì)丟失重試數(shù)據(jù) 會(huì)丟失重試數(shù)據(jù) 基于LOCAL_REMOTE或ONLY_REMOTE持久化數(shù)據(jù)
管理重試數(shù)據(jù) 不支持 不支持 支持暫停、停止、新增、修改重試數(shù)據(jù)

4.快速開(kāi)始

4.1 服務(wù)端項(xiàng)目部署

4.1.0 初始化腳本

doc/sql/easy_retry.sql

分布式重試服務(wù)平臺(tái) Easy-Retry

該sql腳本在項(xiàng)目中的位置如圖所示。

準(zhǔn)備easy_retry數(shù)據(jù),執(zhí)行上面的初始化腳本:

分布式重試服務(wù)平臺(tái) Easy-Retry

4.1.1 源碼部署

  • 下載源碼

     https://gitee.com/aizuda/easy-retry.git
     或
     https://github.com/aizuda/easy-retry.git
    
  • maven 打包鏡像

maven clean install
  • 修改配置
/easy-retry-server/src/main/resources/application.yml

配置文件修改:

spring:
  datasource:
    name: easy_retry
    url:  jdbc:mysql://localhost:3306/x_retry?useSSL=false&characterEncoding=utf8&useUnicode=true
    username: root
    password: root
    ....其他配置信息....
easy-retry:
  lastDays: 30 # 拉取重試數(shù)據(jù)的天數(shù)
  retryPullPageSize: 100 # 拉取重試數(shù)據(jù)的每批次的大小
  nettyPort: 1788  # 服務(wù)端netty端口
  totalPartition: 32  # 重試和死信表的分區(qū)總數(shù)
  • 啟動(dòng)
java -jar easy-retry-server.jar

4.1.2 Docker部署

  • 下載鏡像
docker pull byteblogs/easy-retry:1.5.0
  • 創(chuàng)建容器并運(yùn)行
/**
* 如需自定義 mysql 等配置,可通過(guò) "-e PARAMS" 指定,參數(shù)格式 PARAMS="--key1=value1  --key2=value2" ;
* 配置項(xiàng)參考文件:/easy-retry-server/src/main/resources/application.yml
* 如需自定義 JVM內(nèi)存參數(shù) 等配置,可通過(guò) "-e JAVA_OPTS" 指定,參數(shù)格式 JAVA_OPTS="-Xmx512m" ;
*/
docker run \
  -e PARAMS="--spring.datasource.username=root --spring.datasource.password=root  --spring.datasource.url=jdbc:mysql://IP:3306/easy_retry?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai " \
  -p 8080:8080 \
  -p 1788:1788 \
  --name easy-retry-server-1  \
  -d byteblogs/easy-retry:1.5.0

如果你已經(jīng)正確啟動(dòng)系統(tǒng)了,那么你可以輸入以下地址就可以進(jìn)入管理系統(tǒng)了

http://localhost:8080

后臺(tái)體驗(yàn)地址

地址: http://preview.easyretry.com/ 
賬號(hào): admin 密碼: admin

分布式重試服務(wù)平臺(tái) Easy-Retry

4.2 客戶(hù)端集成配置

4.2.1 添加依賴(lài)

項(xiàng)目中引入依賴(lài)

<dependency>
    <groupId>com.aizuda</groupId>
    <artifactId>easy-retry-client-starter</artifactId>
    <version>1.5.0</version>
</dependency>

4.2.2 配置

啟動(dòng)類(lèi)上添加注解開(kāi)啟easy-retry功能

@SpringBootApplication
@EnableEasyRetry(group = "example_group")
public class ExampleApplication {
    public static void main(String[] args) {
        SpringApplication.run(ExampleApplication.class, args);
    }
}

配置服務(wù)地址:

easy-retry:
  server: 
    host: 127.0.0.1 #服務(wù)端的地址建議使用域名
    port: 1788 #服務(wù)端netty的端口號(hào)

4.2.3 基于@Retryable注解實(shí)現(xiàn)重試

為需要重試的方法添加重試注解

@Retryable(scene = "errorMethodForLocalAndRemote", localTimes = 3, retryStrategy = RetryType.LOCAL_REMOTE)
public String errorMethodForLocalAndRemote(String name) {
    double i = 1 / 0;
    return "這是一個(gè)簡(jiǎn)單的異常方法";
}

4.2.4 Retryable 詳解

屬性 類(lèi)型 必須指定 默認(rèn)值 描述
scene String 無(wú) 場(chǎng)景
include Throwable 無(wú) 包含的異常
exclude Throwable 無(wú) 排除的異常
retryStrategy RetryType LOCAL_REMOTE 重試策略
retryMethod RetryMethod RetryAnnotationMethod 重試處理入口
idempotentId IdempotentIdGenerate SimpleIdempotentIdGenerate 冪等id生成器,默認(rèn)的idempotentId生成器{@link SimpleIdempotentIdGenerate} 對(duì)所有參數(shù)進(jìn)行MD5
retryCompleteCallback RetryCompleteCallback SimpleRetryCompleteCallback 服務(wù)端重試完成(重試成功、重試到達(dá)最大次數(shù))回調(diào)客戶(hù)端
isThrowException boolean true 本地重試完成后是否拋出異常
bizNo String 無(wú) bizNo spel表達(dá)式(opens new window)
localTimes int 3 本地重試次數(shù) 次數(shù)必須大于等于1
localInterval int 2 本地重試間隔時(shí)間(s)
timeout long 60 * 1000 同步(async:false)上報(bào)數(shù)據(jù)需要配置超時(shí)時(shí)間
unit TimeUnit TimeUnit.MILLISECONDS 超時(shí)時(shí)間單位
forceReport boolean false 是否強(qiáng)制上報(bào)數(shù)據(jù)到服務(wù)端

4.2.5自定義生成重試任務(wù)

注意:生成重試任務(wù)是將任務(wù)在客戶(hù)端創(chuàng)建并上報(bào)到服務(wù)端,由服務(wù)端調(diào)度并通知客戶(hù)端進(jìn)行重試

ExecutorMethodRegister 詳解

屬性 類(lèi)型 必須指定 默認(rèn)值 描述
scene String 無(wú) 場(chǎng)景
include Throwable 無(wú) 包含的異常
exclude Throwable 無(wú) 排除的異常
retryStrategy RetryType LOCAL_REMOTE 重試策略
retryMethod RetryMethod RetryAnnotationMethod 重試處理入口
idempotentId IdempotentIdGenerate SimpleIdempotentIdGenerate 冪等id生成器,默認(rèn)的idempotentId生成器{@link SimpleIdempotentIdGenerate} 對(duì)所有參數(shù)進(jìn)行MD5
retryCompleteCallback RetryCompleteCallback SimpleRetryCompleteCallback 服務(wù)端重試完成(重試成功、重試到達(dá)最大次數(shù))回調(diào)客戶(hù)端
bizNo String 無(wú) bizNo spel表達(dá)式
async boolean true 異步上報(bào)數(shù)據(jù)到服務(wù)端
timeout long 60 * 1000 同步(async:false)上報(bào)數(shù)據(jù)需要配置超時(shí)時(shí)間
unit TimeUnit TimeUnit.MILLISECONDS 超時(shí)時(shí)間單位
forceReport boolean false 是否強(qiáng)制上報(bào)數(shù)據(jù)到服務(wù)端

新建一個(gè)自定義任務(wù)執(zhí)行器

// 這個(gè)一個(gè)自定義任務(wù)執(zhí)行器
@ExecutorMethodRegister(scene = CustomSyncCreateTask.SCENE, async = false, timeout = 10000, unit = TimeUnit.MILLISECONDS, forceReport = true)
@Slf4j
public class CustomSyncCreateTask implements ExecutorMethod {

    public static final String SCENE = "customSyncCreateTask";

    @Override
    public Object doExecute(Object obj) {
        return "測(cè)試成功";
    }

}

在代碼中執(zhí)行重試

public void generateAsyncTaskTest() throws InterruptedException {

        Cat cat = new Cat();
        cat.setName("zsd");
        Zoo zoo = new Zoo();
        zoo.setNow(LocalDateTime.now());
        EasyRetryTemplate retryTemplate = RetryTaskTemplateBuilder.newBuilder()
            .withExecutorMethod(CustomAsyncCreateTask.class)
            .withParam(zoo)
            .withScene(CustomAsyncCreateTask.SCENE)
            .build();

        retryTemplate.executeRetry();

        Thread.sleep(90000);
    }

??ExecutorMethodRegister 這個(gè)也是一個(gè)注解,這個(gè)我猜測(cè)是跟手動(dòng)重試相關(guān)。

5.源碼賞析

5.1 客戶(hù)端自動(dòng)裝配入口

package com.aizuda.easy.retry.client.starter;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;

@Configuration
@ComponentScan("com.aizuda.easy.retry.client.core")
@ConditionalOnProperty(prefix = "easy-retry", name = "enabled", havingValue = "true")
public class EasyRetryClientAutoConfiguration {

}

??該自動(dòng)裝配類(lèi)會(huì)將com.aizuda.easy.retry.client.core核心包下交給springBoot自動(dòng)注入和管理。

5.2 Netty 客戶(hù)端

分布式重試服務(wù)平臺(tái) Easy-Retry

5.3 客戶(hù)端注冊(cè)掃描Retryable和ExecutorMethodRegister

分布式重試服務(wù)平臺(tái) Easy-Retry

這兩個(gè)注解的解析最終會(huì)被放到RetryerInfoCache這個(gè)類(lèi)的的一個(gè)table中:

public class RetryerInfoCache {

    private static Table<String, String, RetryerInfo> RETRY_HANDLER_REPOSITORY = HashBasedTable.create();

    public static RetryerInfo put(RetryerInfo retryerInfo) {
       return RETRY_HANDLER_REPOSITORY.put(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), retryerInfo);
    }

    public static RetryerInfo get(String sceneName, String executorClassName) {
        return RETRY_HANDLER_REPOSITORY.get(sceneName, executorClassName);
    }

}

??可以看出注冊(cè)掃描信息始終是在內(nèi)存中,沒(méi)有上報(bào)給服務(wù)端的。

5.4 客戶(hù)端重試觸發(fā)入口

分布式重試服務(wù)平臺(tái) Easy-Retry

??從上圖可以看出重試是在加了Retryable注解的方法上采用Aspect的AOP動(dòng)態(tài)代理,當(dāng)目標(biāo)方法被調(diào)用前會(huì)被攔截,AOP的思想就是對(duì)目標(biāo)對(duì)象的代理和增強(qiáng)

@Aspect 注解用于標(biāo)識(shí)或者描述AOP中的切面類(lèi)型,基于切面類(lèi)型構(gòu)建的對(duì)象用于為目標(biāo)對(duì)象進(jìn)行功能擴(kuò)展或控制目標(biāo)對(duì)象的執(zhí)行。

@Pointcut 注解用于描述切面中的方法,并定義切面中的切入點(diǎn),后面會(huì)對(duì)切入點(diǎn)表達(dá)式進(jìn)行詳解

@Around注解 用于描述切面中方法,這樣的方法會(huì)被認(rèn)為是一個(gè)環(huán)繞通知,后面會(huì)對(duì)aop各個(gè)通知類(lèi)型詳解

ProceedingJoinPoint 類(lèi)為一個(gè)連接點(diǎn)類(lèi)型,此類(lèi)型的對(duì)象用于封裝要執(zhí)行的目標(biāo)方法相關(guān)的一些信息。一般用于@Around注解描述的方法參數(shù)。

通知類(lèi)型
spring中定義了五種類(lèi)型的通知,基于A(yíng)spectJ框架標(biāo)準(zhǔn),它們分別是:

環(huán)繞通知 (@Around) : 包圍一個(gè)連接點(diǎn)的通知,最強(qiáng)大的一種通知類(lèi)型,環(huán)繞通知可以在方法前后完成自定義的行為,它可以自己選擇是否繼續(xù)執(zhí)行連接點(diǎn)或直接返回方法的返回值或拋異常結(jié)束執(zhí)行

前置通知 (@Before) : 在指定連接點(diǎn)(join point)前執(zhí)行的通知,但它不能阻止連接點(diǎn)前的執(zhí)行(除非拋異常)

后置通知 (@After): 在指定連接點(diǎn)(join point)退出的時(shí)候執(zhí)行(不管是正常返回還是異常退出)

返回通知 (@AfterReturning) : 在指定連接點(diǎn)(join point)正常返回后執(zhí)行,如果拋出異常則不執(zhí)行(和After通知同時(shí)存在則在A(yíng)fter通知執(zhí)行完之后再執(zhí)行)

異常通知 (@AfterThrowing) : 在目標(biāo)方法拋出異常退出時(shí)執(zhí)行

通知執(zhí)行順序

假如這些通知全部寫(xiě)到一個(gè)切面對(duì)象中,其執(zhí)行順序及過(guò)程,如圖:

分布式重試服務(wù)平臺(tái) Easy-Retry

進(jìn)入目標(biāo)方法前先進(jìn)入環(huán)繞通知(@Aroud)
在環(huán)繞通知里調(diào)用連接點(diǎn)(joinPoint)的proceed方法后進(jìn)入前置通知(@Before)
前置通知執(zhí)行完后進(jìn)入目標(biāo)方法(targetMethod)
目標(biāo)方法邏輯執(zhí)行完進(jìn)入環(huán)繞通知里調(diào)用proceed方法后的邏輯
環(huán)繞通知全部執(zhí)行完后進(jìn)入后置通知(@After)
后置通知執(zhí)行完后若目標(biāo)方法正常返回后則進(jìn)入返回通知(@AfterReturning),若目標(biāo)方法拋出異常則進(jìn)入異常通知(@AfterThrowing)
注:若是存在環(huán)繞通知(@Aroud)一定要調(diào)用連接點(diǎn)的proceed()方法,否則會(huì)在環(huán)繞通知后直接返回,跳過(guò)目標(biāo)方法。

around環(huán)繞通知源碼如下:

package com.aizuda.easy.retry.client.core.intercepter;

import cn.hutool.core.util.IdUtil;
import cn.hutool.core.util.StrUtil;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.config.EasyRetryProperties;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot.EnumStage;
import com.aizuda.easy.retry.client.core.strategy.RetryStrategy;
import com.aizuda.easy.retry.client.core.annotation.Retryable;
import com.aizuda.easy.retry.client.core.retryer.RetryerResultContext;
import com.aizuda.easy.retry.common.core.alarm.Alarm;
import com.aizuda.easy.retry.common.core.alarm.AlarmContext;
import com.aizuda.easy.retry.common.core.alarm.AltinAlarmFactory;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.enums.NotifySceneEnum;
import com.aizuda.easy.retry.common.core.enums.RetryResultStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.util.EnvironmentUtils;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.core.Ordered;
import org.springframework.core.env.StandardEnvironment;
import org.springframework.stereotype.Component;

import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Objects;
import java.util.UUID;

/**
 * @author: www.byteblogs.com
 * @date : 2022-03-03 11:41
 */
@Aspect
@Component
@Slf4j
public class RetryAspect implements Ordered {

    private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
    private static String retryErrorMoreThresholdTextMessageFormatter =
            "<font face=\"微軟雅黑\" color=#ff0000 size=4>{}環(huán)境 重試組件異常</font>  \r\n" +
                    "> 名稱(chēng):{}  \r\n" +
                    "> 時(shí)間:{}  \r\n" +
                    "> 異常:{}  \n"
            ;

    @Autowired
    @Qualifier("localRetryStrategies")
    private RetryStrategy retryStrategy;
    @Autowired
    private AltinAlarmFactory altinAlarmFactory;
    @Autowired
    private StandardEnvironment standardEnvironment;

    @Around("@annotation(com.aizuda.easy.retry.client.core.annotation.Retryable)")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        String traceId = UUID.randomUUID().toString();

        LogUtils.debug(log,"Start entering the around method traceId:[{}]", traceId);
        Retryable retryable = getAnnotationParameter(point);
        String executorClassName = point.getTarget().getClass().getName();
        String methodEntrance = getMethodEntrance(retryable, executorClassName);
        if (StrUtil.isBlank(RetrySiteSnapshot.getMethodEntrance())) {
            RetrySiteSnapshot.setMethodEntrance(methodEntrance);
        }

        Throwable throwable = null;
        Object result = null;
        RetryerResultContext retryerResultContext;
        try {
            result = point.proceed();
        } catch (Throwable t) {
            throwable = t;
        } finally {

            LogUtils.debug(log,"Start retrying. traceId:[{}] scene:[{}] executorClassName:[{}]", traceId, retryable.scene(), executorClassName);
            // 入口則開(kāi)始處理重試
            retryerResultContext = doHandlerRetry(point, traceId, retryable, executorClassName, methodEntrance, throwable);
        }

        LogUtils.debug(log,"Method return value is [{}]. traceId:[{}]", result, traceId, throwable);

        // 若是重試完成了, 則判斷是否返回重試完成后的數(shù)據(jù)
        if (Objects.nonNull(retryerResultContext)) {
            // 重試成功直接返回結(jié)果 若注解配置了isThrowException=false 則不拋出異常
            if (retryerResultContext.getRetryResultStatusEnum().getStatus().equals(RetryResultStatusEnum.SUCCESS.getStatus())
            || !retryable.isThrowException()) {
                return retryerResultContext.getResult();
            }
        }

        if (throwable != null) {
            throw throwable;
        } else {
            return result;
        }

    }

    private RetryerResultContext doHandlerRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, String methodEntrance, Throwable throwable) {

        if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance)
                || RetrySiteSnapshot.isRunning()
                || Objects.isNull(throwable)
                // 重試流量不開(kāi)啟重試
                || RetrySiteSnapshot.isRetryFlow()
                // 下游響應(yīng)不重試碼,不開(kāi)啟重試
                || RetrySiteSnapshot.isRetryForStatusCode()
        ) {
            if (!RetrySiteSnapshot.isMethodEntrance(methodEntrance)) {
                LogUtils.debug(log, "Non-method entry does not enable local retries. traceId:[{}] [{}]", traceId, RetrySiteSnapshot.getMethodEntrance());
            } else if (RetrySiteSnapshot.isRunning()) {
                LogUtils.debug(log, "Existing running retry tasks do not enable local retries. traceId:[{}] [{}]", traceId, EnumStage.valueOfStage(RetrySiteSnapshot.getStage()));
            } else if (Objects.isNull(throwable)) {
                LogUtils.debug(log, "No exception, no local retries. traceId:[{}]", traceId);
            } else if (RetrySiteSnapshot.isRetryFlow()) {
                LogUtils.debug(log, "Retry traffic does not enable local retries. traceId:[{}] [{}]", traceId,  RetrySiteSnapshot.getRetryHeader());
            } else if (RetrySiteSnapshot.isRetryForStatusCode()) {
                LogUtils.debug(log, "Existing exception retry codes do not enable local retries. traceId:[{}]", traceId);
            } else {
                LogUtils.debug(log, "Unknown situations do not enable local retry scenarios. traceId:[{}]", traceId);
            }
            return null;
        }

        return openRetry(point, traceId, retryable, executorClassName, throwable);
    }

    private RetryerResultContext openRetry(ProceedingJoinPoint point, String traceId, Retryable retryable, String executorClassName, Throwable throwable) {

        try {

            // 標(biāo)識(shí)重試流量
            initHeaders(retryable);

            RetryerResultContext context = retryStrategy.openRetry(retryable.scene(), executorClassName, point.getArgs());
            LogUtils.info(log,"local retry result. traceId:[{}] message:[{}]", traceId, context);
            if (RetryResultStatusEnum.SUCCESS.getStatus().equals(context.getRetryResultStatusEnum().getStatus())) {
                LogUtils.debug(log, "local retry successful. traceId:[{}] result:[{}]", traceId, context.getResult());
            }

            return context;
        } catch (Exception e) {
            LogUtils.error(log,"retry component handling exception,traceId:[{}]", traceId,  e);

            // 預(yù)警
            sendMessage(e);

        } finally {
            RetrySiteSnapshot.removeAll();
        }

        return null;
    }

    private void initHeaders(final Retryable retryable) {

        EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();
        easyRetryHeaders.setEasyRetry(Boolean.TRUE);
        easyRetryHeaders.setEasyRetryId(IdUtil.getSnowflakeNextIdStr());
        easyRetryHeaders.setDdl(GroupVersionCache.getDdl(retryable.scene()));
        RetrySiteSnapshot.setRetryHeader(easyRetryHeaders);
    }

    private void sendMessage(Exception e) {

        try {
            ConfigDTO.Notify notifyAttribute = GroupVersionCache.getNotifyAttribute(NotifySceneEnum.CLIENT_COMPONENT_ERROR.getNotifyScene());
            if (Objects.nonNull(notifyAttribute)) {
                AlarmContext context = AlarmContext.build()
                        .text(retryErrorMoreThresholdTextMessageFormatter,
                                EnvironmentUtils.getActiveProfile(),
                                EasyRetryProperties.getGroup(),
                                LocalDateTime.now().format(formatter),
                                e.getMessage())
                        .title("retry component handling exception:[{}]", EasyRetryProperties.getGroup())
                        .notifyAttribute(notifyAttribute.getNotifyAttribute());

                Alarm<AlarmContext> alarmType = altinAlarmFactory.getAlarmType(notifyAttribute.getNotifyType());
                alarmType.asyncSendMessage(context);
            }
        } catch (Exception e1) {
            LogUtils.error(log, "Client failed to send component exception alert.", e1);
        }

    }

    public String getMethodEntrance(Retryable retryable, String executorClassName) {

        if (Objects.isNull(retryable)) {
            return StrUtil.EMPTY;
        }

        return retryable.scene().concat("_").concat(executorClassName);
    }

    private Retryable getAnnotationParameter(ProceedingJoinPoint point) {
        String methodName = point.getSignature().getName();
        Class<?> classTarget = point.getTarget().getClass();
        Class<?>[] par = ((MethodSignature) point.getSignature()).getParameterTypes();
        Method objMethod = null;
        try {
            objMethod = classTarget.getMethod(methodName, par);
        } catch (NoSuchMethodException e) {
            throw new EasyRetryClientException("注解配置異常:[{}}", methodName);
        }
        return objMethod.getAnnotation(Retryable.class);
    }

    @Override
    public int getOrder() {
        String order = standardEnvironment
            .getProperty("easy-retry.aop.order", String.valueOf(Ordered.HIGHEST_PRECEDENCE));
        return Integer.parseInt(order);
    }
}

5.5 客戶(hù)端重試類(lèi)型

分布式重試服務(wù)平臺(tái) Easy-Retry

5.6 客戶(hù)端重試執(zhí)行器GuavaRetryExecutor

分布式重試服務(wù)平臺(tái) Easy-Retry

??最終重試會(huì)調(diào)用到GuavaRetryExecutor的call方法,Easy-Retry本質(zhì)上就是對(duì)GuavaRetry的深度封裝,做了一些可視化和告警的能力。

5.7 客戶(hù)端上報(bào)方式

分布式重試服務(wù)平臺(tái) Easy-Retry

客戶(hù)端上報(bào)方式分為:

異步上報(bào)數(shù)據(jù):該方式借鑒了sentinel的滑動(dòng)窗口的RetryTaskDTO做了監(jiān)聽(tīng)然后進(jìn)行call重試

同步上報(bào)數(shù)據(jù):同client將重試任務(wù)上報(bào)給服務(wù)端

        NettyResult result = client.reportRetryInfo(Collections.singletonList(retryTaskDTO));

分布式重試服務(wù)平臺(tái) Easy-Retry

5.8 netty server

分布式重試服務(wù)平臺(tái) Easy-Retry

5.8 Handler

Handler可以大體分為兩類(lèi):處理Get請(qǐng)求的Handler和處理Post請(qǐng)求的Handler,客戶(hù)端心跳、版本和上報(bào)任務(wù)都屬于Get請(qǐng)求或者是Post請(qǐng)求。

分布式重試服務(wù)平臺(tái) Easy-Retry

5.9 服務(wù)端向客戶(hù)端發(fā)起重試

??服務(wù)端的亮點(diǎn)就是使用了akka,Akka是一個(gè)開(kāi)發(fā)庫(kù)和運(yùn)行環(huán)境,可以用于構(gòu)建高并發(fā)、分布式、可容錯(cuò)、事件驅(qū)動(dòng)的基于JVM的應(yīng)用,使構(gòu)建高并發(fā)的分布式應(yīng)用更加容易服務(wù)端在啟動(dòng)前會(huì)做一個(gè)scan,把客戶(hù)端上報(bào)給服務(wù)端的重試數(shù)據(jù)全部掃描出來(lái):

AbstractScanGroup類(lèi)中的doScan:

protected void doScan(final ScanTaskDTO scanTaskDTO) {

        LocalDateTime defLastAt = LocalDateTime.now().minusDays(systemProperties.getLastDays());

        String groupName = scanTaskDTO.getGroupName();
        LocalDateTime lastAt = Optional.ofNullable(getLastAt(groupName)).orElse(defLastAt);

        // 掃描當(dāng)前Group 待重試的數(shù)據(jù)
        List<RetryTask> list = retryTaskAccessProcessor.listAvailableTasks(groupName, lastAt, systemProperties.getRetryPullPageSize(),
            getTaskType());

        if (!CollectionUtils.isEmpty(list)) {

            // 更新拉取的最大的創(chuàng)建時(shí)間
            putLastAt(scanTaskDTO.getGroupName(), list.get(list.size() - 1).getCreateDt());

            for (RetryTask retryTask : list) {

                // 重試次數(shù)累加
                retryCountIncrement(retryTask);

                RetryContext retryContext = builderRetryContext(groupName, retryTask);
                RetryExecutor executor = builderResultRetryExecutor(retryContext);

                if (!executor.filter()) {
                    continue;
                }

                productExecUnitActor(executor);
            }
        } else {
            // 數(shù)據(jù)為空則休眠5s
            try {
                Thread.sleep((DispatchService.PERIOD / 2) * 1000);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }

            putLastAt(groupName, defLastAt);
        }

    }

??AbstractScanGroup該抽象類(lèi)有兩個(gè)子類(lèi):ScanCallbackGroupActor和ScanGroupActor

   productExecUnitActor(executor); //抽象父類(lèi)中定義的方法
   private void productExecUnitActor(RetryExecutor retryExecutor) {
        String groupIdHash = retryExecutor.getRetryContext().getRetryTask().getGroupName();
        Long retryId = retryExecutor.getRetryContext().getRetryTask().getId();
        idempotentStrategy.set(groupIdHash, retryId.intValue());

        // 重試成功回調(diào)客戶(hù)端
        ActorRef actorRef = getActorRef();
        actorRef.tell(retryExecutor, actorRef);
    }
   // 兩個(gè)子類(lèi)中都有該重試客戶(hù)端的方法
   @Override
    protected ActorRef getActorRef() {
        return ActorGenerator.execUnitActor();
    }
   //getActorRef()方法會(huì)調(diào)用ActorGenerator類(lèi)里面的方法來(lái)生成一個(gè)Actor生成器,通過(guò)akka的屬性然后將這個(gè)ExecUnitActor執(zhí)行器的類(lèi)注入到spring容器中
   getDispatchExecUnitActorSystem().actorOf(getSpringExtension().props(ExecUnitActor.BEAN_NAME)

??akka采用消息的發(fā)布訂閱模型,生產(chǎn)者發(fā)布消息,消費(fèi)者只訂閱自己感興趣的主題,然后接收消息,這樣就具有解耦的功能。

??ExecUnitActor類(lèi)里面的createReceive()方法才是具體給客戶(hù)端發(fā)送重試請(qǐng)求的執(zhí)行者:

package com.aizuda.easy.retry.server.support.dispatch.actor.exec;

import akka.actor.AbstractActor;
import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.common.core.constant.SystemConstants;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.model.EasyRetryHeaders;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.exception.EasyRetryServerException;
import com.aizuda.easy.retry.server.persistence.mybatis.mapper.RetryTaskLogMapper;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTask;
import com.aizuda.easy.retry.server.persistence.mybatis.po.RetryTaskLog;
import com.aizuda.easy.retry.server.persistence.mybatis.po.ServerNode;
import com.aizuda.easy.retry.server.service.convert.RetryTaskLogConverter;
import com.aizuda.easy.retry.server.support.IdempotentStrategy;
import com.aizuda.easy.retry.server.support.context.MaxAttemptsPersistenceRetryContext;
import com.aizuda.easy.retry.server.support.retry.RetryExecutor;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.StringUtils;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.stereotype.Component;
import org.springframework.web.client.RestTemplate;

import java.text.MessageFormat;
import java.time.LocalDateTime;
import java.util.Objects;
import java.util.concurrent.Callable;

/**
 * 重試結(jié)果執(zhí)行器
 *
 * @author www.byteblogs.com
 * @date 2021-10-30
 * @since 2.0
 */
@Component("ExecUnitActor")
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
@Slf4j
public class ExecUnitActor extends AbstractActor  {

    public static final String BEAN_NAME = "ExecUnitActor";
    public static final String URL = "http://{0}:{1}/{2}/retry/dispatch/v1";

    @Autowired
    @Qualifier("bitSetIdempotentStrategyHandler")
    private IdempotentStrategy<String, Integer> idempotentStrategy;
    @Autowired
    private RetryTaskLogMapper retryTaskLogMapper;
    @Autowired
    private RestTemplate restTemplate;

    @Override
    public Receive createReceive() {
        return receiveBuilder().match(RetryExecutor.class, retryExecutor -> {

            MaxAttemptsPersistenceRetryContext context = (MaxAttemptsPersistenceRetryContext) retryExecutor.getRetryContext();
            RetryTask retryTask = context.getRetryTask();
            ServerNode serverNode = context.getServerNode();

            RetryTaskLog retryTaskLog = RetryTaskLogConverter.INSTANCE.toRetryTask(retryTask);
            retryTaskLog.setErrorMessage(StringUtils.EMPTY);

            try {

                if (Objects.nonNull(serverNode)) {
                    retryExecutor.call((Callable<Result<DispatchRetryResultDTO>>) () -> callClient(retryTask, retryTaskLog, serverNode));
                    if (context.hasException()) {
                        retryTaskLog.setErrorMessage(context.getException().getMessage());
                    }
                } else {
                    retryTaskLog.setErrorMessage("暫無(wú)可用的客戶(hù)端POD");
                }

            }catch (Exception e) {
                LogUtils.error(log, "回調(diào)客戶(hù)端失敗 retryTask:[{}]", JsonUtil.toJsonString(retryTask), e);
                retryTaskLog.setErrorMessage(StringUtils.isBlank(e.getMessage()) ? StringUtils.EMPTY : e.getMessage());
            } finally {

                // 清除冪等標(biāo)識(shí)位
                idempotentStrategy.clear(retryTask.getGroupName(), retryTask.getId().intValue());
                getContext().stop(getSelf());

                // 記錄重試日志
                retryTaskLog.setCreateDt(LocalDateTime.now());
                retryTaskLog.setId(null);
                Assert.isTrue(1 ==  retryTaskLogMapper.insert(retryTaskLog),
                    () -> new EasyRetryServerException("新增重試日志失敗"));
            }

        }).build();
    }

    /**
     * 調(diào)用客戶(hù)端
     *
     * @param retryTask {@link RetryTask} 需要重試的數(shù)據(jù)
     * @return 重試結(jié)果返回值
     */
    private Result<DispatchRetryResultDTO> callClient(RetryTask retryTask, RetryTaskLog retryTaskLog, ServerNode serverNode) {

        DispatchRetryDTO dispatchRetryDTO = new DispatchRetryDTO();
        dispatchRetryDTO.setIdempotentId(retryTask.getIdempotentId());
        dispatchRetryDTO.setScene(retryTask.getSceneName());
        dispatchRetryDTO.setExecutorName(retryTask.getExecutorName());
        dispatchRetryDTO.setArgsStr(retryTask.getArgsStr());
        dispatchRetryDTO.setUniqueId(retryTask.getUniqueId());
        dispatchRetryDTO.setRetryCount(retryTask.getRetryCount());

        // 設(shè)置header
        HttpHeaders requestHeaders = new HttpHeaders();
        EasyRetryHeaders easyRetryHeaders = new EasyRetryHeaders();
        easyRetryHeaders.setEasyRetry(Boolean.TRUE);
        easyRetryHeaders.setEasyRetryId(retryTask.getUniqueId());
        requestHeaders.add(SystemConstants.EASY_RETRY_HEAD_KEY, JsonUtil.toJsonString(easyRetryHeaders));

        HttpEntity<DispatchRetryDTO> requestEntity = new HttpEntity<>(dispatchRetryDTO, requestHeaders);

        String format = MessageFormat.format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());
        Result<DispatchRetryResultDTO> result = restTemplate.postForObject(format, requestEntity, Result.class);

        if (1 != result.getStatus()  && StringUtils.isNotBlank(result.getMessage())) {
            retryTaskLog.setErrorMessage(result.getMessage());
        } else {
            DispatchRetryResultDTO data = JsonUtil.parseObject(JsonUtil.toJsonString(result.getData()), DispatchRetryResultDTO.class);
            result.setData(data);
            if (Objects.nonNull(data) && StringUtils.isNotBlank(data.getExceptionMsg())) {
                retryTaskLog.setErrorMessage(data.getExceptionMsg());
            }

        }

        LogUtils.info(log, "請(qǐng)求客戶(hù)端 response:[{}}] ", JsonUtil.toJsonString(result));
        return result;

    }

}

??可以看出服務(wù)端給客戶(hù)端發(fā)送重試是使用的是:restTemplate的方式

5.10 服務(wù)端手動(dòng)下發(fā)重試策略

@PostMapping("/generate/idempotent-id")
    public Result<String> idempotentIdGenerate(@RequestBody @Validated GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO){
        return new Result<>(retryTaskService.idempotentIdGenerate(generateRetryIdempotentIdVO));
}

RetryTaskServiceImplle類(lèi)的idempotentIdGenerate()方法:

@Override
    public String idempotentIdGenerate(final GenerateRetryIdempotentIdVO generateRetryIdempotentIdVO) {
        ServerNode serverNode = clientNodeAllocateHandler.getServerNode(generateRetryIdempotentIdVO.getGroupName());
        Assert.notNull(serverNode, () -> new EasyRetryServerException("生成idempotentId失敗: 不存在活躍的客戶(hù)端節(jié)點(diǎn)"));

        // 委托客戶(hù)端生成idempotentId
        String url = MessageFormat
            .format(URL, serverNode.getHostIp(), serverNode.getHostPort().toString(), serverNode.getContextPath());

        GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO = new GenerateRetryIdempotentIdDTO();
        generateRetryIdempotentIdDTO.setGroup(generateRetryIdempotentIdVO.getGroupName());
        generateRetryIdempotentIdDTO.setScene(generateRetryIdempotentIdVO.getSceneName());
        generateRetryIdempotentIdDTO.setArgsStr(generateRetryIdempotentIdVO.getArgsStr());
        generateRetryIdempotentIdDTO.setExecutorName(generateRetryIdempotentIdVO.getExecutorName());

        HttpEntity<GenerateRetryIdempotentIdDTO> requestEntity = new HttpEntity<>(generateRetryIdempotentIdDTO);
        Result result = restTemplate.postForObject(url, requestEntity, Result.class);

        Assert.notNull(result, () -> new EasyRetryServerException("idempotentId生成失敗"));
        Assert.isTrue(1 == result.getStatus(), () -> new EasyRetryServerException("idempotentId生成失敗:請(qǐng)確保參數(shù)與執(zhí)行器名稱(chēng)正確"));

        return (String) result.getData();
    }

關(guān)鍵代碼如下:

    ServerNode serverNode = clientNodeAllocateHandler.getServerNode(generateRetryIdempotentIdVO.getGroupName());
     /**
     * 獲取分配的節(jié)點(diǎn),獲取服務(wù)端的節(jié)點(diǎn),服務(wù)端信息采用數(shù)據(jù)庫(kù)(server_node表就是記錄服務(wù)端的節(jié)點(diǎn)信息)做了一個(gè)集群,選擇一個(gè)服務(wù)端來(lái)執(zhí)行重試任務(wù)
     */
    public ServerNode getServerNode(String groupName) {

        GroupConfig groupConfig = configAccess.getGroupConfigByGroupName(groupName);
        List<ServerNode> serverNodes = serverNodeMapper.selectList(new LambdaQueryWrapper<ServerNode>().eq(ServerNode::getGroupName, groupName));

        if (CollectionUtils.isEmpty(serverNodes)) {
            return null;
        }

        ClientLoadBalance clientLoadBalanceRandom = ClientLoadBalanceManager.getClientLoadBalance(groupConfig.getRouteKey());

        String hostIp = clientLoadBalanceRandom.route(groupName, new TreeSet<>(serverNodes.stream().map(ServerNode::getHostIp).collect(Collectors.toSet())));
        return serverNodes.stream().filter(s -> s.getHostIp().equals(hostIp)).findFirst().get();
    }

ClientLoadBalanceManager類(lèi)的選擇客戶(hù)端節(jié)點(diǎn)的算法有如下幾種:

        CONSISTENT_HASH(1, new ClientLoadBalanceConsistentHash(100)), //一致性hash
        RANDOM(2, new ClientLoadBalanceRandom()), //隨機(jī)
        LRU(3, new ClientLoadBalanceLRU(100)), // LRU

5.11 客戶(hù)端接收服務(wù)端下發(fā)重試的端點(diǎn)RetryEndPoint

package com.aizuda.easy.retry.client.core.client;

import cn.hutool.core.lang.Assert;
import com.aizuda.easy.retry.client.core.IdempotentIdGenerate;
import com.aizuda.easy.retry.client.core.RetryArgSerializer;
import com.aizuda.easy.retry.client.core.cache.GroupVersionCache;
import com.aizuda.easy.retry.client.core.cache.RetryerInfoCache;
import com.aizuda.easy.retry.client.core.callback.RetryCompleteCallback;
import com.aizuda.easy.retry.client.core.exception.EasyRetryClientException;
import com.aizuda.easy.retry.client.core.intercepter.RetrySiteSnapshot;
import com.aizuda.easy.retry.client.core.retryer.RetryerInfo;
import com.aizuda.easy.retry.client.core.retryer.RetryerResultContext;
import com.aizuda.easy.retry.client.core.serializer.JacksonSerializer;
import com.aizuda.easy.retry.client.core.strategy.RetryStrategy;
import com.aizuda.easy.retry.client.model.GenerateRetryIdempotentIdDTO;
import com.aizuda.easy.retry.common.core.context.SpringContext;
import com.aizuda.easy.retry.common.core.enums.RetryResultStatusEnum;
import com.aizuda.easy.retry.common.core.enums.RetryStatusEnum;
import com.aizuda.easy.retry.common.core.log.LogUtils;
import com.aizuda.easy.retry.common.core.model.Result;
import com.aizuda.easy.retry.common.core.util.JsonUtil;
import com.aizuda.easy.retry.server.model.dto.ConfigDTO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.aizuda.easy.retry.client.model.DispatchRetryDTO;
import com.aizuda.easy.retry.client.model.DispatchRetryResultDTO;
import com.aizuda.easy.retry.client.model.RetryCallbackDTO;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.util.ReflectionUtils;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;

import javax.servlet.http.HttpServletRequest;
import java.lang.reflect.Method;
import java.util.Objects;

/**
 * 服務(wù)端調(diào)調(diào)用客戶(hù)端進(jìn)行重試流量下發(fā)、配置變更通知等操作
 *
 * @author: www.byteblogs.com
 * @date : 2022-03-09 16:33
 */
@RestController
@RequestMapping("/retry")
@Slf4j
public class RetryEndPoint {

    @Autowired
    @Qualifier("remoteRetryStrategies")
    private RetryStrategy retryStrategy;

    /**
     * 服務(wù)端調(diào)度重試入口
     */
    @PostMapping("/dispatch/v1")
   public Result<DispatchRetryResultDTO> dispatch(@RequestBody DispatchRetryDTO executeReqDto) {

        RetryerInfo retryerInfo = RetryerInfoCache.get(executeReqDto.getScene(), executeReqDto.getExecutorName());
        if (Objects.isNull(retryerInfo)) {
            throw new EasyRetryClientException("場(chǎng)景:[{}]配置不存在", executeReqDto.getScene());
        }

        RetryArgSerializer retryArgSerializer = new JacksonSerializer();

        Object[] deSerialize = null;
        try {
            deSerialize = (Object[]) retryArgSerializer.deSerialize(executeReqDto.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
        } catch (JsonProcessingException e) {
            throw new EasyRetryClientException("參數(shù)解析異常", e);
        }

        DispatchRetryResultDTO executeRespDto = new DispatchRetryResultDTO();

        try {
            ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            HttpServletRequest request = Objects.requireNonNull(attributes).getRequest();
            request.setAttribute("attemptNumber", executeReqDto.getRetryCount());

            RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(), executeReqDto.getExecutorName(), deSerialize);

            if (RetrySiteSnapshot.isRetryForStatusCode()) {
                executeRespDto.setStatusCode(RetryResultStatusEnum.STOP.getStatus());

                // TODO 需要標(biāo)記是哪個(gè)系統(tǒng)不需要重試
                executeRespDto.setExceptionMsg("下游標(biāo)記不需要重試");
            } else {
                executeRespDto.setStatusCode(retryerResultContext.getRetryResultStatusEnum().getStatus());
                executeRespDto.setExceptionMsg(retryerResultContext.getMessage());
            }

            executeRespDto.setIdempotentId(executeReqDto.getIdempotentId());
            executeRespDto.setUniqueId(executeReqDto.getUniqueId());
            if (Objects.nonNull(retryerResultContext.getResult())) {
                executeRespDto.setResultJson(JsonUtil.toJsonString(retryerResultContext.getResult()));
            }


        } finally {
            RetrySiteSnapshot.removeAll();
        }

        return new Result<>(executeRespDto);
    }

    /**
     * 同步版本
     */
    @PostMapping("/sync/version/v1")
    public Result syncVersion(@RequestBody ConfigDTO configDTO) {
        GroupVersionCache.configDTO = configDTO;
        return new Result();
    }

    @PostMapping("/callback/v1")
    public Result callback(@RequestBody RetryCallbackDTO callbackDTO) {
        RetryerInfo retryerInfo = RetryerInfoCache.get(callbackDTO.getScene(), callbackDTO.getExecutorName());
        if (Objects.isNull(retryerInfo)) {
            throw new EasyRetryClientException("場(chǎng)景:[{}]配置不存在", callbackDTO.getScene());
        }

        RetryArgSerializer retryArgSerializer = new JacksonSerializer();

        Object[] deSerialize = null;
        try {
            deSerialize = (Object[]) retryArgSerializer.deSerialize(callbackDTO.getArgsStr(), retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
        } catch (JsonProcessingException e) {
            throw new EasyRetryClientException("參數(shù)解析異常", e);
        }

        Class<? extends RetryCompleteCallback> retryCompleteCallbackClazz = retryerInfo.getRetryCompleteCallback();
        RetryCompleteCallback retryCompleteCallback = SpringContext.getBeanByType(retryCompleteCallbackClazz);

        if (RetryStatusEnum.FINISH.getStatus().equals(callbackDTO.getRetryStatus())) {
            retryCompleteCallback.doSuccessCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize);
        }

        if (RetryStatusEnum.MAX_RETRY_COUNT.getStatus().equals(callbackDTO.getRetryStatus())) {
            retryCompleteCallback.doMaxRetryCallback(retryerInfo.getScene(), retryerInfo.getExecutorClassName(), deSerialize);
        }

        return new Result();
    }

    /**
     * 手動(dòng)新增重試數(shù)據(jù),模擬生成idempotentId
     *
     * @param generateRetryIdempotentIdDTO 生成idempotentId模型
     * @return idempotentId
     */
    @PostMapping("/generate/idempotent-id/v1")
    public Result<String> idempotentIdGenerate(@RequestBody @Validated GenerateRetryIdempotentIdDTO generateRetryIdempotentIdDTO) {

        String scene = generateRetryIdempotentIdDTO.getScene();
        String executorName = generateRetryIdempotentIdDTO.getExecutorName();
        String argsStr = generateRetryIdempotentIdDTO.getArgsStr();

        RetryerInfo retryerInfo = RetryerInfoCache.get(scene, executorName);
        Assert.notNull(retryerInfo, ()-> new EasyRetryClientException("重試信息不存在 scene:[{}] executorName:[{}]", scene, executorName));

        Method executorMethod = retryerInfo.getMethod();

        RetryArgSerializer retryArgSerializer = new JacksonSerializer();

        Object[] deSerialize = null;
        try {
            deSerialize = (Object[]) retryArgSerializer.deSerialize(argsStr, retryerInfo.getExecutor().getClass(), retryerInfo.getMethod());
        } catch (JsonProcessingException e) {
            throw new EasyRetryClientException("參數(shù)解析異常", e);
        }

        String idempotentId;
        try {
            Class<? extends IdempotentIdGenerate> idempotentIdGenerate = retryerInfo.getIdempotentIdGenerate();
            IdempotentIdGenerate generate = idempotentIdGenerate.newInstance();
            Method method = idempotentIdGenerate.getMethod("idGenerate", Object[].class);
            Object p = new Object[]{scene, executorName, deSerialize, executorMethod.getName()};
            idempotentId = (String) ReflectionUtils.invokeMethod(method, generate, p);
        } catch (Exception exception) {
            LogUtils.error(log, "冪等id生成異常:{},{}", scene, argsStr, exception);
            throw new EasyRetryClientException("idempotentId生成異常:{},{}", scene, argsStr);
        }

        return new Result<>(idempotentId);
    }
}

可以看到只有服務(wù)端重試會(huì)再次上報(bào),手動(dòng)重試的不會(huì):

RetryerResultContext retryerResultContext = retryStrategy.openRetry(executeReqDto.getScene(), executeReqDto.getExecutorName(), deSerialize);

5.12 服務(wù)端的schedule任務(wù)

分布式重試服務(wù)平臺(tái) Easy-Retry

schedule任務(wù)里面使用了:@SchedulerLock注解 和數(shù)據(jù)庫(kù)加了一張表:shedlock表

分布式重試服務(wù)平臺(tái) Easy-Retry

這種就可以然服務(wù)端是集群部署的時(shí)候只有一個(gè)節(jié)點(diǎn)可以執(zhí)行定時(shí)任務(wù)了。

@SchedulerLock詳解

https://blog.csdn.net/qq_45498460/article/details/119454759

6.集群架構(gòu)

分布式重試服務(wù)平臺(tái) Easy-Retry

7.總結(jié)

??到此easy-retry分布式開(kāi)源重試框架已經(jīng)分享完了,一般這種框架都是這種套路的,使用netty來(lái)做客戶(hù)端和服務(wù)端的心跳、采集、監(jiān)控、上報(bào),只不過(guò)每一個(gè)側(cè)重解決的業(yè)務(wù)痛點(diǎn)不一樣,就比如xxl-job分布式任務(wù)框架,簡(jiǎn)單的業(yè)務(wù)中使用spring-retry就足夠了,希望我的分享對(duì)你有幫助,請(qǐng)一鍵三連,么么噠!文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-487283.html

到了這里,關(guān)于分布式重試服務(wù)平臺(tái) Easy-Retry的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀(guān)點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式監(jiān)控平臺(tái)—zabbix

    分布式監(jiān)控平臺(tái)—zabbix

    作為一個(gè)運(yùn)維,需要會(huì)使用監(jiān)控系統(tǒng)查看服務(wù)器狀態(tài)以及網(wǎng)站流量指標(biāo),利用監(jiān)控系統(tǒng)的數(shù)據(jù)去了解上線(xiàn)發(fā)布的結(jié)果,和網(wǎng)站的健康狀態(tài)。 利用一個(gè)優(yōu)秀的監(jiān)控軟件,我們可以: 通過(guò)一個(gè)友好的界面進(jìn)行瀏覽整個(gè)網(wǎng)站所有的服務(wù)器狀態(tài) 可以在Web 前端方便的查看監(jiān)控?cái)?shù)據(jù) 可

    2024年02月13日
    瀏覽(30)
  • 分布式監(jiān)控平臺(tái)——Zabbix

    分布式監(jiān)控平臺(tái)——Zabbix

    市場(chǎng)上常用的監(jiān)控軟件: 傳統(tǒng)運(yùn)維:zabbix、 Nagios 作為一個(gè)運(yùn)維,需要會(huì)使用監(jiān)控系統(tǒng)查看服務(wù)器狀態(tài)以及網(wǎng)站流量指標(biāo),利用監(jiān)控系統(tǒng)的數(shù)據(jù)去了解上線(xiàn)發(fā)布的結(jié)果,和網(wǎng)站的健康狀態(tài)。 利用一個(gè)優(yōu)秀的監(jiān)控軟件,我們可以: 通過(guò)一個(gè)友好的界面進(jìn)行瀏覽整個(gè)網(wǎng)站所有的

    2024年02月13日
    瀏覽(33)
  • 分布式監(jiān)控平臺(tái)-Zabbix

    分布式監(jiān)控平臺(tái)-Zabbix

    作為一個(gè)運(yùn)維,需要會(huì)使用監(jiān)控系統(tǒng)查看服務(wù)器狀態(tài)以及網(wǎng)站流量指標(biāo),利用監(jiān)控系統(tǒng)的數(shù)據(jù)去了解上線(xiàn)發(fā)布的結(jié)果,和網(wǎng)站的健康狀態(tài)。 利用一個(gè)優(yōu)秀的監(jiān)控軟件,我們可以: 通過(guò)一個(gè)友好的界面進(jìn)行瀏覽整個(gè)網(wǎng)站所有的服務(wù)器狀態(tài) 可以在Web 前端方便的查看監(jiān)控?cái)?shù)據(jù) 可

    2023年04月19日
    瀏覽(133)
  • 分布式監(jiān)控平臺(tái)---Zabbix

    分布式監(jiān)控平臺(tái)---Zabbix

    作為一個(gè)運(yùn)維,需要會(huì)使用監(jiān)控系統(tǒng)查看服務(wù)器狀態(tài)以及網(wǎng)站流量指標(biāo),利用監(jiān)控系統(tǒng)的數(shù)據(jù)去了解上線(xiàn)發(fā)布的結(jié)果,和網(wǎng)站的健康狀態(tài)。 利用一個(gè)優(yōu)秀的監(jiān)控軟件,我們可以: 通過(guò)一個(gè)友好的界面進(jìn)行瀏覽整個(gè)網(wǎng)站所有的服務(wù)器狀態(tài) 可以在Web 前端方便的查看監(jiān)控?cái)?shù)據(jù) 可

    2024年04月22日
    瀏覽(34)
  • hadoop平臺(tái)完全分布式搭建

    安裝前準(zhǔn)備 一、設(shè)置ssh免密登錄 1.編輯hosts文件,添加主機(jī)名映射內(nèi)容 vim ?/etc/hosts 添加內(nèi)容: 172.17.0.2 ?????master 172.17.0.3 ?????slave1 172.17.0.4 ?????slave2 2.生成公鑰和私鑰 ssh-keygen –t rsa 然后按三次回車(chē) 3.復(fù)制公鑰到其他容器(包括自己) ssh-copy-id master ssh-copy-id slav

    2024年03月17日
    瀏覽(30)
  • 一鍵構(gòu)建分布式云原生平臺(tái)

    一鍵構(gòu)建分布式云原生平臺(tái)

    ??作者簡(jiǎn)介: 哪吒 ,CSDN2022博客之星Top1、CSDN2021博客之星Top2、多屆新星計(jì)劃導(dǎo)師?、博客專(zhuān)家?? , 專(zhuān)注Java硬核干貨分享,立志做到Java賽道全網(wǎng)Top N。 ??本文收錄于 Java基礎(chǔ)教程系列(進(jìn)階篇) ,本專(zhuān)欄是針對(duì)大學(xué)生、初級(jí)Java工程師精心打造, 針對(duì)Java生態(tài),逐個(gè)擊破,

    2023年04月17日
    瀏覽(27)
  • 分布式計(jì)算平臺(tái) Hadoop 簡(jiǎn)介

    分布式計(jì)算平臺(tái) Hadoop 簡(jiǎn)介

    Hadoop是一種分析和處理大數(shù)據(jù)的軟件平臺(tái),是一個(gè)用Java語(yǔ)言實(shí)現(xiàn)的Apache的開(kāi)源軟件框架,在大量計(jì)算機(jī)組成的集群中實(shí)現(xiàn)了對(duì)海量數(shù)據(jù)的分布式計(jì)算。其主要采用MapReduce分布式計(jì)算框架,包括根據(jù)GFS原理開(kāi)發(fā)的分布式文件系統(tǒng)HDFS、根據(jù)BigTable原理開(kāi)發(fā)的數(shù)據(jù)存儲(chǔ)系統(tǒng)HBase以及

    2024年02月01日
    瀏覽(97)
  • 分布式運(yùn)用——監(jiān)控平臺(tái) Zabbix

    分布式運(yùn)用——監(jiān)控平臺(tái) Zabbix

    作為一個(gè)運(yùn)維,需要會(huì)使用監(jiān)控系統(tǒng)查看服務(wù)器系統(tǒng)性能、應(yīng)用服務(wù)狀態(tài)和網(wǎng)站流量指標(biāo)等,利用監(jiān)控系統(tǒng)的數(shù)據(jù)去了解網(wǎng)站上線(xiàn)發(fā)布的結(jié)果和健康狀態(tài)。 利用一個(gè)優(yōu)秀的監(jiān)控軟件,我們可以: 通過(guò)一個(gè)友好的界面進(jìn)行瀏覽整個(gè)網(wǎng)站所有的服務(wù)器狀態(tài) 可以在 Web 前端方便的查

    2024年02月13日
    瀏覽(23)
  • DAY 76 分布式監(jiān)控平臺(tái):zabbix

    DAY 76 分布式監(jiān)控平臺(tái):zabbix

    市場(chǎng)上常用的監(jiān)控軟件: 傳統(tǒng)運(yùn)維:zabbix、 Nagios 云原生環(huán)境: Prometheus (go語(yǔ)言開(kāi)發(fā)的) 作為一個(gè)運(yùn)維,需要會(huì)使用監(jiān)控系統(tǒng)查看服務(wù)器狀態(tài)以及網(wǎng)站流量指標(biāo),利用監(jiān)控系統(tǒng)的數(shù)據(jù)去了解上線(xiàn)發(fā)布的結(jié)果,和網(wǎng)站的健康狀態(tài) 利用一個(gè)優(yōu)秀的監(jiān)控軟件,我們可以: 通過(guò)一

    2024年02月08日
    瀏覽(60)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包