国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式任務(wù)調(diào)度系統(tǒng)分析

這篇具有很好參考價(jià)值的文章主要介紹了分布式任務(wù)調(diào)度系統(tǒng)分析。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

背景介紹

首先,我們來思考一些幾個(gè)業(yè)務(wù)場(chǎng)景:

  • XX 信用卡中心,每月 28 日凌晨 1:00 到 3:00 需要完成全網(wǎng)用戶當(dāng)月的費(fèi)用清單的生成
  • XX 電商平臺(tái),需要每天上午 9:00 開始向會(huì)員推送送優(yōu)惠券使用提醒
  • XX 公司,需要定時(shí)執(zhí)行 Python 腳本,清理掉某文件服務(wù)系統(tǒng)中無效的 tmp 文件

最開始,在單臺(tái)服務(wù)器使用Linux Cron 就能滿足定時(shí)任務(wù)需求,但是隨著任務(wù)數(shù)量的不斷增長,單機(jī)模式會(huì)對(duì)機(jī)器負(fù)載產(chǎn)生巨大的壓力,無法保證正常地觸發(fā)運(yùn)行任務(wù)。由此,就誕生了各種個(gè)樣的分布式定時(shí)任務(wù)調(diào)度平臺(tái),比如 Quartz、XXL-Job、ElasticJob,PowerJob。但是,大部分公司可能都會(huì)選擇自研

  • 自研更容易適配自有基礎(chǔ)框架和技術(shù)工具
  • 自研系統(tǒng)的架構(gòu)可靈活調(diào)整,并適配業(yè)務(wù)
  • 對(duì)開源項(xiàng)目做二次開發(fā)或者封裝第三方 SDK 的開發(fā)和維護(hù)成本也不低

分布式任務(wù)調(diào)度系統(tǒng)設(shè)計(jì)

分布式任務(wù)調(diào)度系統(tǒng)分析

  • 采用分布式架構(gòu),解決單體架構(gòu)遇到的性能瓶頸問題
  • 主要由調(diào)度器,執(zhí)行器和Web控制臺(tái),API服務(wù)四個(gè)模塊構(gòu)成
    • 根據(jù)配置的路由策略進(jìn)行調(diào)度計(jì)算、執(zhí)行和停止具體任務(wù)、界面化管理任務(wù)和集群資源
      整個(gè)系統(tǒng)的核心在于調(diào)度器,調(diào)度器會(huì)實(shí)現(xiàn)負(fù)責(zé)管理任務(wù)的生命周期,維護(hù)任務(wù)的依賴關(guān)系(DAG 編排),支持定時(shí)任務(wù)觸發(fā),監(jiān)控任務(wù)狀態(tài),管理任務(wù)的生命周期,維護(hù)任務(wù)狀態(tài)機(jī)。
    • 分配任務(wù)到指定執(zhí)行器。根據(jù)任務(wù)的類型、等待時(shí)間、優(yōu)先級(jí)等信息,按照多種調(diào)度算法,對(duì)任務(wù)進(jìn)行調(diào)度并將任務(wù)分發(fā)給合理的 執(zhí)行器 來執(zhí)行任務(wù)

開源系統(tǒng) XXX-Job

一個(gè)分布式任務(wù)調(diào)度系統(tǒng),基本會(huì)實(shí)現(xiàn)幾個(gè)任務(wù)注冊(cè),任務(wù)調(diào)度,任務(wù)執(zhí)行幾個(gè)核心點(diǎn)

  • 任務(wù)注冊(cè):業(yè)務(wù)方注冊(cè)任務(wù)到XXX-Job Admin
  • 任務(wù)觸發(fā):XXX-Job Admin 根據(jù)配置觸發(fā)任務(wù)調(diào)度
  • 任務(wù)調(diào)度:任務(wù)觸發(fā)之后,根據(jù)調(diào)度算法,找到執(zhí)行器
  • 任務(wù)執(zhí)行 :執(zhí)行器執(zhí)行任務(wù),返回結(jié)果

XXX-Job

分布式任務(wù)調(diào)度系統(tǒng)分析

調(diào)度中心

負(fù)責(zé)管理調(diào)度信息,按照調(diào)度配置發(fā)出調(diào)度請(qǐng)求,自身不承擔(dān)業(yè)務(wù)代碼。調(diào)度系統(tǒng)與任務(wù)解耦,提高了系統(tǒng)可用性和穩(wěn)定性,同時(shí)調(diào)度系統(tǒng)性能不再受限于任務(wù)模塊;
支持可視化、簡(jiǎn)單且動(dòng)態(tài)的管理調(diào)度信息,包括任務(wù)新建,更新,刪除,GLUE開發(fā)和任務(wù)報(bào)警等,所有上述操作都會(huì)實(shí)時(shí)生效,同時(shí)支持監(jiān)控調(diào)度結(jié)果以及執(zhí)行日志,支持執(zhí)行器Failover,支持創(chuàng)建執(zhí)行器等功能。

