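Background
First, consider a few business scenarios:
- Credit card center XX must generate the current month's statements for all of its users between 1:00 and 3:00 a.m. on the 28th of every month.
- E-commerce platform XX must start pushing coupon-usage reminders to its members at 9:00 every morning.
- Company XX must periodically run a Python script that cleans up stale tmp files in a file-service system.
At first, Linux cron on a single server is enough for scheduled jobs. As the number of jobs keeps growing, however, a single machine comes under heavy load and can no longer guarantee that jobs are triggered and run on time. This gave rise to a variety of distributed job-scheduling platforms, such as Quartz, XXL-Job, ElasticJob and PowerJob. Even so, many companies choose to build their own, because:
- an in-house system adapts more easily to the company's own base frameworks and tooling;
- its architecture can be adjusted flexibly to fit the business;
- doing secondary development on an open-source project, or wrapping a third-party SDK, is itself costly to develop and maintain.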
Designing a distributed task scheduling system
- Use a distributed architecture to remove the performance bottleneck of a monolithic one.
- The system consists of four modules: the scheduler, the executors, the web console and the API service.
- Together they perform scheduling calculations according to the configured routing strategy, run and stop concrete tasks, and provide a UI for managing tasks and cluster resources.
The core of the whole system is the scheduler. It manages the task lifecycle, maintains dependencies between tasks (DAG orchestration), supports timed triggering, monitors task status and maintains the task state machine. It is also responsible for:
- Assigning tasks to executors: based on each task's type, waiting time, priority and similar information, it applies one of several scheduling algorithms and dispatches the task to a suitable executor.
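The "task state machine" that the scheduler maintains can be made concrete as a table of allowed transitions. The Go sketch below is minimal and illustrative only. The five states and the transitions between them are assumptions chosen for the example; they are not taken from XXL-Job or from any other system.

```go
package main

import "fmt"

// State is a hypothetical task state; the names are illustrative only.
type State int

const (
	Waiting State = iota
	Dispatched
	Running
	Succeeded
	Failed
)

func (s State) String() string {
	return [...]string{"Waiting", "Dispatched", "Running", "Succeeded", "Failed"}[s]
}

// transitions lists the allowed moves; anything not listed is rejected.
var transitions = map[State][]State{
	Waiting:    {Dispatched},
	Dispatched: {Running, Failed},   // executor accepted the task, or dispatch failed
	Running:    {Succeeded, Failed}, // executor reported a result, or the task was lost
	Failed:     {Waiting},           // retry: put the task back in line
}

// Transition returns the new state, or the old state plus an error if the
// move is not allowed.
func Transition(from, to State) (State, error) {
	for _, next := range transitions[from] {
		if next == to {
			return to, nil
		}
	}
	return from, fmt.Errorf("illegal transition %v -> %v", from, to)
}

func main() {
	s := Waiting
	for _, next := range []State{Dispatched, Running, Failed, Waiting} {
		var err error
		if s, err = Transition(s, next); err != nil {
			fmt.Println(err)
			return
		}
		fmt.Println("now", s)
	}
	_, err := Transition(Succeeded, Running)
	fmt.Println(err) // illegal transition Succeeded -> Running
}
```

A table like this lets a scheduler reject impossible updates. One example is a late callback trying to move a task that has already succeeded.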
Open-source system: XXL-Job
A distributed task scheduling system generally implements a few core steps: task registration, task triggering, task scheduling and task execution.
- Task registration: the business side registers its tasks with the XXL-Job admin.
- Task triggering: the XXL-Job admin triggers scheduling according to the configuration.
- Task scheduling: once a task is triggered, the scheduling algorithm picks an executor.
- Task execution: the executor runs the task and returns the result.
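XXL-Job
Scheduling center (admin)
The admin manages scheduling information and sends scheduling requests according to the schedule configuration. It contains no business code itself. Decoupling the scheduler from the tasks improves availability and stability, and scheduler performance is no longer limited by the task modules.
It supports visual, simple and dynamic management of scheduling information, including creating, updating and deleting tasks, GLUE development and task alarms. All of these operations take effect immediately. It also supports monitoring scheduling results and execution logs, executor failover, and creating executors.
Executor
The executor receives scheduling requests and runs the task logic. The task module focuses only on executing tasks, which makes development and maintenance simpler and more efficient. It handles execution, kill and log requests from the scheduling center.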
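Project structure
- admin: the XXL-Job console, used to configure executors and scheduled jobs, view the dashboard, and so on.
- core: the jar that business applications must also include; it communicates with admin through Netty.
- samples: example projects showing how to integrate XXL-Job into a traditional Spring project and into a Spring Boot project.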
Core classes and how they interact
Class | Role |
---|---|
XxlJobAdminConfig | Creates the XxlJobScheduler instance |
XxlJobScheduler | Starts the various threads: the main thread for task registration, the main thread of the schedule container, and the trigger thread pool JobTriggerPoolHelper |
JobScheduleHelper | Schedule container; starts a daemon thread that queries all jobs whose next trigger time is within 5 seconds of now and triggers them as appropriate |
JobTriggerPoolHelper | Creates the thread pool that runs XxlJobTrigger, and adds triggers to it |
XxlJobTrigger | Represents one trigger's configuration; loads the job definition XxlJobInfo |
XxlJob | Annotation that marks an executor's job handler method |
JobThread | Calls IJobHandler's execute method to run the job, then calls back to the scheduling center |
IJobHandler | Abstract job handler; defines what is executed, likewise through an execute method |
EmbedServer | Embedded server; the default port is 9999 |
ExecutorBiz | Its run method invokes the executor; it has two implementations, ExecutorBizImpl and ExecutorBizClient |
Task registration
Register your own jobs with xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob())); when initializing the executor:
public void initXxlJobExecutor() {
// load executor prop
Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
// init executor
xxlJobExecutor = new XxlJobSimpleExecutor();
xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));
// registry job bean
xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));
// start executor
try {
xxlJobExecutor.start();
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
}
XxlJobSimpleExecutor.start() first runs initJobHandlerMethodRepository, which records the job handlers locally, and then calls super.start() in XxlJobExecutor:
@Override
public void start() {
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(xxlJobBeanList);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
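initJobHandlerMethodRepository essentially builds a map from handler name to handler, and the executor looks that map up later when a trigger arrives. The Go sketch below shows that idea under some assumptions. The names JobHandler, Repository, Register and Load are hypothetical, and rejecting duplicate names is a choice made for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// JobHandler is a hypothetical stand-in for IJobHandler: it runs one job
// with the parameter string sent by the admin.
type JobHandler func(param string) error

// Repository maps handler names (the value given in @XxlJob("...") in
// XXL-Job) to handlers. It is safe for concurrent use.
type Repository struct {
	mu       sync.RWMutex
	handlers map[string]JobHandler
}

func NewRepository() *Repository {
	return &Repository{handlers: make(map[string]JobHandler)}
}

// Register fails on duplicate names, so two jobs cannot silently share one.
func (r *Repository) Register(name string, h JobHandler) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	if _, exists := r.handlers[name]; exists {
		return fmt.Errorf("job handler [%s] already registered", name)
	}
	r.handlers[name] = h
	return nil
}

// Load returns the handler registered under name, if any.
func (r *Repository) Load(name string) (JobHandler, bool) {
	r.mu.RLock()
	defer r.mu.RUnlock()
	h, ok := r.handlers[name]
	return h, ok
}

func main() {
	repo := NewRepository()
	_ = repo.Register("demoJobHandler", func(p string) error {
		fmt.Println("running demo job with", p)
		return nil
	})
	if h, ok := repo.Load("demoJobHandler"); ok {
		_ = h("hello")
	}
	if _, ok := repo.Load("missing"); !ok {
		fmt.Println("job handler [missing] not found.")
	}
}
```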
- TriggerCallbackThread: the thread that reports results back after a handler has run.
- initEmbedServer: starts the embedded server; it also registers the executor with the admin, so that it becomes an available executor.
public void start() throws Exception {
// init logpath
XxlJobFileAppender.initLogPath(logPath);
// init invoker, admin-client
initAdminBizList(adminAddresses, accessToken);
// init JobLogFileCleanThread
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// init TriggerCallbackThread
TriggerCallbackThread.getInstance().start();
// init executor-server
initEmbedServer(address, ip, port, appname, accessToken);
}
Task scheduling
The core of the admin is its initialization, which starts the following helpers:
public void init() throws Exception {
// initialize the fastTriggerPool and slowTriggerPool thread pools
JobTriggerPoolHelper.toStart();
/**
* Start the registry monitor thread: periodically check the registered executors, remove any executor
* whose last update is more than 90s old, and refresh the address lists of the live ones
*/
JobRegistryHelper.getInstance().start();
/**
* Start the failure monitor thread, which looks for failed runs and:
* 1. triggers the job again if retries are configured;
* 2. checks whether an email alarm should be sent
*/
JobFailMonitorHelper.getInstance().start();
/**
* Start the thread that handles lost results:
* if a run has stayed in the "running" state for more than 10 min and its executor is offline
* (heartbeat registration failed), the admin actively marks the run as failed
*/
JobCompleteHelper.getInstance().start();
JobLogReportHelper.getInstance().start();
/**
* Start the scheduling threads that actually trigger jobs
*/
JobScheduleHelper.getInstance().start();
logger.info(">>>>>>>>> init xxl-job admin success.");
}
Querying jobs
- Acquire the database lock:
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
- Find the jobs to be scheduled:
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
- Dispatch the jobs:
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
The scan loop in JobScheduleHelper.start():
public void start(){
// schedule thread
scheduleThread = new Thread(new Runnable() {
@Override
public void run() {
try {
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
} catch (InterruptedException e) {
if (!scheduleThreadToStop) {
logger.error(e.getMessage(), e);
}
}
logger.info(">>>>>>>>> init xxl-job admin scheduler success.");
// pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
while (!scheduleThreadToStop) {
// Scan Job
long start = System.currentTimeMillis();
Connection conn = null;
Boolean connAutoCommit = null;
PreparedStatement preparedStatement = null;
boolean preReadSuc = true;
try {
conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
connAutoCommit = conn.getAutoCommit();
conn.setAutoCommit(false);
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
preparedStatement.execute();
// tx start
// 1. query the jobs due within the next 5 seconds (now + PRE_READ_MS)
long nowTime = System.currentTimeMillis();
List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
if (scheduleList!=null && scheduleList.size()>0) {
// 2、push time-ring
for (XxlJobInfo jobInfo: scheduleList) {
// time-ring jump: the job is more than 5s overdue
if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {
// 2.1、trigger-expire > 5s:pass && make next-trigger-time
logger.warn(">>>>>>>>>>> xxl-job, schedule misfire, jobId = " + jobInfo.getId());
// 1、misfire match
MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
// FIRE_ONCE_NOW -> trigger: if the job's misfire strategy is "fire once now", trigger it once
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
}
// 2. fresh next: compute the job's next trigger time, starting from now
refreshNextValidTime(jobInfo, new Date());
} else if (nowTime > jobInfo.getTriggerNextTime()) {
// 2.2 trigger-expire < 5s: direct-trigger && make next-trigger-time (the trigger time passed less than 5s ago)
// 1. trigger the job
JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
logger.debug(">>>>>>>>>>> xxl-job, schedule push trigger : jobId = " + jobInfo.getId() );
// 2. fresh next: compute the job's next trigger time, starting from now
refreshNextValidTime(jobInfo, new Date());
// next-trigger-time in 5s, pre-read again: the next trigger time also falls within the next 5s
if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
// 1、make ring second
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2、push time ring
pushTimeRing(ringSecond, jobInfo.getId());
// 3、fresh next
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
} else {
// the trigger time has not arrived yet
// 2.3 trigger-pre-read: time-ring trigger && make next-trigger-time
// 1. compute the ring slot: the second-of-minute (0-59) at which the job is due
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
// 2. store slot -> job id in the time-ring map; the separate ringThread checks the ring every second and triggers the jobs it finds
pushTimeRing(ringSecond, jobInfo.getId());
// 3. recompute the next trigger time
refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
}
}
// 3. update trigger info: persist each job's updated trigger times
for (XxlJobInfo jobInfo: scheduleList) {
XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
}
} else {
preReadSuc = false;
}
// ... code omitted
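The scan loop above sorts each job into one of three cases, based on its next trigger time relative to now and the 5-second PRE_READ_MS window. Jobs that are not yet due go into a 60-slot time ring keyed by second-of-minute. The Go sketch below covers only that decision. classify and ringSlot are hypothetical names, and the sketch leaves out the misfire strategy and the refresh of the next trigger time.

```go
package main

import "fmt"

const preReadMs = 5000 // the same 5s window as PRE_READ_MS in the code above

type Action int

const (
	Misfire    Action = iota // more than 5s overdue: apply the misfire strategy
	FireNow                  // overdue by at most 5s: trigger immediately
	PushToRing               // not yet due: wait in the time ring
)

// classify mirrors the three branches of the scan loop; times are epoch millis.
func classify(nowMs, nextMs int64) Action {
	switch {
	case nowMs > nextMs+preReadMs:
		return Misfire
	case nowMs > nextMs:
		return FireNow
	default:
		return PushToRing
	}
}

// ringSlot is the second-of-minute (0-59) at which the ring thread fires the job.
func ringSlot(nextMs int64) int {
	return int((nextMs / 1000) % 60)
}

func main() {
	now := int64(1_700_000_000_000)
	ring := make(map[int][]int) // slot -> job ids
	jobs := map[int]int64{1: now - 8000, 2: now - 2000, 3: now + 3000}
	for id, next := range jobs {
		switch classify(now, next) {
		case Misfire:
			fmt.Println("job", id, "misfired")
		case FireNow:
			fmt.Println("job", id, "triggered now")
		case PushToRing:
			slot := ringSlot(next)
			ring[slot] = append(ring[slot], id)
		}
	}
	fmt.Println("ring:", ring)
}
```

Keying the ring by second-of-minute keeps it at a fixed size of 60 slots. The trade-off is that it only works because the pre-read window (5s) is much shorter than one minute.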
Task dispatch
- Initialize the dispatch parameters:
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
- Find the address to dispatch to:
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
- Routing algorithm
- Call the executor API:
XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){
// param
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION); // block strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null); // route strategy
String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
jobLog.setTriggerTime(new Date());
XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// initialize the dispatch parameters
TriggerParam triggerParam = new TriggerParam();
triggerParam.setJobId(jobInfo.getId());
triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
// ... code omitted
// find the address to dispatch to
String address = null;
ReturnT<String> routeAddressResult = null;
if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
// sharding broadcast: shard index i is sent to the i-th registered address
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
if (index < group.getRegistryList().size()) {
address = group.getRegistryList().get(index);
} else {
address = group.getRegistryList().get(0);
}
} else {
// pick an address using the route strategy
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
address = routeAddressResult.getContent();
}
}
} else {
routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
}
// 4、trigger remote executor
ReturnT<String> triggerResult = null;
if (address != null) {
// call the executor
triggerResult = runExecutor(triggerParam, address);
} else {
triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
}
// ... code omitted
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
Calling the executor API (the client side, ExecutorBizClient):
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
Task execution
The admin calls the executor's addressUrl + "run" endpoint, which triggers the job's execution. On the executor side, the embedded server dispatches each request by URI:
// services mapping
try {
switch (uri) {
case "/beat":
// heartbeat endpoint
return executorBiz.beat();
case "/idleBeat":
IdleBeatParam idleBeatParam = GsonTool.fromJson(requestData, IdleBeatParam.class);
return executorBiz.idleBeat(idleBeatParam);
case "/run":
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
case "/kill":
KillParam killParam = GsonTool.fromJson(requestData, KillParam.class);
return executorBiz.kill(killParam);
case "/log":
LogParam logParam = GsonTool.fromJson(requestData, LogParam.class);
return executorBiz.log(logParam);
default:
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping(" + uri + ") not found.");
}
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, "request error:" + ThrowableUtil.toString(e));
}
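EmbedServer is essentially a small RPC endpoint that routes requests by URI. The Go sketch below expresses the same idea with the standard net/http ServeMux, and it makes several assumptions. Only /beat and /run are shown. The JSON field names jobId and executorHandler are assumed. returnT only imitates the shape of XXL-Job's ReturnT, with code 200 for success and 500 for failure. Requests go through httptest in-process, so no port is opened.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// returnT imitates XXL-Job's ReturnT envelope: code 200 = success, 500 = failure.
type returnT struct {
	Code int    `json:"code"`
	Msg  string `json:"msg,omitempty"`
}

// triggerParam keeps only the fields this sketch needs (field names assumed).
type triggerParam struct {
	JobID           int    `json:"jobId"`
	ExecutorHandler string `json:"executorHandler"`
}

func writeJSON(w http.ResponseWriter, r returnT) {
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(r)
}

// newExecutorMux routes admin requests by URI, like the switch in EmbedServer.
func newExecutorMux(handlers map[string]func(triggerParam) error) *http.ServeMux {
	mux := http.NewServeMux()
	mux.HandleFunc("/beat", func(w http.ResponseWriter, _ *http.Request) {
		writeJSON(w, returnT{Code: 200}) // "I am alive"
	})
	mux.HandleFunc("/run", func(w http.ResponseWriter, r *http.Request) {
		var p triggerParam
		if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
			writeJSON(w, returnT{Code: 500, Msg: "bad request: " + err.Error()})
			return
		}
		h, ok := handlers[p.ExecutorHandler]
		if !ok {
			writeJSON(w, returnT{Code: 500, Msg: "job handler [" + p.ExecutorHandler + "] not found."})
			return
		}
		// The real executor only queues the trigger and returns immediately;
		// this sketch runs the handler inline to stay short.
		if err := h(p); err != nil {
			writeJSON(w, returnT{Code: 500, Msg: err.Error()})
			return
		}
		writeJSON(w, returnT{Code: 200})
	})
	return mux
}

// call sends one POST through the handler in-process, without a socket.
func call(h http.Handler, path, body string) returnT {
	req := httptest.NewRequest(http.MethodPost, path, strings.NewReader(body))
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, req)
	var r returnT
	_ = json.NewDecoder(rec.Body).Decode(&r)
	return r
}

func main() {
	mux := newExecutorMux(map[string]func(triggerParam) error{
		"demoJobHandler": func(p triggerParam) error {
			fmt.Println("running job", p.JobID)
			return nil
		},
	})
	fmt.Println(call(mux, "/beat", ""))
	fmt.Println(call(mux, "/run", `{"jobId":1,"executorHandler":"demoJobHandler"}`))
	fmt.Println(call(mux, "/run", `{"jobId":2,"executorHandler":"nope"}`))
}
```

In a deployed executor, the same mux would be served with http.ListenAndServe on the executor's port.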
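The run method mainly does the following:
- creates the appropriate jobHandler according to the job's glue type;
- creates a thread that runs the jobHandler and listens on a parameter queue;
- pushes the trigger parameters onto that queue, from which the jobHandler thread picks them up.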
public ReturnT<String> run(TriggerParam triggerParam) {
// load old: jobHandler + jobThread. On a job's first run, no thread exists yet
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
String removeOldReason = null;
// create the jobHandler according to the glue type
GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
if (GlueTypeEnum.BEAN == glueTypeEnum) {
// at startup the executor scans methods annotated with @XxlJob and registers them in a map, so here the handler is simply looked up; IJobHandler wraps the JDK Method object
IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
if (jobThread!=null && jobHandler != newJobHandler) {
// change handler, need kill old thread
removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
if (jobHandler == null) {
jobHandler = newJobHandler;
if (jobHandler == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
}
}
} else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
// valid old jobThread
if (jobThread != null &&
!(jobThread.getHandler() instanceof GlueJobHandler
&& ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
// change handler or gluesource updated, need kill old thread
removeOldReason = "change job source or glue type, and terminate the old job thread.";
jobThread = null;
jobHandler = null;
}
if (jobHandler == null) {
try {
IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
} catch (Exception e) {
logger.error(e.getMessage(), e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
}
}
// apply the block strategy
if (jobThread != null) {
ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
// DISCARD_LATER: if the trigger cannot run right away, reject it
if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
// discard when running
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
} else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
// COVER_EARLY: replace the busy jobThread with a new one
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
} else {
// just queue trigger
}
}
// if jobThread is null, create and start one; it loops forever, reading parameters from triggerQueue and executing them
if (jobThread == null) {
// this also stops the old jobThread, if one exists
jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
}
// put the parameters on the jobThread's triggerQueue
// pushTriggerQueue also checks whether this is a duplicate trigger
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
return pushResult;
}
Returning results
- The execution result is handed to TriggerCallbackThread.pushCallBack.
- TriggerCallbackThread consumes the queued results.
- The admin receives the results and writes them to the DB.
The result is pushed like this:
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
triggerParam.getLogId(),
triggerParam.getLogDateTime(),
XxlJobContext.getXxlJobContext().getHandleCode(),
XxlJobContext.getXxlJobContext().getHandleMsg() ));
TriggerCallbackThread then takes the execution results off its queue and reports them:
@Override
public void run() {
// normal callback
while(!toStop){
try {
// take an execution result (blocks until one is available)
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
// callback list param
List<HandleCallbackParam> callbackParamList = new ArrayList<HandleCallbackParam>();
int drainToNum = getInstance().callBackQueue.drainTo(callbackParamList);
callbackParamList.add(callback);
// callback, will retry if error
if (callbackParamList!=null && callbackParamList.size()>0) {
// call the admin API api/callback to report the results
doCallback(callbackParamList);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
}
// ... code omitted
}
});
Summary
The lifecycle of a task
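Shortcomings
- All jobs waiting to be scheduled live in a single table, so when there are very many of them, scheduling may be delayed.
- Only one routing strategy can be chosen per job.
- When an executor shuts down gracefully, it calls /registryRemove to tell the admin to remove it. When it exits abnormally, the admin only notices through its periodic scan, which removes an executor whose last registration is more than 90s old. Until then, some routing strategies (for example, FIRST) may keep failing for up to 90s.
- Sharding strategy: every machine receives the same parameters, and each executor has to work out its own share from the shard index and total it receives.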
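Key points of a task scheduling system
- Keeping scheduling on time when the data volume is large: split databases and tables (sharding).
- Handling lost tasks: the scheduler periodically scans for tasks that have been running too long.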
// Lost-result handling: if a run has stayed in the "running" state for more than 10 min and its executor is offline (heartbeat registration failed), actively mark it as failed
Date losedTime = DateUtil.addMinutes(new Date(), -10);
List<Long> losedJobIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findLostJobIds(losedTime);
- Idempotency: a task must not run twice. The executor records which tasks it currently holds, but the business code it executes should preferably also be idempotent.
public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
// triggerLogIdSet records the triggers this executor currently holds; a duplicate log ID is not added to triggerQueue again
if (triggerLogIdSet.contains(triggerParam.getLogId())) {
logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
}
triggerLogIdSet.add(triggerParam.getLogId());
triggerQueue.add(triggerParam);
return ReturnT.SUCCESS;
}
- Scheduling strategy: decides which executor a task is assigned to, covering load balancing, sharding, fault tolerance, and failing jobs over from failed nodes.
- Load balancing requires executors to report their current state (number of tasks running, number of tasks waiting to be scheduled, system load). The scheduler combines these into a score and picks the executor with the lowest one.
- Sharding: sharding usually needs support from the scheduler.
- Sharding in Java:
private static String calculatedExecutorParamValue(XxlJobInfo jobInfo,Integer index,Integer adminTotal) {
String executorParam = jobInfo.getExecutorParam();
if(StringUtils.isEmpty(executorParam)){
return null;
}
try {
List<String> paramList = JSONObject.parseArray(executorParam, String.class);
if(CollectionUtils.isEmpty(paramList)){
return null;
}
List<List<String>> paramAverageList = averageAssign(paramList, adminTotal);
List<String> result = paramAverageList.get(index);
return JSON.toJSONString(result);
}catch (Exception e){
logger.error("Sharded parallel computation: failed to parse parameters, params: {}, error: {}",executorParam,e.getMessage());
}
return null;
}
public static <T> List <List<T>> averageAssign(List<T>source,int n){
List <List<T>> result=new ArrayList<List<T>>();
int remainder=source.size()%n; //先計(jì)算出余數(shù)
int number=source.size()/n; //然后是商
int offset=0;//偏移量(用以標(biāo)識(shí)加的余數(shù))
for(int i=0;i<n;i++){
List<T>value;
if(remainder>0){
value=source.subList(i*number+offset, (i+1)*number+offset+1);
remainder--;
offset++;
}else{
value=source.subList(i*number+offset, (i+1)*number+offset);
}
result.add(value);
}
return result;
}
- Sharding in Go:
package main
import (
"fmt"
)
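// chunk splits slice into contiguous chunks of ceil(len(slice)/size) elements,
// so it returns at most size chunks (possibly fewer); size must be positive.
func chunk(slice []int, size int) [][]int {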
var chunks [][]int
chunkSize := (len(slice) + size - 1) / size
for i := 0; i < len(slice); i += chunkSize {
end := i + chunkSize
if end > len(slice) {
end = len(slice)
}
chunks = append(chunks, slice[i:end])
}
return chunks
}
func main() {
slice := []int{1, 2, 3, 4, 5}
chunks := chunk(slice, 2)
fmt.Println(chunks) // [[1 2 3] [4 5]]
}
Task dependencies: workflow-style scheduling dependencies, modelled as a directed acyclic graph (DAG)
Article source: http://www.zghlxwxcb.cn/news/detail-421355.html
package main
import (
"fmt"
"sync"
"time"
)
// DAG is the graph structure
type DAG struct {
Vertexs []*Vertex
}
//頂點(diǎn)
type Vertex struct {
Key string
Value interface{}
Parents []*Vertex
Children []*Vertex
}
// AddVertex adds a vertex to the graph
func (dag *DAG) AddVertex(v *Vertex) {
dag.Vertexs = append(dag.Vertexs, v)
}
// AddEdge adds a directed edge from -> to (to depends on from)
func (dag *DAG) AddEdge(from, to *Vertex) {
from.Children = append(from.Children, to)
to.Parents = append(to.Parents, from)
}
func main() {
var dag = &DAG{}
// create the vertices and add them to the graph
va := &Vertex{Key: "a", Value: "1"}
vb := &Vertex{Key: "b", Value: "2"}
vc := &Vertex{Key: "c", Value: "3"}
vd := &Vertex{Key: "d", Value: "4"}
ve := &Vertex{Key: "e", Value: "5"}
vf := &Vertex{Key: "f", Value: "6"}
vg := &Vertex{Key: "g", Value: "7"}
vh := &Vertex{Key: "h", Value: "8"}
vi := &Vertex{Key: "i", Value: "9"}
for _, v := range []*Vertex{va, vb, vc, vd, ve, vf, vg, vh, vi} {
dag.AddVertex(v)
}
// add edges (parent -> child)
dag.AddEdge(va, vb)
dag.AddEdge(va, vc)
dag.AddEdge(va, vd)
dag.AddEdge(vb, ve)
dag.AddEdge(vb, vh)
dag.AddEdge(vb, vf)
dag.AddEdge(vc, vf)
dag.AddEdge(vc, vg)
dag.AddEdge(vd, vg)
dag.AddEdge(vh, vi)
dag.AddEdge(ve, vi)
dag.AddEdge(vf, vi)
dag.AddEdge(vg, vi)
//[1] [] { a }
//[2] [] { b, c, d }
//[3] [] { h, e, f, g }
//[4] [] { i }
all := LayerBFS(va)
startTime := time.Now()
for _, layer := range all {
fmt.Println("------------------")
doTasks(layer)
}
fmt.Printf("cost:%f\n", time.Since(startTime).Seconds())
}
type Queue []interface{}
func (q *Queue) Push(x interface{}) {
*q = append(*q, x)
}
func (q *Queue) Pop() interface{} {
h := *q
var el interface{}
l := len(h)
el, *q = h[0], h[1:l]
return el
}
func (q *Queue) Len() int {
return len(*q)
}
func NewQueue() *Queue {
return &Queue{}
}
func LayerBFS(root *Vertex) [][]*Vertex {
queue := NewQueue()
queue.Push(root)
visited := make(map[string]*Vertex)
all := make([][]*Vertex, 0)
for queue.Len() > 0 {
qSize := queue.Len()
tmp := make([]*Vertex, 0)
for i := 0; i < qSize; i++ {
//pop vertex
e := queue.Pop()
currVert := e.(*Vertex)
if _, ok := visited[currVert.Key]; ok {
continue
}
visited[currVert.Key] = currVert
tmp = append(tmp, currVert)
for _, val := range currVert.Children {
if _, ok := visited[val.Key]; !ok {
queue.Push(val) //add child
}
}
}
if len(tmp) > 0 { // skip layers that only contained already-visited duplicates
all = append(all, tmp)
}
}
return all
}
// doTasks runs all vertices of one layer concurrently and waits for them to finish
func doTasks(vertexs []*Vertex) {
var wg sync.WaitGroup
startTime := time.Now()
for _, v := range vertexs {
wg.Add(1)
go func(v *Vertex) {
defer wg.Done()
time.Sleep(2 * time.Second)
fmt.Printf("do %v, result is %v \n", v.Key, v.Value)
}(v) // pass v explicitly so each goroutine gets its own copy (needed before Go 1.22)
}
wg.Wait()
fmt.Printf("cost:%0.0f\n", time.Since(startTime).Seconds())
}