執(zhí)行器

負(fù)責(zé)接收調(diào)度請(qǐng)求并執(zhí)行任務(wù)邏輯。任務(wù)模塊專注于任務(wù)的執(zhí)行等操作,開發(fā)和維護(hù)更加簡(jiǎn)單和高效;接收“調(diào)度中心”的執(zhí)行請(qǐng)求、終止請(qǐng)求和日志請(qǐng)求等。

項(xiàng)目構(gòu)成

分布式任務(wù)調(diào)度系統(tǒng)分析

  • admin 是xxljob的控制臺(tái),可以配置執(zhí)行器,定時(shí)任務(wù),dashboard查看等功能
  • core 是業(yè)務(wù)方也要引入的jar包,內(nèi)置通過netty于admin進(jìn)行通信
  • samples 項(xiàng)目是測(cè)試項(xiàng)目,包含傳統(tǒng)spring項(xiàng)目如何引入xxljob和springboot如何引入

核心類調(diào)用關(guān)系

分布式任務(wù)調(diào)度系統(tǒng)分析

類名 作用 備注
XxlJobAdminConfig 負(fù)責(zé)創(chuàng)建XxlJobScheduler實(shí)例
XxlJobScheduler 負(fù)責(zé)創(chuàng)建各種線程,包括任務(wù)注冊(cè)主線程,調(diào)度容器的主線程,以及調(diào)度參數(shù)的配置線程池 JobTriggerPoolHelper
JobScheduleHelper 調(diào)度容器,創(chuàng)建一個(gè)守護(hù)線程查詢所有下次執(zhí)行時(shí)間在當(dāng)前時(shí)間5秒內(nèi)的定時(shí)任務(wù),并按條件執(zhí)行
JobTriggerPoolHelper 創(chuàng)建操作XxlJobTrigger的線程池,并添加trigger
XxlJobTrigger 表示一個(gè)調(diào)度參數(shù)的配置,會(huì)查詢具體的定時(shí)任務(wù)信息XxlJobInfo
XxlJob 定義執(zhí)行器的注解
JobThread 調(diào)用IJobHandler的executer執(zhí)行任務(wù),并回調(diào)調(diào)度中心
IJobHandler 抽象的執(zhí)行器接口,定義了要執(zhí)行的具體內(nèi)容,同樣的也是一個(gè)execute方法
EmbedServer 內(nèi)嵌的Server,默認(rèn)端口是9999
ExecutorBiz 其中的run方法用于調(diào)用執(zhí)行器,有兩個(gè)是實(shí)現(xiàn)類ExecutorBizImpl以及ExecutorBizClient 。

任務(wù)注冊(cè)

xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob())); 注冊(cè)自己的任務(wù)

public void initXxlJobExecutor() {
 
    // load executor prop
    Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
 
    // init executor
    xxlJobExecutor = new XxlJobSimpleExecutor();
    xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
    xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
    xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
    xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
    xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
    xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
    xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
    xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));
 
    // registry job bean
    xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));
 
    // start executor
    try {
        xxlJobExecutor.start();
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
}
xxlJobExecutor 會(huì)執(zhí)行 initJobHandlerMethodRepository :將任務(wù)處理handler記錄在本地

 super.start() 調(diào)用 XxlJobExecutor@Override
public void start() {
 
    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(xxlJobBeanList);
 
    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}
  • TriggerCallbackThread: 執(zhí)行handler 之后上班結(jié)果的線程
  • initEmbedServer: 初始化服務(wù),這里面也會(huì)把自己注冊(cè)到 admin 里面去,讓自己成為一個(gè) executor
public void start() throws Exception {
 
    // init logpath
    XxlJobFileAppender.initLogPath(logPath);
 
    // init invoker, admin-client
    initAdminBizList(adminAddresses, accessToken);
 
 
    // init JobLogFileCleanThread
    JobLogFileCleanThread.getInstance().start(logRetentionDays);
 
    // init TriggerCallbackThread
    TriggerCallbackThread.getInstance().start();
 
    // init executor-server
    initEmbedServer(address, ip, port, appname, accessToken);
}

任務(wù)調(diào)度

admin的核心功能

public void init() throws Exception {
    //初始化fastTriggerPool和slowTriggerPool線程池對(duì)象
    JobTriggerPoolHelper.toStart();
 
    /**
     * 開啟線程,每90s查詢執(zhí)行器的數(shù)據(jù),如果執(zhí)行器上次更新時(shí)間超過90s未更新,就移除這個(gè)執(zhí)行器,并把存活的執(zhí)行器更新
     */
    JobRegistryHelper.getInstance().start();
 
    /**
     * 啟動(dòng)線程,查找任務(wù)執(zhí)行失敗的任務(wù),
     * 1.設(shè)置了重試次數(shù),就再次觸發(fā)任務(wù)
     * 2.判斷是否需郵件預(yù)警
     */
    JobFailMonitorHelper.getInstance().start();
 
    /**
     * 啟動(dòng)線程,處理任務(wù)結(jié)果丟失的數(shù)據(jù)
     * 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運(yùn)行中" 狀態(tài)超過10min,且對(duì)應(yīng)執(zhí)行器心跳注冊(cè)失敗不在線,則將本地調(diào)度主動(dòng)標(biāo)記失??;
     */
    JobCompleteHelper.getInstance().start();
 
    JobLogReportHelper.getInstance().start();
 
    /**
 
     * 啟動(dòng)線程,執(zhí)行任務(wù)
     */
    JobScheduleHelper.getInstance().start();
 
    logger.info(">>>>>>>>> init xxl-job admin success.");
}

任務(wù)查詢

  • 獲取數(shù)據(jù)庫的 lock:

    • preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
  • 找到待調(diào)度的任務(wù)

    • List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
  • 下發(fā)任務(wù):

    • JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
public void start(){
 
    // schedule thread
    scheduleThread = new Thread(new Runnable() {
        @Override
        public void run() {
 
            try {
                TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
            } catch (InterruptedException e) {
                if (!scheduleThreadToStop) {
                    logger.error(e.getMessage(), e);
                }
            }
            logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
 
            // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
            int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
 
            while (!scheduleThreadToStop) {
 
                // Scan Job
                long start = System.currentTimeMillis();
 
                Connection conn = null;
                Boolean connAutoCommit = null;
                PreparedStatement preparedStatement = null;
 
                boolean preReadSuc = true;
                try {
 
                    conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                    connAutoCommit = conn.getAutoCommit();
                    conn.setAutoCommit(false);
 
                    preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                    preparedStatement.execute();
 
                    // tx start
 
                    // 1、 查詢從當(dāng)前時(shí)間+5秒內(nèi)要執(zhí)行的任務(wù)
                    long nowTime = System.currentTimeMillis();
                    List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                    if (scheduleList!=null && scheduleList.size()>0) {
                        // 2、push time-ring
                        for (XxlJobInfo jobInfo: scheduleList) {
 
                            // time-ring jump todo 如果任務(wù)超時(shí)5秒
                            if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
                                // 2.1、trigger-expire > 5s:pass && make next-trigger-time
                                logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
 
                                // 1、misfire match
                                MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                    // FIRE_ONCE_NOW 》 trigger todo 如果任務(wù)配置的"調(diào)度過期策略"是"立即執(zhí)行一次",那么就觸發(fā)一次任務(wù)
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
                                }
 
                                // 2、fresh next todo 從當(dāng)前時(shí)間開始,計(jì)算任務(wù)的下一次執(zhí)行時(shí)間
                                refreshNextValidTime(jobInfo, new Date());
 
                            } else if (nowTime > jobInfo.getTriggerNextTime()) {
                                // 2.2、trigger-expire < 5s:direct-trigger && make next-trigger-time todo 任務(wù)執(zhí)行時(shí)間在當(dāng)前時(shí)間的5s內(nèi)
 
                                // 1、trigger  觸發(fā)任務(wù)
                                JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
 
                                // 2、fresh next todo 從當(dāng)前時(shí)間開始,計(jì)算任務(wù)的下一次執(zhí)行時(shí)間
                                refreshNextValidTime(jobInfo, new Date());
 
                                // next-trigger-time in 5s, pre-read again todo 如果下一次的執(zhí)行時(shí)間在未來5s內(nèi)
                                if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
 
                                    // 1、make ring second
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
 
                                    // 2、push time ring
                                    pushTimeRing(ringSecond, jobInfo.getId());
 
                                    // 3、fresh next
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
 
                                }
 
                            } else {
                                // 還沒有到達(dá)任務(wù)執(zhí)行的時(shí)間
                                // 2.3、trigger-pre-read:time-ring trigger && make next-trigger-time
 
                                // 1 計(jì)算剩余的秒數(shù)字
                                int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
 
                                // 2、把剩余秒數(shù)--任務(wù)id存入map中 ;==>下面的 ringThread 線程,會(huì)每一秒執(zhí)行一次,查到對(duì)應(yīng)的數(shù)據(jù)后,觸發(fā)任務(wù)
                                pushTimeRing(ringSecond, jobInfo.getId());
 
                                // 3、重新計(jì)算下一次調(diào)度時(shí)間
                                refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
 
                            }
 
                        }
 
                        // 3、update trigger info TODO 修改jonInfo的內(nèi)容
                        for (XxlJobInfo jobInfo: scheduleList) {
                            XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                        }
 
                    } else {
                        preReadSuc = false;
                    }

任務(wù)下發(fā)

  • 初始化下發(fā)參數(shù), triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());

  • 找到需要下發(fā)的地址: routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

  • 路由算法
    分布式任務(wù)調(diào)度系統(tǒng)分析

  • 調(diào)用執(zhí)行器接口:·XxlJobRemotingUtil.postBody(addressUrl + “run”, accessToken, timeout, triggerParam, String.class);·

private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
 
        // param
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
 
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
 
        // 初始化下發(fā)的參數(shù)
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
 
        / 代碼省略
        // 找到需要下發(fā)的地址
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
           // 廣播地址處理
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
                // 根據(jù)路由找到地址
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }
 
        // 4、trigger remote executor
        ReturnT<String> triggerResult = null;
        if (address != null) {
            // 調(diào)用執(zhí)行器
            // 
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }
 
        / 代碼省略
 
        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());

調(diào)用執(zhí)行器接口

public ReturnT<String> run(TriggerParam triggerParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

任務(wù)執(zhí)行

admin 會(huì)調(diào)用執(zhí)行器的 addressUrl + “run”,這個(gè)接口主要用來觸發(fā)任務(wù)執(zhí)行的

// services mapping
try {
    switch (uri) {
        case "/beat":
            // 類似心跳接口
            return executorBiz.beat();
        case "/idleBeat":
            IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
            return executorBiz.idleBeat(idleBeatParam);
        case "/run":
            TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
            return executorBiz.run(triggerParam);
        case "/kill":
            KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
            return executorBiz.kill(killParam);
        case "/log":
            LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
            return executorBiz.log(logParam);
        default:
            return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
    }
} catch (Exception e) {
    logger.error(e.getMessage(), e);
    return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
  • 這里面主要做下面幾件事情
    • 根據(jù)任務(wù)類型,創(chuàng)建不同的 jobHandler
    • 創(chuàng)建一個(gè)線程執(zhí)行 jobHandler,監(jiān)聽參數(shù)隊(duì)列
    • 將任務(wù)參數(shù)給到 參數(shù)隊(duì)列,jobHandler線程 獲取參數(shù)
public ReturnT<String> run(TriggerParam triggerParam) {
    // load old:jobHandler + jobThread TODO 查詢這個(gè)任務(wù)的線程,第一次執(zhí)行任務(wù),是沒有這個(gè)任務(wù)的
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;
 
     // 根據(jù)任務(wù)類型,創(chuàng)建不同的 jobHandler
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        //  創(chuàng)建 jobhandler,備注,執(zhí)行器在啟動(dòng)的時(shí)候,會(huì)掃描@xxlJob注解修飾的方法,注冊(cè)到map中,這里直接取出來,IJobHandler就是對(duì)jdk的method對(duì)象的封裝
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
 
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
 
            jobThread = null;
            jobHandler = null;
        }
 
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }
 
    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
 
        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";
 
            jobThread = null;
            jobHandler = null;
        }
 
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    }  
 
    // 判斷堵塞策略
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
         // DISCARD_LATER,不能直接執(zhí)行的話,就直接退出
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            //替換一個(gè)新的 jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
 
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }
 
    //  jobThread 為null的話,就創(chuàng)建一個(gè),并執(zhí)行線程,是一個(gè)死循環(huán),從 triggerQueue中讀取參數(shù),并執(zhí)行
    if (jobThread == null) {
        // 這里會(huì)stop 掉之前舊的存在的 jobThread
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
 
    //  把參數(shù)放在任務(wù)隊(duì)列線程中的 triggerQueue 中
    //  pushTriggerQueue 也會(huì)判斷是否是重復(fù)執(zhí)行的 job
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}

結(jié)果返回

  • 將結(jié)果給:TriggerCallbackThread.pushCallBack
  • TriggerCallbackThread 消費(fèi)返回的結(jié)果
  • admin 接受返回結(jié)果,寫入到DB
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
                              triggerParam.getLogId(),
                              triggerParam.getLogDateTime(),
                              XxlJobContext.getXxlJobContext().getHandleCode(),
                              XxlJobContext.getXxlJobContext().getHandleMsg() )

拿到任務(wù)的執(zhí)行結(jié)果

@Override
    public void run() {
 
        // normal callback
        while(!toStop){
            try {
                // 拿到任務(wù)的執(zhí)行結(jié)果
                HandleCallbackParam callback = getInstance().callBackQueue.take();
                if (callback != null) {
 
                    // callback list param
                    List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
                    int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
                    callbackParamList.add(callback);
 
                    // callback, will retry if error
                    if (callbackParamList!=null && callbackParamList.size()>0) {
                        // 調(diào)用 admin 接口:api/callback 給 admin 
                        doCallback(callbackParamList);
                    }
                }
            } catch (Exception e) {
                if (!toStop) {
                    logger.error(e.getMessage(), e);
                }
            }
        }
 
       // 代碼省略
 
    }
});

總結(jié)

一個(gè)任務(wù)的生命周期

分布式任務(wù)調(diào)度系統(tǒng)分析

不足之處

  • 待調(diào)度的任務(wù)存儲(chǔ)在一張表中,如果待調(diào)度任務(wù)過多的時(shí)候,可能會(huì)造成任務(wù)調(diào)度延遲
  • 只能選擇一個(gè)調(diào)度策略
    • 執(zhí)行器服務(wù)是優(yōu)雅關(guān)閉,會(huì)自動(dòng)調(diào)用 /reigstRomeve方法告訴admin,自己移除,admin會(huì)通過定時(shí)任務(wù)掃描,每90秒掃描一次執(zhí)行器上次注冊(cè)到時(shí)間,如果超過90s,就主動(dòng)移除這個(gè)執(zhí)行器異常退出,某些策略會(huì)導(dǎo)致任務(wù)在90s內(nèi)一直失?。ū热绲谝粋€(gè))
  • 分片策略,這個(gè)策略是所有的機(jī)器都執(zhí)行相同的參數(shù),由執(zhí)行器自己區(qū)分

一個(gè)任務(wù)調(diào)度系統(tǒng)核心點(diǎn)

  • 數(shù)據(jù)量大時(shí)保證調(diào)度時(shí)間:分庫分表
  • 任務(wù)丟失處理:調(diào)度器定時(shí)掃描執(zhí)行時(shí)間過長的任務(wù)

// 任務(wù)結(jié)果丟失處理:調(diào)度記錄停留在 "運(yùn)行中" 狀態(tài)超過10min,且對(duì)應(yīng)執(zhí)行器心跳注冊(cè)失敗不在線,則將本地調(diào)度主動(dòng)標(biāo)記失?。?/span>
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds  = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
  • 任務(wù)冪等性:不能重復(fù)執(zhí)行,在執(zhí)行器記錄現(xiàn)在有哪一些任務(wù)在執(zhí)行, 但是,執(zhí)行器在執(zhí)行業(yè)務(wù)代碼時(shí)也建議最好冪等
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
    //  triggerLogIdSet 記錄了當(dāng)前執(zhí)行器正在執(zhí)行的任務(wù),發(fā)現(xiàn)重復(fù)時(shí)不會(huì)寫入到 triggerQueue 中
    if (triggerLogIdSet.contains(triggerParam.getLogId())) {
        logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
        return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
    }
 
    triggerLogIdSet.add(triggerParam.getLogId());
    triggerQueue.add(triggerParam);
    return ReturnT.SUCCESS;
}
  • 調(diào)度策略:解決如何分配到哪一個(gè)執(zhí)行器問題,實(shí)現(xiàn)負(fù)載均衡,分片處理,高容錯(cuò),節(jié)點(diǎn)故障轉(zhuǎn)移作業(yè)
    • 實(shí)現(xiàn)負(fù)載均衡的一個(gè)需要執(zhí)行器上報(bào)當(dāng)前的狀態(tài):當(dāng)前的任務(wù)執(zhí)行數(shù)量,待調(diào)度的任務(wù)數(shù)量,讀取服務(wù)器系統(tǒng)負(fù)載水平,根據(jù)這些,選擇一個(gè)綜合數(shù)值最小的執(zhí)行器出來
  • 分片處理:分片處理通常需要調(diào)度器支持
    • java 實(shí)現(xiàn)分片
private static String calculatedExecutorParamValue(XxlJobInfo jobInfo,Integer index,Integer adminTotal) {
    String executorParam = jobInfo.getExecutorParam();
    if(StringUtils.isEmpty(executorParam)){
        return null;
    }
    try {
        List<String> paramList = JSONObject.parseArray(executorParam, String.class);
        if(CollectionUtils.isEmpty(paramList)){
            return null;
        }
        List<List<String>> paramAverageList = averageAssign(paramList, adminTotal);
        List<String> result = paramAverageList.get(index);
        return JSON.toJSONString(result);
    }catch (Exception e){
        logger.error("分片并行計(jì)算,解析參數(shù)錯(cuò)誤,參數(shù):{},錯(cuò)誤原因:{}",executorParam,e.getMessage());
    }
    return null;
}
 
 
 
public static <T> List <List<T>> averageAssign(List<T>source,int n){
    List <List<T>> result=new ArrayList<List<T>>();
    int remainder=source.size()%n;  //先計(jì)算出余數(shù)
    int number=source.size()/n;  //然后是商
    int offset=0;//偏移量(用以標(biāo)識(shí)加的余數(shù))
    for(int i=0;i<n;i++){
        List<T>value;
        if(remainder>0){
            value=source.subList(i*number+offset, (i+1)*number+offset+1);
            remainder--;
            offset++;
        }else{
            value=source.subList(i*number+offset, (i+1)*number+offset);
        }
        result.add(value);
    }
    return result;
}
  • go 實(shí)現(xiàn)分片
package main
 
import (
    "fmt"
)
 
func chunk(slice []int, size int) [][]int {
    var chunks [][]int
    chunkSize := (len(slice) + size - 1) / size
    for i := 0; i < len(slice); i += chunkSize {
        end := i + chunkSize
        if end > len(slice) {
            end = len(slice)
        }
        chunks = append(chunks, slice[i:end])
    }
    return chunks
}
 
func main() {
    slice := []int{1, 2, 3, 4, 5}
    chunks := chunk(slice, 2)
    fmt.Println(chunks)
}

任務(wù)調(diào)度依賴:工作流模式調(diào)度依賴,一個(gè)有向無環(huán)圖:DAG(directed acyclic graph)

分布式任務(wù)調(diào)度系統(tǒng)分析文章來源地址http://www.zghlxwxcb.cn/news/detail-421355.html

package main
 
import (
    "fmt"
    "sync"
    "time"
)
 
//圖結(jié)構(gòu)
type DAG struct {
    Vertexs []*Vertex
}
 
//頂點(diǎn)
type Vertex struct {
    Key      string
    Value    interface{}
    Parents  []*Vertex
    Children []*Vertex
}
 
//添加頂點(diǎn)
func (dag *DAG) AddVertex(v *Vertex) {
    dag.Vertexs = append(dag.Vertexs, v)
}
 
//添加邊
func (dag *DAG) AddEdge(from, to *Vertex) {
    from.Children = append(from.Children, to)
    to.Parents = append(to.Parents, from)
}
 
func main() {
 
    var dag = &DAG{}
 
    //添加頂點(diǎn)
    va := &Vertex{Key: "a", Value: "1"}
    vb := &Vertex{Key: "b", Value: "2"}
    vc := &Vertex{Key: "c", Value: "3"}
    vd := &Vertex{Key: "d", Value: "4"}
    ve := &Vertex{Key: "e", Value: "5"}
    vf := &Vertex{Key: "f", Value: "6"}
    vg := &Vertex{Key: "g", Value: "7"}
    vh := &Vertex{Key: "h", Value: "8"}
    vi := &Vertex{Key: "i", Value: "9"}
    //添加邊
    dag.AddEdge(va, vb)
    dag.AddEdge(va, vc)
    dag.AddEdge(va, vd)
    dag.AddEdge(vb, ve)
    dag.AddEdge(vb, vh)
    dag.AddEdge(vb, vf)
    dag.AddEdge(vc, vf)
    dag.AddEdge(vc, vg)
    dag.AddEdge(vd, vg)
    dag.AddEdge(vh, vi)
    dag.AddEdge(ve, vi)
    dag.AddEdge(vf, vi)
    dag.AddEdge(vg, vi)
 
    //[1] [] { a }
    //[2] [] { b, c, d }
    //[3] [] { h, e, f, g }
    //[4] [] { i }
    all := LayerBFS(va)
    startTime := time.Now()
    for _, layer := range all {
        fmt.Println("------------------")
        doTasks(layer)
    }
    fmt.Printf("cost:%f\n", time.Since(startTime).Seconds())
}
 
type Queue []interface{}
 
func (q *Queue) Push(x interface{}) {
    *q = append(*q, x)
}
 
func (q *Queue) Pop() interface{} {
    h := *q
    var el interface{}
    l := len(h)
    el, *q = h[0], h[1:l]
    return el
}
 
func (q *Queue) Len() int {
    return len(*q)
}
 
func NewQueue() *Queue {
    return &Queue{}
}
 
func LayerBFS(root *Vertex) [][]*Vertex {
    queue := NewQueue()
    queue.Push(root)
    visited := make(map[string]*Vertex)
    all := make([][]*Vertex, 0)
    for queue.Len() > 0 {
        qSize := queue.Len()
        tmp := make([]*Vertex, 0)
        for i := 0; i < qSize; i++ {
            //pop vertex
            e := queue.Pop()
            currVert := e.(*Vertex)
            if _, ok := visited[currVert.Key]; ok {
                continue
            }
            visited[currVert.Key] = currVert
            tmp = append(tmp, currVert)
            for _, val := range currVert.Children {
                if _, ok := visited[val.Key]; !ok {
                    queue.Push(val) //add child
                }
            }
        }
        all = append(all, [][]*Vertex{tmp}...)
    }
    return all
}
 
//并發(fā)執(zhí)行
func doTasks(vertexs []*Vertex) {
    var wg sync.WaitGroup
    startTime := time.Now()
    for _, v := range vertexs {
        wg.Add(1)
        go func(v *Vertex) {
            defer wg.Done()
            time.Sleep(2 * time.Second)
            fmt.Printf("do %v, result is %v \n", v.Key, v.Value)
        }(v) //notice
    }
    wg.Wait()
    fmt.Printf("cost:%0.0f\n", time.Since(startTime).Seconds())
}

文檔參考

  • 實(shí)現(xiàn)一個(gè)任務(wù)調(diào)度系統(tǒng),看這篇就夠了
  • 伴魚分布式調(diào)度系統(tǒng) Jarvis 的設(shè)計(jì)與實(shí)現(xiàn)
  • 如何設(shè)計(jì)一個(gè)分布式任務(wù)調(diào)度系統(tǒng)

到了這里,關(guān)于分布式任務(wù)調(diào)度系統(tǒng)分析的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 如何本地搭建開源分布式任務(wù)調(diào)度系統(tǒng)DolphinScheduler并遠(yuǎn)程訪問

    如何本地搭建開源分布式任務(wù)調(diào)度系統(tǒng)DolphinScheduler并遠(yuǎn)程訪問

    本篇教程和大家分享一下DolphinScheduler的安裝部署及如何實(shí)現(xiàn)公網(wǎng)遠(yuǎn)程訪問,結(jié)合內(nèi)網(wǎng)穿透工具實(shí)現(xiàn)公網(wǎng)訪問DolphinScheduler內(nèi)網(wǎng)并進(jìn)行遠(yuǎn)程辦公,幫助開發(fā)人員進(jìn)行遠(yuǎn)程任務(wù)調(diào)度及管理,提高工作效率。 DolphinScheduler是一款開源的分布式任務(wù)調(diào)度系統(tǒng),它可以幫助開發(fā)人員更加方

    2024年02月05日
    瀏覽(21)
  • 基于docker的分布式任務(wù)調(diào)度系統(tǒng)xxl-job搭建

    基于docker的分布式任務(wù)調(diào)度系統(tǒng)xxl-job搭建

    本文所使用的操作系統(tǒng)為: CentOS-7-x86_64-DVD-2009 xxl-job 依賴 mysql,所以必須要安裝mysql才行! 訪問以下鏈接:https://hub.docker.com/_/mysql/ 尋找自己需要的MySQL版本拉取即可 1.下載鏡像 這里未指定版本號(hào),默認(rèn)拉取的是最新MySQL鏡像 2.導(dǎo)入zip包 下載xxljob項(xiàng)目,查看releases版本 https:

    2024年02月20日
    瀏覽(20)
  • 【xxl-job】分布式任務(wù)調(diào)度系統(tǒng)xxl-job搭建

    【xxl-job】分布式任務(wù)調(diào)度系統(tǒng)xxl-job搭建

    XXL-JOB是一個(gè)輕量級(jí)分布式任務(wù)調(diào)度平臺(tái),其核心設(shè)計(jì)目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡(jiǎn)單、輕量級(jí)、易擴(kuò)展、開箱即用。 更多介紹,請(qǐng)?jiān)L問官網(wǎng):分布式任務(wù)調(diào)度平臺(tái)XXL-JOB 前提條件:任務(wù)調(diào)度中心(xxl-job admin)依賴于 mysql,所以必須要安裝mysql才行!安裝mysql有2種方式:docker部署或

    2024年02月16日
    瀏覽(23)
  • 高級(jí)分布式系統(tǒng)-第9講 實(shí)時(shí)調(diào)度--可調(diào)度性分析

    高級(jí)分布式系統(tǒng)-第9講 實(shí)時(shí)調(diào)度--可調(diào)度性分析

    高級(jí)分布式系統(tǒng)匯總:高級(jí)分布式系統(tǒng)目錄匯總-CSDN博客 分布式實(shí)時(shí)系統(tǒng)中,很多任務(wù)同時(shí)嘗試訪問共享資源(如處理器和網(wǎng)絡(luò)),調(diào)度試圖有效地利用這些資源來解決問題,以保證系統(tǒng)是正確的,換句話說是保證系統(tǒng)符合其所有的時(shí)間限制。 調(diào)度的運(yùn)行可以采取操作系統(tǒng)形

    2024年01月20日
    瀏覽(25)
  • 新起點(diǎn)!大數(shù)據(jù)分布式可視化的 DAG 任務(wù)調(diào)度系統(tǒng) Taier 正式發(fā)布1.4版本

    我們很高興向大家宣布,2023年4月14日,Taier 正式發(fā)布 1.4 版本。自2022年2月份 Taier 正式開源以來,收到了很多開發(fā)者和行業(yè)用戶的積極評(píng)價(jià),在諸多生產(chǎn)環(huán)境中已得到充分應(yīng)用。Taier 1.4版本正是吸收了各類實(shí)踐經(jīng)驗(yàn)及大家的建議,進(jìn)行了此次迭代優(yōu)化。 本次更新不僅包含了性

    2023年04月20日
    瀏覽(19)
  • 分布式任務(wù)調(diào)度,定時(shí)任務(wù)的處理方案

    分布式任務(wù)調(diào)度,定時(shí)任務(wù)的處理方案

    適用場(chǎng)景: Spring 定時(shí)任務(wù)是 Spring 框架提供的一種輕量級(jí)的任務(wù)調(diào)度方案,它的特點(diǎn)是簡(jiǎn)單易用、輕量級(jí)。Spring 定時(shí)任務(wù)的執(zhí)行是在 單個(gè)節(jié)點(diǎn) 上進(jìn)行的,如果需要分布式任務(wù)調(diào)度,需要自己實(shí)現(xiàn)相應(yīng)的解決方案。 1.導(dǎo)入依賴版本自己控制 2.啟動(dòng)類加上@EnableScheduling 3.編寫業(yè)

    2023年04月14日
    瀏覽(44)
  • 分布式任務(wù)調(diào)度(00)--Quartz

    分布式任務(wù)調(diào)度(00)--Quartz

    調(diào)度器 :工廠類創(chuàng)建Scheduler,根據(jù)觸發(fā)器定義的時(shí)間規(guī)則調(diào)度任務(wù) 任務(wù):Job表示被調(diào)度的任務(wù) 觸發(fā)器:Trigger 定義調(diào)度時(shí)間的元素,按啥時(shí)間規(guī)則執(zhí)行任務(wù)。一個(gè)Job可被多個(gè)Trigger關(guān)聯(lián),但是一個(gè)Trigger 只能關(guān)聯(lián)一個(gè)Job 執(zhí)行任務(wù)調(diào)度核心類QuartzSchedulerThread: 調(diào)度線程從JobSt

    2024年02月05日
    瀏覽(22)
  • 分布式之任務(wù)調(diào)度學(xué)習(xí)二

    分布式之任務(wù)調(diào)度學(xué)習(xí)二

    Spring-quartz 工程 Spring 在 spring-context-support.jar 中直接提供了對(duì) Quartz 的支持 可以在配置文件中把 JobDetail、Trigger、Scheduler 定義成 Bean。 4.1 定義 Job 4.2 定義 Trigger 4.3 定義 Scheduler 既然可以在配置文件配置,當(dāng)然也可以用@Bean 注解配置。在配置類上加上@Configuration 讓 Spring 讀取到

    2024年02月03日
    瀏覽(24)
  • 分布式定時(shí)任務(wù)調(diào)度框架Quartz

    分布式定時(shí)任務(wù)調(diào)度框架Quartz

    Quartz是一個(gè)定時(shí)任務(wù)調(diào)度框架,比如你遇到這樣的問題: 比如淘寶的待支付功能,后臺(tái)會(huì)在你生成訂單后24小時(shí)后,查看訂單是否支付,未支付則取消訂單 比如vip的每月自動(dòng)續(xù)費(fèi)功能 … 想定時(shí)在某個(gè)時(shí)間,去做某件事 Quartz是一套輕量級(jí)的任務(wù)調(diào)度框架,只需要定義了 Job(

    2024年02月04日
    瀏覽(34)
  • 【分布式任務(wù)調(diào)度】XXL-JOB的任務(wù)調(diào)度實(shí)現(xiàn)原理(四)

    【分布式任務(wù)調(diào)度】XXL-JOB的任務(wù)調(diào)度實(shí)現(xiàn)原理(四)

    XXL-JOB專題歷史文章列表: XXL-JOB調(diào)度中心集群部署配置(一) XXL-JOB執(zhí)行器配置及定時(shí)任務(wù)的創(chuàng)建(二) XXL-JOB調(diào)度中心對(duì)執(zhí)行器的上下線感知實(shí)現(xiàn)原理(三) 本篇的主要內(nèi)容是XXL-JOB的任務(wù)調(diào)度流程及其實(shí)現(xiàn)原理,包含了兩個(gè)部分: 調(diào)度中心如何進(jìn)行任務(wù)調(diào)度 執(zhí)行器執(zhí)行任

    2024年02月16日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包