1. xxl-job基本介紹
1.1 Quartz的體系結(jié)構(gòu)
Quartz中最重要的三個(gè)對象:Job(作業(yè))、Trigger(觸發(fā)器)、Scheduler(調(diào)度器)。
xxl-job的調(diào)度原理:調(diào)度線程在一個(gè)while循環(huán)中不斷地獲取一定數(shù)量的即將觸發(fā)的Trigger,拿到綁定的Job,包裝成工作線程執(zhí)行。
當(dāng)然,不管在任何調(diào)度系統(tǒng)中,底層都是線程模型。如果要自己寫一個(gè)調(diào)度系統(tǒng),一定要對多線程并發(fā)這一塊有比較深入的學(xué)習(xí),比如線程怎么啟動(dòng)怎么wait,怎么notify ,怎么加鎖等等。
1.1. Quartz的不足
Quartz有差不多二十年的歷史,調(diào)度模型已經(jīng)非常成熟了,而且很容易集成到Spring中去,用來執(zhí)行業(yè)務(wù)任務(wù)是一個(gè)很好的選擇。
但是還是會(huì)有一些問題,比如:
1、調(diào)度邏輯(Scheduler)和任務(wù)類耦合在同一個(gè)項(xiàng)目中,隨著調(diào)度任務(wù)數(shù)量逐漸增多,同時(shí)調(diào)度任務(wù)邏輯逐漸加重,調(diào)度系統(tǒng)的整體性能會(huì)受到很大的影響;
2、Quartz集群的節(jié)點(diǎn)之間負(fù)載結(jié)果是隨機(jī)的,誰搶到了數(shù)據(jù)庫鎖就由誰去執(zhí)行任務(wù),這就有可能出現(xiàn)旱的旱死,澇的澇死的情況,發(fā)揮不了機(jī)器的性能。
3、Quartz本身沒有提供動(dòng)態(tài)調(diào)度和管理界面的功能,需要自己根據(jù)API進(jìn)行開發(fā)。
4、Quartz的日志記錄、數(shù)據(jù)統(tǒng)計(jì)、監(jiān)控不是特別完善。
所以xxl-job和Elastic-Job都是對Quartz進(jìn)行了封裝,讓用起來更簡單,功能更強(qiáng)大。
1.2. xxl-job發(fā)展歷史
源碼地址:https://github.com/xuxueli/xxl-job
中文文檔:https://www.xuxueli.com/xxl-job/
2015年開源,一個(gè)大眾點(diǎn)評的程序員的業(yè)余之作。眾所周知,大眾點(diǎn)評因?yàn)楸幻缊F(tuán)收購了,現(xiàn)在是美團(tuán)點(diǎn)評。
xxl是作者名字許雪里的首字母簡寫,除了xxl-job之外作者還開源了很多其他組件,現(xiàn)在一共有11個(gè)開源項(xiàng)目。
到目前為止使用xxl-job的公司有幾百家,算上那些沒有登記的公司,實(shí)際上應(yīng)該有幾千家。
在xxl-job早期的版本中,直接使用了Quartz的調(diào)度模型,直到2019年7月7日發(fā)布的7.27 版本才移除Quartz依賴。
實(shí)際上即使重構(gòu)代碼移除了Quartz的依賴,xxl-job中也到處是Quartz的影子。比如任務(wù)、調(diào)度器、觸發(fā)器的三個(gè)維度設(shè)計(jì),是非常經(jīng)典的。
最新發(fā)布版本是:2.3.1。
但是后面自從2.2.0版本開始,版本更新幾乎沒有什么變化
1.3. xxl-job特性
跟老牌的Quartz相比,xxl-job擁有更加豐富的功能。
總體上可以分成三類:
性能的提升:可以調(diào)度更多的任務(wù)。
可靠性的提升:任務(wù)超時(shí)、失敗、故障轉(zhuǎn)移的處理。
運(yùn)維更加便捷:提供操作界面、有用戶權(quán)限、詳細(xì)的日志、提供通知配置、自動(dòng)生成報(bào)表等等。
2. Xxl-job快速入門
1.1. 下載源碼
1.1.1. release頁面下載
https://github.com/xuxueli/xxl-job/releases
https://gitee.com/xuxueli0323/xxl-job
注意不要直接clone最新的master代碼(SNAPSHOT版本),從發(fā)布界面下載穩(wěn)定版本。
1.1.2. 在IDEA中打開
- /doc :文檔資料,包括“調(diào)度數(shù)據(jù)庫”建表腳本
- /xxl-job-admin :調(diào)度中心,項(xiàng)目源碼,Spring Boot工程,可以直接啟動(dòng)
- /xxl-job-core :公共Jar依賴
- /xxl-job-executor-samples :執(zhí)行器,Sample示例項(xiàng)目,其中的Spring Boot工程,可以直接啟動(dòng)??梢栽谠擁?xiàng)目上進(jìn)行開發(fā),也可以將現(xiàn)有項(xiàng)目改造生成執(zhí)行器項(xiàng)目。
1.2. 初始化數(shù)據(jù)庫
數(shù)據(jù)庫腳本在doc/db目錄下:
生成8張表。
表名 | 作用 |
---|---|
xxl_job_group | 執(zhí)行器信息表,維護(hù)任務(wù)執(zhí)行器信息 |
xxl_job_info | 調(diào)度擴(kuò)展信息表:用于保存XXL-JOB調(diào)度任務(wù)的擴(kuò)展信息,如任務(wù)分組、任務(wù)名、機(jī)器地址、執(zhí)行器、執(zhí)行入?yún)⒑蛨?bào)警郵件等等 |
xxl_job_lock | 任務(wù)調(diào)度鎖表 |
xxl_job_log | 調(diào)度日志表:用于保存XXL-JOB任務(wù)調(diào)度的歷史信息,如調(diào)度結(jié)果、執(zhí)行結(jié)果、調(diào)度入?yún)?、調(diào)度機(jī)器和執(zhí)行器等等 |
xxl_job_log_report | 調(diào)度日志報(bào)表:用戶存儲(chǔ)XXL-JOB任務(wù)調(diào)度日志的報(bào)表,調(diào)度中心報(bào)表功能頁面會(huì)用到 |
xxl_job_logglue | 任務(wù)GLUE日志:用于保存GLUE更新歷史,用于支持GLUE的版本回溯功能 |
xxl_job_registry | 執(zhí)行器注冊表,維護(hù)在線的執(zhí)行器和調(diào)度中心機(jī)器地址信息 |
xxl_job_user | 系統(tǒng)用戶表 |
表初始化好以后,就可以配置代碼工程了。這里先說一下總體概念。
xxl-job的調(diào)度器和業(yè)務(wù)執(zhí)行是獨(dú)立的。調(diào)度器(調(diào)度線程)決定任務(wù)的調(diào)度,并且通過HTTP的方式調(diào)用執(zhí)行器接口執(zhí)行任務(wù)。
所以在這里需要先配置至少一個(gè)調(diào)度中心,運(yùn)行起來,也可以集群部署。然后再配置至少一個(gè)執(zhí)行器,運(yùn)行起來,同樣可以集群部署。
1.3. 配置調(diào)度中心
調(diào)度中心是任務(wù)的指揮中心,可以有多個(gè)實(shí)例。
1.1.1. 修改配置
/xxl-job/xxl-job-admin/src/main/resources/application.properties
檢查各項(xiàng)配置,主要是端口,數(shù)據(jù)庫。
默認(rèn)用戶名admin,密碼 123456。
可以顯式加上一行配置:
xxl.job.login.username=admin
xxl.job.login.password=123456
1.1.2. 編譯打包
如果通過jar包方式部署運(yùn)行,需要先編譯打包。在xxl-job-admin目錄下執(zhí)行命令:
mvn package -Dmaven.test.skip=true
1.1.3. 啟動(dòng)工程
運(yùn)行調(diào)度中心,xxl-job\xxl-job-admin\target目錄下
java -jar xxl-job-admin-2.2.1-SNAPSHOT.jar
或者直接運(yùn)行Spring Boot根目錄下的XxlJobAdminApplication啟動(dòng)類。調(diào)度器是帶界面的,訪問:http://127.0.0.1:8080/xxl-job-admin
主要功能:
為了保證可用性,調(diào)度中心可以做集群部署,需要滿足幾個(gè)條件:
· DB配置保持一致;
· 集群機(jī)器時(shí)鐘保持一致(單機(jī)集群忽視);
· 建議:推薦通過nginx為調(diào)度中心集群做負(fù)載均衡,分配域名。調(diào)度中心訪問、執(zhí)行器回調(diào)配置、調(diào)用API服務(wù)等操作均通過該域名進(jìn)行。
1.4. 創(chuàng)建執(zhí)行器
執(zhí)行器負(fù)責(zé)任務(wù)的具體執(zhí)行,分配線程。執(zhí)行器需要注冊到調(diào)度中心,這樣調(diào)度器才知道怎么選擇執(zhí)行器,或者說做路由。執(zhí)行器的執(zhí)行結(jié)果,也需要通過回調(diào)的方式通知調(diào)度器。
xxl-job提供了6個(gè)執(zhí)行器的demo,非常地貼心,這里選用Spring Boot。
1.1.1. 修改配置
/xxl-job/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties
主要是修改了日志目錄,還有admin的端口號。
xxl.job.admin.addresses=http://127.0.0.1:7391/xxl-job-admin
xxl.job.executor.logpath=E:/dev_logs/xxl-job/jobhandler
配置類會(huì)在com.xxl.job.executor.core.config.XxlJobConfig用到。
1.1.1. 編譯打包
如果通過jar包方式部署運(yùn)行,需要先編譯打包。
在xxl-job\xxl-job-executor-samples\xxl-job-executor-sample-springboot目錄下:
mvn package -Dmaven.test.skip=true
如果這個(gè)目錄無法打包,就在根目錄下打包。
1.1.2. 啟動(dòng)工程
運(yùn)行執(zhí)行器,上述路徑的target目錄下:
java -jar xxl-job-admin-2.2.1-SNAPSHOT.jar
或者直接運(yùn)行XxlJobExecutorApplication啟動(dòng)類。
可以做集群部署, 這兩項(xiàng)配置要一致:
xxl.job.admin.addresses=
xxl.job.executor.appname=
1.1. 添加任務(wù)
指揮官有了,工人也有了,下面就可以派事情給工人做了,也就是創(chuàng)建任務(wù)。
登錄調(diào)度中心,打開任務(wù)管理界面,新增任務(wù)。
1.1.1. 路由策略
路由策略指的是一個(gè)任務(wù)選擇哪個(gè)執(zhí)行器去執(zhí)行,Quartz只能隨機(jī)負(fù)載。當(dāng)執(zhí)行器做集群部署的時(shí)候才有意義。Xxl-job提供了豐富的路由策略,包括:
策略 | 參數(shù)值 | 詳細(xì)含義 |
---|---|---|
第一個(gè) | FIRST | 固定選擇第一個(gè)機(jī)器 |
最后一個(gè) | LAST | 固定選擇最后一個(gè)機(jī)器 |
輪詢 | ROUND | 依次選擇執(zhí)行 |
隨機(jī) | RANDOM | 隨機(jī)選擇在線的機(jī)器 |
一致性HASH | CONSISTENT_HASH | 每個(gè)任務(wù)按照Hash算法固定選擇某一臺機(jī)器,且所有任務(wù)均勻散列在不同機(jī)器上 |
最不經(jīng)常使用 | LEAST_FREQUENTLY_USED | 使用頻率最低的機(jī)器優(yōu)先被選舉 |
最近最久未使用 | LEAST_RECENTLY_USED | 最久未使用的機(jī)器優(yōu)先被選舉 |
故障轉(zhuǎn)移 | FAILOVER | 按照順序依次進(jìn)行心跳檢測,第一個(gè)心跳檢測成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度 |
忙碌轉(zhuǎn)移 | BUSYOVER | 按照順序依次進(jìn)行空閑檢測,第一個(gè)空閑檢測成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度 |
分片廣播 | SHARDING_BROADCAST | 廣播觸發(fā)對應(yīng)集群中所有機(jī)器執(zhí)行一次任務(wù),同時(shí)系統(tǒng)自動(dòng)傳遞分片參數(shù);可根據(jù)分片參數(shù)開發(fā)分片任務(wù) |
1.1.2. 運(yùn)行模式
在xxl-job中,不僅支持運(yùn)行預(yù)先編寫好的任務(wù)類,還可以直接輸入代碼或者腳本運(yùn)行(上代碼都不用審核了??)。
運(yùn)行任務(wù)類,這種方式就叫做BEAN模式,需要指定任務(wù)類,這個(gè)任務(wù)類就叫做JobHandler,是在執(zhí)行器端編寫的。
運(yùn)行代碼或者腳本,叫做GLUE模式,支持Java、Shell、Python、PHP、Nodejs、PowerShell,這個(gè)時(shí)候代碼是直接維護(hù)在調(diào)度器這邊的。
1.1.3. 阻塞處理策略
阻塞處理策略,指的是任務(wù)的一次運(yùn)行還沒有結(jié)束的時(shí)候,下一次調(diào)度的時(shí)間又到了,這個(gè)時(shí)候怎么處理。
策略 | 參數(shù)值 | 詳細(xì)含義 |
---|---|---|
單機(jī)串行,默認(rèn) | SERIAL_EXECUTION | 調(diào)度請求進(jìn)入單機(jī)執(zhí)行器后,調(diào)度請求進(jìn)入FIFO隊(duì)列并以串行方式運(yùn)行 |
丟棄后續(xù)調(diào)度 | DISCARD_LATER | 調(diào)度請求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),本次請求將會(huì)被丟棄并標(biāo)記為失敗 |
覆蓋之前調(diào)度 | COVER_EARLY | 調(diào)度請求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),將會(huì)終止運(yùn)行中的調(diào)度任務(wù)并清空隊(duì)列,然后運(yùn)行本地調(diào)度任務(wù) |
1、SERIAL_EXECUTION(單機(jī)串行,默認(rèn)):對當(dāng)前線程不做任何處理,并在當(dāng)前線程的隊(duì)列里增加一個(gè)執(zhí)行任務(wù)(一次只執(zhí)行一個(gè)任務(wù))。
2、DISCARD_LATER(丟棄后續(xù)調(diào)度):如果當(dāng)前線程阻塞,后續(xù)任務(wù)不再執(zhí)行,直接返回失?。ㄗ枞筒辉賵?zhí)行了)。
3、COVER_EARLY(覆蓋之前調(diào)度):創(chuàng)建一個(gè)移除原因,新建一個(gè)線程去執(zhí)行后續(xù)任務(wù)(殺掉當(dāng)前線程)。
1.1.4. 子任務(wù)
如果需要在本任務(wù)執(zhí)行結(jié)束并且執(zhí)行成功的時(shí)候觸發(fā)另外一個(gè)任務(wù),那么就可以把另外的任務(wù)作為本任務(wù)的子任務(wù)運(yùn)行。
因?yàn)槊總€(gè)任務(wù)都擁有一個(gè)唯一的任務(wù)ID(任務(wù)ID可以從任務(wù)列表獲取),只需要把JobId填上就可以了。
比如:下載對賬文件的任務(wù)成功以后,開始解析文件入庫。入庫成功以后,開始對賬。
這樣,多個(gè)任務(wù)就實(shí)現(xiàn)了串行調(diào)度。
1.2. 任務(wù)操作
1. Xxl-job 任務(wù)詳解
1.1. Spring Boot任務(wù)類型
com.xxl.job.executor.service.jobhandler.SampleXxlJob
開發(fā)步驟:
1、在Spring Bean實(shí)例中,開發(fā)Job方法,方式格式要求為 “public ReturnT<String> execute(String param)”
2、為Job方法添加注解 “@XxlJob(value=“自定義jobhandler名稱”, init = “JobHandler初始化方法”, destroy = “JobHandler銷毀方法”)”,注解value值對應(yīng)的是調(diào)度中心新建任務(wù)的JobHandler屬性的值。
3、執(zhí)行日志:需要通過 "XxlJobLogger.log" 打印執(zhí)行日志;
**demoJobHandler:**簡單示例任務(wù),任務(wù)內(nèi)部模擬耗時(shí)任務(wù)邏輯,用戶可在線體驗(yàn)Rolling Log等功能;
**shardingJobHandler:**分片示例任務(wù),任務(wù)內(nèi)部模擬處理分片參數(shù),可參考熟悉分片任務(wù);
**commandJobHandler:**命令行任務(wù);
**httpJobHandler:**通用HTTP任務(wù)Handler;業(yè)務(wù)方只需要提供HTTP鏈接等信息即可,不限制語言、平臺。
/**
* 分片廣播任務(wù)
* 直接由路由策略--> 分片廣播
* @author xuxueli 2017-07-25 20:56:50
* 如果想實(shí)現(xiàn)分片策略可以在此業(yè)務(wù)類中進(jìn)行實(shí)現(xiàn)邏輯
*/
public class ShardingJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// 分片參數(shù)
ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
XxlJobLogger.log("分片參數(shù):當(dāng)前分片序號 = {}, 總分片數(shù) = {}", shardingVO.getIndex(), shardingVO.getTotal());
// 業(yè)務(wù)邏輯
for (int i = 0; i < shardingVO.getTotal(); i++) {
if (i == shardingVO.getIndex()) {
XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
} else {
XxlJobLogger.log("第 {} 片, 忽略", i);
}
}
return SUCCESS;
}
}
/**
* 命令行任務(wù)
*
* @author xuxueli 2018-09-16 03:48:34
*/
public class CommandJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
String command = param;
int exitValue = -1;
BufferedReader bufferedReader = null;
try {
// command process
Process process = Runtime.getRuntime().exec(command);
BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));
// command log
String line;
while ((line = bufferedReader.readLine()) != null) {
XxlJobLogger.log(line);
}
// command exit
process.waitFor();
exitValue = process.exitValue();
} catch (Exception e) {
XxlJobLogger.log(e);
} finally {
if (bufferedReader != null) {
bufferedReader.close();
}
}
if (exitValue == 0) {
return IJobHandler.SUCCESS;
} else {
return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
}
}
}
/**
* 跨平臺Http任務(wù)
*
* @author xuxueli 2018-09-16 03:48:34
*/
public class HttpJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
// param parse
if (param==null || param.trim().length()==0) {
XxlJobLogger.log("param["+ param +"] invalid.");
return ReturnT.FAIL;
}
String[] httpParams = param.split("\n");
String url = null;
String method = null;
String data = null;
for (String httpParam: httpParams) {
if (httpParam.startsWith("url:")) {
url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
}
if (httpParam.startsWith("method:")) {
method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
}
if (httpParam.startsWith("data:")) {
data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
}
}
// param valid
if (url==null || url.trim().length()==0) {
XxlJobLogger.log("url["+ url +"] invalid.");
return ReturnT.FAIL;
}
if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
XxlJobLogger.log("method["+ method +"] invalid.");
return ReturnT.FAIL;
}
// request
HttpURLConnection connection = null;
BufferedReader bufferedReader = null;
try {
// connection
URL realUrl = new URL(url);
connection = (HttpURLConnection) realUrl.openConnection();
// connection setting
connection.setRequestMethod(method);
connection.setDoOutput(true);
connection.setDoInput(true);
connection.setUseCaches(false);
connection.setReadTimeout(5 * 1000);
connection.setConnectTimeout(3 * 1000);
connection.setRequestProperty("connection", "Keep-Alive");
connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");
// do connection
connection.connect();
// data
if (data!=null && data.trim().length()>0) {
DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
dataOutputStream.write(data.getBytes("UTF-8"));
dataOutputStream.flush();
dataOutputStream.close();
}
// valid StatusCode
int statusCode = connection.getResponseCode();
if (statusCode != 200) {
throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
}
// result
bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
StringBuilder result = new StringBuilder();
String line;
while ((line = bufferedReader.readLine()) != null) {
result.append(line);
}
String responseMsg = result.toString();
XxlJobLogger.log(responseMsg);
return ReturnT.SUCCESS;
} catch (Exception e) {
XxlJobLogger.log(e);
return ReturnT.FAIL;
} finally {
try {
if (bufferedReader != null) {
bufferedReader.close();
}
if (connection != null) {
connection.disconnect();
}
} catch (Exception e2) {
XxlJobLogger.log(e2);
}
}
}
}
2. xxl-job架構(gòu)設(shè)計(jì)
xxl-job跟Quartz特性和部署方式的不同,本質(zhì)上是因?yàn)榧軜?gòu)設(shè)計(jì)有著很大的區(qū)別。
1.1. 設(shè)計(jì)思想
1.1.1. 調(diào)度與任務(wù)解耦
在Quartz中,調(diào)度邏輯和任務(wù)代碼是耦合在一起的。
而xxl-job把調(diào)度的動(dòng)作抽象和獨(dú)立出來,形成“調(diào)度中心”公共平臺。調(diào)度中心只負(fù)責(zé)發(fā)起調(diào)度請求,平臺自身并不承擔(dān)業(yè)務(wù)邏輯。
將任務(wù)抽象成分散的JobHandler,交由“執(zhí)行器”統(tǒng)一管理,“執(zhí)行器”負(fù)責(zé)接收調(diào)度請求并執(zhí)行對應(yīng)的JobHandler中業(yè)務(wù)邏輯。
因此,“調(diào)度”和“任務(wù)”兩部分可以相互解耦,提高系統(tǒng)整體穩(wěn)定性和擴(kuò)展性。
1.1.2. 全異步化& 輕量級
全異步化設(shè)計(jì):XXL-JOB系統(tǒng)中業(yè)務(wù)邏輯在遠(yuǎn)程執(zhí)行器執(zhí)行,觸發(fā)流程全異步化設(shè)計(jì)。相比直接在調(diào)度中心內(nèi)部執(zhí)行業(yè)務(wù)邏輯,極大的降低了調(diào)度線程占用時(shí)間;
異步調(diào)度:調(diào)度中心每次任務(wù)觸發(fā)時(shí)僅發(fā)送一次調(diào)度請求,該調(diào)度請求首先推送“異步調(diào)度隊(duì)列”,然后異步推送給遠(yuǎn)程執(zhí)行器。
異步執(zhí)行:執(zhí)行器會(huì)將請求存入“異步執(zhí)行隊(duì)列”并且立即響應(yīng)調(diào)度中心,異步運(yùn)行。
輕量級設(shè)計(jì):XXL-JOB調(diào)度中心中每個(gè)JOB邏輯非常 “輕”,在全異步化的基礎(chǔ)上,單個(gè)JOB一次運(yùn)行平均耗時(shí)基本在 “10ms” 之內(nèi)(基本為一次請求的網(wǎng)絡(luò)開銷);因此,可以保證使用有限的線程支撐大量的JOB并發(fā)運(yùn)行;
得益于上述兩點(diǎn)優(yōu)化,理論上默認(rèn)配置下的調(diào)度中心,單機(jī)能夠支撐5000 任務(wù)并發(fā)運(yùn)行穩(wěn)定運(yùn)行;
實(shí)際場景中,由于調(diào)度中心與執(zhí)行器網(wǎng)絡(luò)ping延遲不同、DB讀寫耗時(shí)不同、任務(wù)調(diào)度密集程度不同,會(huì)導(dǎo)致任務(wù)量上限會(huì)上下波動(dòng)。
如若需要支撐更多的任務(wù)量,可以通過調(diào)大調(diào)度線程數(shù)、降低調(diào)度中心與執(zhí)行器ping延遲和提升機(jī)器配置幾種方式優(yōu)化。
1.1.3. 均衡調(diào)度
調(diào)度中心在集群部署時(shí)會(huì)自動(dòng)進(jìn)行任務(wù)平均分配,觸發(fā)組件每次獲取與線程池?cái)?shù)量(調(diào)度中心支持自定義調(diào)度線程池大?。┫嚓P(guān)數(shù)量的任務(wù),避免大量任務(wù)集中在單個(gè)調(diào)度中心集群節(jié)點(diǎn)。
1.2. 系統(tǒng)組成
整體上分為兩個(gè)模塊:
1.1.4. 調(diào)度模塊(調(diào)度中心)
負(fù)責(zé)管理調(diào)度信息,按照調(diào)度配置發(fā)出調(diào)度請求,自身不承擔(dān)業(yè)務(wù)代碼。調(diào)度系統(tǒng)與任務(wù)解耦,提高了系統(tǒng)可用性和穩(wěn)定性,同時(shí)調(diào)度系統(tǒng)性能不再受限于任務(wù)模塊;
調(diào)度中心支持可視化、簡單且動(dòng)態(tài)的管理調(diào)度信息,包括任務(wù)新建,更新,刪除,GLUE開發(fā)和任務(wù)報(bào)警等,所有上述操作都會(huì)實(shí)時(shí)生效,同時(shí)支持監(jiān)控調(diào)度結(jié)果以及執(zhí)行日志,支持執(zhí)行器Failover。
1.1.5. 執(zhí)行模塊(執(zhí)行器)
負(fù)責(zé)接收調(diào)度請求并執(zhí)行任務(wù)邏輯。任務(wù)模塊專注于任務(wù)的執(zhí)行等操作,開發(fā)和維護(hù)更加簡單和高效;
接收“調(diào)度中心”的執(zhí)行請求、終止請求和日志請求等。
從整體來看,xxl-job架構(gòu)依賴較少,功能強(qiáng)大,簡約而不簡單,方便部署,易于使用。
3. xxl-job原理分析
源碼部分
當(dāng)執(zhí)行器集群部署的時(shí)候,調(diào)度器需要為任務(wù)執(zhí)行選擇執(zhí)行器。所以,執(zhí)行器在啟動(dòng)的時(shí)候,必須先注冊到調(diào)度中心,保存在數(shù)據(jù)庫。
1.1. 執(zhí)行器啟動(dòng)與注冊
https://blog.csdn.net/oushitian/article/details/87938682
https://blog.csdn.net/RabbitInTheGrass/article/details/106918236
執(zhí)行器的注冊與發(fā)現(xiàn)有兩種方式:
1、一種是執(zhí)行器啟動(dòng)的時(shí)候,主動(dòng)到調(diào)度中心注冊,并定時(shí)發(fā)送心跳,保持續(xù)約。執(zhí)行器正常關(guān)閉時(shí),也主動(dòng)告知調(diào)度中心注銷掉。這種方式叫做主動(dòng)注冊。(調(diào)度中心類似于注冊中心eureka,nacos,rocketmq的namesrv
)
2、如果執(zhí)行器宕機(jī)或者網(wǎng)絡(luò)出問題了,調(diào)度中心就不知道執(zhí)行器的情況,如果把任務(wù)路由給一個(gè)不可用的執(zhí)行器執(zhí)行,就會(huì)導(dǎo)致任務(wù)執(zhí)行失敗。
所以,調(diào)度中心本身也需要不斷地對執(zhí)行器進(jìn)行探活。調(diào)度中心會(huì)啟動(dòng)一個(gè)專門的后臺線程,定時(shí)調(diào)用執(zhí)行器接口,如果發(fā)現(xiàn)異常就下線掉。
下面從執(zhí)行器的源碼去驗(yàn)證一下。
首先,一個(gè)Spring Boot的項(xiàng)目啟動(dòng)從哪里入手?
從配置類XxlJobConfig出發(fā),這里用到了配置的參數(shù)。
配置類定義了一個(gè)XxlJobSpringExecutor
,會(huì)在啟動(dòng)掃描配置類的時(shí)候創(chuàng)建執(zhí)行器。XxlJobSpringExecutor繼承了XxlJobExecutor
。
父類實(shí)現(xiàn)了SmartInitializingSingleton接口,在對象初始化的時(shí)候會(huì)調(diào)用afterSingletonsInstantiated()方法,這里面父類的start()方法。
這里面做了幾件事:
// 初始化日志路徑
XxlJobFileAppender.initLogPath(logPath);
// 創(chuàng)建調(diào)度器的客戶端
initAdminBizList(adminAddresses, accessToken);
// 初始化日志清理線程
JobLogFileCleanThread.getInstance().start(logRetentionDays);
// 初始化Trigger回調(diào)線程
TriggerCallbackThread.getInstance().start();
// 初始化執(zhí)行器服務(wù)器
initEmbedServer(address, ip, port, appname, accessToken);
initAdminBizList創(chuàng)建調(diào)度器客戶端,是執(zhí)行器用來連接調(diào)度器的。
Trigger回調(diào)線程用來處理任務(wù)執(zhí)行完畢后的回調(diào),為什么需要回調(diào)。
從initEmbedServer方法進(jìn)入執(zhí)行器的創(chuàng)建,到embedServer.start。叫做embedServer是因?yàn)镾pring Boot里面是用的內(nèi)置的Tomcat啟動(dòng)的。
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
在這個(gè)start方法里面,最后有一個(gè)thread.start(),也就是調(diào)用了線程的run方法。線程是上面new出來的。在run方法里面,創(chuàng)建了一個(gè)名字叫bizThreadPool 的ThreadPoolExecutor,也就是業(yè)務(wù)線程的線程池。
ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(
然后啟動(dòng)了一個(gè)Netty包的ServerBootstrap,然后啟動(dòng)服務(wù)器。
ServerBootstrap bootstrap = new ServerBootstrap();
在這里面要把執(zhí)行器注冊到調(diào)度中心。
startRegistry(appname, address);
到了ExecutorRegistryThread,在start方法里面最后啟動(dòng)了這個(gè)線程:
ExecutorRegistryThread.getInstance().start(appname, address);
registryThread.start();也就是執(zhí)行了這個(gè)創(chuàng)建的線程的run方法。
首先拿到調(diào)度器的列表,它有可能是集群部署的。
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
然后挨個(gè)注冊上去,調(diào)用的是AdminBizClient的registry方法(這個(gè)類是core包里面的):
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
調(diào)用了HTTP的接口,實(shí)際地址是:
http://127.0.0.1:7391/xxl-job-admin/ api/registry
在舊的版本中用的是XXL-RPC,后來改成了Restful的API。
請求的是com.xxl.job.admin.controller.JobApiController的api方法,這里有一個(gè)分支:
if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
}
這個(gè)時(shí)候會(huì)調(diào)用到AdminBizImpl的registryUpdate方法:
//注冊中心也是類似這么設(shè)計(jì)的
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
//小于1說明記錄不存在,插入記錄
xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
這個(gè)接口方法是沒有實(shí)現(xiàn)類的——其實(shí)就是MyBatis的Mapper,把執(zhí)行器保存到數(shù)據(jù)庫。
XxlJobRegistryMapper.xml
<update id="registryUpdate" >
UPDATE xxl_job_registry SET `update_time` = #{updateTime}
WHERE `registry_group` = #{registryGroup}
AND `registry_key` = #{registryKey}
AND `registry_value` = #{registryValue}
</update>
<insert id="registrySave" >
INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`)
VALUES( #{registryGroup} , #{registryKey} , #{registryValue}, #{updateTime})
</insert>
后臺線程探活這一塊,在調(diào)度器的代碼中:
JobRegistryMonitorHelper.getInstance().start();
1.2. 調(diào)度器啟動(dòng)與任務(wù)執(zhí)行(調(diào)度中心如何啟動(dòng)的)
執(zhí)行器啟動(dòng)好以后,工人就準(zhǔn)備干活了,接下來就看一下指揮官上崗以后是怎么指揮工人的。實(shí)際上是先啟動(dòng)調(diào)度器再啟動(dòng)執(zhí)行器,但是因?yàn)檎{(diào)度的流程涉及到執(zhí)行器,所以先分析執(zhí)行器。
下面看看調(diào)度器是如何啟動(dòng)的,任務(wù)是如何得到執(zhí)行的。
1.1.1. 從配置類入手
Spring Boot的工程,一樣從配置類XxlJobAdminConfig入手。它實(shí)現(xiàn)了InitializingBean接口,會(huì)在初始化的時(shí)候調(diào)用afterPropertiesSet方法:
public void afterPropertiesSet() throws Exception {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}
init方法里面做了幾件事情:
// 任務(wù)注冊監(jiān)控器
JobRegistryMonitorHelper.getInstance().start();
// 任務(wù)調(diào)度失敗的監(jiān)控器,失敗重試,失敗郵件發(fā)送
JobFailMonitorHelper.getInstance().start();
// 任務(wù)結(jié)果丟失處理
JobLosedMonitorHelper.getInstance().start();
// trigger pool啟動(dòng)
JobTriggerPoolHelper.toStart();
// log report啟動(dòng)
JobLogReportHelper.getInstance().start();
// start-schedule
JobScheduleHelper.getInstance().start();
JobRegistryMonitorHelper做的事情是不停地更新注冊表,把超時(shí)的執(zhí)行器剔除。每隔30秒執(zhí)行一次:
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
JobTriggerPoolHelper創(chuàng)建了兩個(gè)線程池,一個(gè)快的線程池,一個(gè)慢的線程池,作用后面就會(huì)講到。
這里主要關(guān)注的是調(diào)度器(指揮官)如何啟動(dòng)的,進(jìn)入JobScheduleHelper的start方法,這段方法總體上看起來是這樣的:
public void start(){
// schedule thread
scheduleThread = new Thread(…...);
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();
// ring thread
ringThread = new Thread(…...);
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}
也就是創(chuàng)建并且啟動(dòng)了兩個(gè)后臺線程,一個(gè)是調(diào)度線程,一個(gè)是時(shí)間輪線程。先從第一個(gè)線程開始說起。
1.1.2. 調(diào)度器線程
這里創(chuàng)建了一個(gè)scheduleThread線程,后面調(diào)用了start方法,也就是會(huì)進(jìn)入run方法。
scheduleThread的run方法中,先隨機(jī)睡眠4-5秒,為什么?為了防止執(zhí)行器集中啟動(dòng)出現(xiàn)過多的資源競爭。
TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
然后計(jì)算預(yù)讀取的任務(wù)數(shù),這里默認(rèn)是6000個(gè)。
int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
后面是一個(gè)while循環(huán),也就是調(diào)度器重復(fù)不斷地在做的事情。
1.1.3. 獲取任務(wù)鎖
第一步是獲取數(shù)據(jù)庫的排他鎖,因?yàn)樗械墓?jié)點(diǎn)連接到的數(shù)據(jù)庫是同一個(gè)實(shí)例,所以這里是一個(gè)分布式環(huán)境的鎖。也就是后面的過程是互斥的,如果有多個(gè)調(diào)度器的服務(wù),同一時(shí)間只能有一個(gè)調(diào)度器在獲取任務(wù)信息:
preparedStatement = conn.prepareStatement( "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
獲取的是job_lock表的lock_name=schedule_lock這一行數(shù)據(jù)的行鎖。
如果加鎖沒有成功,說明其他調(diào)度中心在加載任務(wù)了,只能等其他節(jié)點(diǎn)提交事務(wù)或者回滾事務(wù),釋放鎖以后才能獲取鎖。
獲取鎖成功后查詢?nèi)蝿?wù):
<select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
SELECT <include refid="Base_Column_List" />
FROM xxl_job_info AS t
WHERE t.trigger_status = 1
and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
ORDER BY id ASC
LIMIT #{pagesize}
</select>
這個(gè)SQL的含義:從任務(wù)表查詢狀態(tài)是1,并且下次觸發(fā)時(shí)間小于{maxNextTime}的任務(wù);{maxNextTime}=nowTime(當(dāng)前時(shí)間) + PRE_READ_MS(5秒),也就是說查詢5秒鐘之內(nèi)需要觸發(fā)的任務(wù):
1.1.4. 調(diào)度任務(wù)
這里根據(jù)任務(wù)的觸發(fā)時(shí)間分成了三種情況。
這里假設(shè)任務(wù)的下次觸發(fā)時(shí)間(TriggerNextTime)是9點(diǎn)0分30秒,2秒鐘觸發(fā)一次。
第一種情況就是當(dāng)前時(shí)間已經(jīng)是9點(diǎn)0分35秒以后了。
如果nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS,也就是觸發(fā)時(shí)間已經(jīng)過期5秒以上,那就不能調(diào)度了(misfire了),讓它到下次觸發(fā)的時(shí)間再跑,這里只需要更新下次觸發(fā)時(shí)間。
什么時(shí)候會(huì)超時(shí)?比如你的查詢非常慢,或者你查詢到等待觸發(fā)的任務(wù)以后,debug停在上面很久才走到時(shí)間判斷。
第二種情況(正常情況):nowTime > jobInfo.getTriggerNextTime(),已經(jīng)過了觸發(fā)時(shí)間,但是沒有超過5秒,時(shí)間是9點(diǎn)0分30秒到9點(diǎn)0分35秒之間。
這里要做的事情有四步:
1、觸發(fā)任務(wù)
2、更新下次觸發(fā)時(shí)間
一次觸發(fā)完成之后,來一次預(yù)讀,看看再下次的觸發(fā)時(shí)間是不是滿足:
nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()
下次觸發(fā)時(shí)間是9點(diǎn)0分32秒,現(xiàn)在時(shí)間大于9點(diǎn)0分27秒(距離下次觸發(fā)時(shí)間不足5秒了):
3、丟入時(shí)間輪
4、觸發(fā)完了再把時(shí)間更新為下次更新時(shí)間
這里重點(diǎn)有兩個(gè),觸發(fā)的時(shí)候做了什么。丟入時(shí)間輪做了什么。
第三種情況:還沒到9點(diǎn)0分30秒。
1、丟入時(shí)間輪
2、刷新一下下次觸發(fā)時(shí)間,因?yàn)檫€沒觸發(fā),實(shí)際上時(shí)間沒變
所以,這里要重點(diǎn)關(guān)注一下,任務(wù)觸發(fā)的時(shí)候,是怎么觸發(fā)的。丟入時(shí)間輪,又是一個(gè)什么操作。
1.1.5. 任務(wù)觸發(fā)
從JobTriggerPoolHelper的trigger方法進(jìn)入,又到了JobTriggerPoolHelper的addTrigger方法。
這里設(shè)計(jì)了兩個(gè)線程池:fastTriggerPool和slowTriggerPool,如果1分鐘內(nèi)過期了10次,就使用慢的線程池。
ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}
這里相當(dāng)于做了一個(gè)線程池隔離,即使有很多慢的任務(wù),也只能把慢任務(wù)的線程池耗光(秒?。。?。
什么樣的任務(wù)會(huì)使用慢的線程池來執(zhí)行呢?JobTriggerPoolHelper中addTrigger的末尾:如果這次執(zhí)行超過了500ms,就給它標(biāo)記一下,超過10次,它就要被丟到下等艙了。
if (cost > 500) { // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}
線程池選擇好以后,execute一下,也就是分配線程來執(zhí)行觸發(fā)任務(wù)。
進(jìn)入XxlJobTrigger的trigger方法:
XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);
在XxlJobTrigger的trigger方法中,先拿到任務(wù)信息,如果方法參數(shù)failRetryCount>0,就用參數(shù)值,否則用Job定義的failRetryCount。這里傳進(jìn)來的是-1。
XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
拿到失敗重試次數(shù)和組別:
int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
先不考慮廣播分片的情況,分片的原理后面再分析。直接走到末尾的else,processTrigger:
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
前面是獲取一些參數(shù),然后記錄日志,初始化Trigger參數(shù)。
然后獲取路由,把結(jié)果放入routeAddressResult。如果是廣播分片,所有的節(jié)點(diǎn)都要參與負(fù)載,否則要根據(jù)策略獲取執(zhí)行器地址。
不同的路由策略,獲取路由的方式也不一樣,這里是典型的策略模式:
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
回顧一下:
路由參數(shù) | 翻譯 | 詳細(xì)含義 |
---|---|---|
FIRST | 第一個(gè) | 固定選擇第一個(gè)機(jī)器 |
LAST | 最后一個(gè) | 固定選擇最后一個(gè)機(jī)器 |
ROUND | 輪詢 | 依次選擇執(zhí)行 |
RANDOM | 隨機(jī) | 隨機(jī)選擇在線的機(jī)器 |
CONSISTENT_HASH | 一致性HASH | 每個(gè)任務(wù)按照Hash算法固定選擇某一臺機(jī)器,且所有任務(wù)均勻散列在不同機(jī)器上 |
LEAST_FREQUENTLY_USED | LRU最不經(jīng)常使用 | 使用頻率最低的機(jī)器優(yōu)先被選舉 |
LEAST_RECENTLY_USED | LRU最近最久未使用 | 最久未使用的機(jī)器優(yōu)先被選舉 |
FAILOVER | 故障轉(zhuǎn)移 | 按照順序依次進(jìn)行心跳檢測,第一個(gè)心跳檢測成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度 |
BUSYOVER | 忙碌轉(zhuǎn)移 | 按照順序依次進(jìn)行空閑檢測,第一個(gè)空閑檢測成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度 |
SHARDING_BROADCAST | 分片廣播 |
廣播觸發(fā)對應(yīng)集群中所有機(jī)器執(zhí)行一次任務(wù),同時(shí)系統(tǒng)自動(dòng)傳遞分片參數(shù);可根據(jù)分片參數(shù)開發(fā)分片任務(wù) |
如果沒有啟動(dòng)執(zhí)行器,那就拿不到執(zhí)行器地址。
拿到執(zhí)行器地址以后,runExecutor觸發(fā)遠(yuǎn)程的執(zhí)行器:
triggerResult = runExecutor(triggerParam, address);
這里調(diào)用的是ExecutorBizClient的run方法:
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
這里就調(diào)用了執(zhí)行器的遠(yuǎn)程接口(http://192.168.44.1:9999/run),執(zhí)行器接收到調(diào)用請求怎么處理后面再說。
參數(shù)內(nèi)容:
TriggerParam{jobId=2, executorHandler='', executorParams='', executorBlockStrategy='SERIAL_EXECUTION', executorTimeout=0, logId=10337, logDateTime=1601783382002, glueType='GLUE_GROOVY', glueSource='package com.xxl.job.service.handler;
import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;
public class DemoGlueJobHandler extends IJobHandler {
@Override
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log("qingshan job, Hello World.");
return ReturnT.SUCCESS;
}
}
', glueUpdatetime=1601359733000, broadcastIndex=0, broadcastTotal=1}
回顧一下,這里說的是第二種情況:
一共四步,第一步結(jié)束了:
1、觸發(fā)任務(wù)
2、更新下次觸發(fā)時(shí)間
3、丟入時(shí)間輪
4、觸發(fā)完了再把時(shí)間更新為下次更新時(shí)間
第二步和第四步非常簡單,都是操作數(shù)據(jù)庫。
第三步,丟入時(shí)間輪,什么是時(shí)間輪?為什么要丟入時(shí)間輪?
1.1.6. 時(shí)間輪
要回答這個(gè)問題,先從Java中最原始的任務(wù)調(diào)度的方法說起。
給你一批任務(wù)(假設(shè)有1000個(gè)任務(wù)),都是不同的時(shí)間執(zhí)行的,時(shí)間精確到秒,你怎么實(shí)現(xiàn)對所有的任務(wù)的調(diào)度?
第一種思路是啟動(dòng)一個(gè)線程,每秒鐘對所有的任務(wù)進(jìn)行遍歷,找出執(zhí)行時(shí)間跟當(dāng)前時(shí)間匹配的,執(zhí)行它。如果任務(wù)數(shù)量太大,遍歷和比較所有任務(wù)會(huì)比較浪費(fèi)時(shí)間。
第二個(gè)思路,把這些任務(wù)進(jìn)行排序,執(zhí)行時(shí)間近(先觸發(fā))的放在前面。
用Java代碼怎么實(shí)現(xiàn)呢?
JDK包里面自帶了一個(gè)Timer工具類(java.util包下),可以實(shí)現(xiàn)延時(shí)任務(wù)(例如30分鐘以后觸發(fā)),也可以實(shí)現(xiàn)周期性任務(wù)(例如每1小時(shí)觸發(fā)一次)。
它的本質(zhì)是一個(gè)優(yōu)先隊(duì)列(TaskQueue),和一個(gè)執(zhí)行任務(wù)的線程(TimerThread)。
public class Timer {
private final TaskQueue queue = new TaskQueue();
private final TimerThread thread = new TimerThread(queue);
public Timer(String name, boolean isDaemon) {
thread.setName(name);
thread.setDaemon(isDaemon);
thread.start();
}
}
在這個(gè)優(yōu)先隊(duì)列中,最先需要執(zhí)行的任務(wù)排在優(yōu)先隊(duì)列的第一個(gè)。然后 TimerThread 不斷地拿第一個(gè)任務(wù)的執(zhí)行時(shí)間和當(dāng)前時(shí)間做對比。如果時(shí)間到了先看看這個(gè)任務(wù)是不是周期性執(zhí)行的任務(wù),如果是則修改當(dāng)前任務(wù)時(shí)間為下次執(zhí)行的時(shí)間,如果不是周期性任務(wù)則將任務(wù)從優(yōu)先隊(duì)列中移除。最后執(zhí)行任務(wù)。
但是Timer是單線程的,在很多場景下不能滿足業(yè)務(wù)需求。
在JDK1.5之后,引入了一個(gè)支持多線程的任務(wù)調(diào)度工具ScheduledThreadPoolExecutor用來替代TImer,它是幾種常用的線程池之一??纯礃?gòu)造函數(shù),里面是一個(gè)延遲隊(duì)列DelayedWorkQueue,也是一個(gè)優(yōu)先隊(duì)列。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
優(yōu)先隊(duì)列的插入和刪除的時(shí)間復(fù)雜度是O(logn),當(dāng)數(shù)據(jù)量大的時(shí)候,頻繁的入堆出堆性能不是很好。
這里先考慮對所有的任務(wù)進(jìn)行分組,把相同執(zhí)行時(shí)刻的任務(wù)放在一起。比如這里,數(shù)組里面的一個(gè)下標(biāo)就代表1秒鐘。它就會(huì)變成一個(gè)數(shù)組加鏈表的數(shù)據(jù)結(jié)構(gòu)。分組以后遍歷和比較的時(shí)間會(huì)減少一些。
但是還是有問題,如果任務(wù)數(shù)量非常大,而且時(shí)間都不一樣,或者有執(zhí)行時(shí)間非常遙遠(yuǎn)的任務(wù),那這個(gè)數(shù)組長度是不是要非常地長?比如有個(gè)任務(wù)2個(gè)月之后執(zhí)行,從現(xiàn)在開始計(jì)算,它的下標(biāo)是5253120。
所以長度肯定不能是無限的,只能是固定長度的。比如固定長度是60,一個(gè)格子代表1秒(現(xiàn)在叫做一個(gè)bucket槽),一圈可以表示60秒。遍歷的線程只要一個(gè)格子一個(gè)格子的獲取任務(wù),并且執(zhí)行就OK了。
固定長度的數(shù)組怎么用來表示超出最大長度的時(shí)間呢?可以用循環(huán)數(shù)組。
比如一個(gè)循環(huán)數(shù)組長度60,可以表示60秒。60秒以后執(zhí)行的任務(wù)怎么放進(jìn)去?只要除以60,用得到的余數(shù),放到對應(yīng)的格子就OK了。比如90%60=30,它放在第30個(gè)格子。這里就有了輪次的概念,第90秒的任務(wù)是第二輪的時(shí)候才執(zhí)行。
這時(shí)候,時(shí)間輪的概念已經(jīng)出來了。
如果任務(wù)數(shù)量太多,相同時(shí)刻執(zhí)行的任務(wù)很多,會(huì)導(dǎo)致鏈表變得非常長。這里可以進(jìn)一步對這個(gè)時(shí)間輪做一個(gè)改造,做一個(gè)多層的時(shí)間輪。
比如:最內(nèi)層60個(gè)格子,每個(gè)格子1秒;外層60個(gè)格子,每個(gè)格子1分;再外層24個(gè)格子,每個(gè)格子1小時(shí)。最內(nèi)層走一圈,外層走一格。這時(shí)候時(shí)間輪就跟時(shí)鐘更像了。隨著時(shí)間流動(dòng),任務(wù)會(huì)降級,外層的任務(wù)會(huì)慢慢地向內(nèi)層移動(dòng)。
時(shí)間輪任務(wù)插入和刪除時(shí)間復(fù)雜度都為O(1),應(yīng)用范圍非常廣泛,更適合任務(wù)數(shù)很大的延時(shí)場景。Dubbo、Netty、Kafka中都有實(shí)現(xiàn)。
xxl-job中的時(shí)間輪是怎么實(shí)現(xiàn)的?回到JobScheduleHelper的start方法:
放入時(shí)間輪有這么兩步:
int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
pushTimeRing(ringSecond, jobInfo.getId());
ringSecond是0-59的秒數(shù)值(millionSeconds是毫秒數(shù))。
把它想象成一個(gè)表盤的秒針指數(shù)。放入時(shí)間輪的這一段代碼:
// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
if (ringItemData == null) {
ringItemData = new ArrayList<Integer>();
ringData.put(ringSecond, ringItemData);
}
ringItemData.add(jobId);
這個(gè)ringData是一個(gè)ConcurrentHashMap,key是Integer,放的是ringSecond(0-59)。Value是List<Integer>,里面放的是jobId。
到這里為止,JobScheduleHelper的start方法的前一半就分析完了。接下來是ringThread線程,看看時(shí)間輪的任務(wù)是怎么拿出來執(zhí)行的。
1.1.7. 時(shí)間輪線程ringThread
在初始化的時(shí)候先對齊秒數(shù):休眠當(dāng)前秒數(shù)模以1000的余數(shù),意思是下一個(gè)正秒運(yùn)行。
TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );
然后進(jìn)入一個(gè)while循環(huán)。獲取當(dāng)前秒數(shù):
// 避免處理耗時(shí)太長,跨過刻度,向前校驗(yàn)一個(gè)刻度;
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
注釋:根據(jù)當(dāng)前秒數(shù)刻度和前一個(gè)刻度進(jìn)行時(shí)間輪的任務(wù)獲取
for (int i = 0; i < 2; i++) {
List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
if (tmpData != null) {
ringItemData.addAll(tmpData);
}
}
(nowSecond+60-k)%60跟nowSecond-k的結(jié)果一模一樣,也就是當(dāng)前秒數(shù),和前一秒。比如當(dāng)前秒數(shù)是40,就獲取40和39的任務(wù)。從ringData里面拿出來,放進(jìn)ringItemData,這里面存的是這兩秒需要觸發(fā)的所有任務(wù)的jobId。
接下來就是觸發(fā)任務(wù)了。
JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
又調(diào)用了JobTriggerPoolHelper的addTrigger。
在XxlJobTrigger的trigger方法中,調(diào)用了processTrigger,又調(diào)用了runExecutor
runResult = executorBiz.run(triggerParam);
這里實(shí)現(xiàn)類是ExecutorBizClient,發(fā)起了一個(gè)HTTP的請求。
public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}
最終的URL地址是執(zhí)行器的9999端口:http://192.168.44.1:9999/run
跟上面一樣。也就是說,放進(jìn)時(shí)間輪等待觸發(fā)的任務(wù),也會(huì)通過遠(yuǎn)程請求,讓執(zhí)行器執(zhí)行任務(wù)。
1.1.8. 執(zhí)行器處理遠(yuǎn)程調(diào)用,回調(diào)
在業(yè)務(wù)實(shí)例這邊,執(zhí)行器啟動(dòng)9999端口監(jiān)聽的時(shí)候,在EmbedHttpServerHandler的channelRead0方法中,會(huì)創(chuàng)建線程池bizThreadPool,process方法處理URI的訪問。
if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
}
這個(gè)時(shí)候調(diào)用的是core包的ExecutorBizImpl的run方法。
第一步,先拿到任務(wù)的JobThread(表示有沒有線程正在執(zhí)行這個(gè)JobId的任務(wù)):
JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
如果有線程,再拿到j(luò)obHandler。什么是jobHandler?
在SpringBoot的工程里面,jobHandler就是加了@XxlJob注解的任務(wù)方法(一個(gè)任務(wù)一個(gè)方法)。其他的四個(gè)框架中的使用,需要自己編寫Handler(任務(wù)類)繼承IJobHandler。這個(gè)IjobHandler接口意思跟Quartz里面的Job接口是一樣的,這里面必須要覆蓋父類的execute方法。
中間是對于jobThread和jobHandler的判斷。對于bean、GROOVY、其他腳本類型的任務(wù),處理不一樣?;驹瓌t就是必須要有一個(gè)Handler,而且跟之前的Handler必須相同。
如果當(dāng)前任務(wù)正在運(yùn)行(根據(jù)JobId能夠找到JobThread),需要根據(jù)配置的策略采取不同的措施,比如:
1、DISCARD_LATER(丟棄后續(xù)調(diào)度):如果當(dāng)前線程阻塞,后續(xù)任務(wù)不再執(zhí)行,直接返回失敗(阻塞就不再執(zhí)行了)。
if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}
2、COVER_EARLY(覆蓋之前調(diào)度):創(chuàng)建一個(gè)移除原因,新建一個(gè)線程去執(zhí)行后續(xù)任務(wù)(殺掉當(dāng)前線程)。
if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
jobThread = null;
}
3、SERIAL_EXECUTION(單機(jī)串行,默認(rèn)):對當(dāng)前線程不做任何處理,并在當(dāng)前線程的隊(duì)列里增加一個(gè)執(zhí)行任務(wù)(一次只執(zhí)行一個(gè)任務(wù))。
最后,調(diào)用JobThread的pushTriggerQueue方法把Trigger放入隊(duì)列。
ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
這個(gè)隊(duì)列是什么?TriggerQueue是什么?LinkedBlockingQueue。
執(zhí)行器中單個(gè)任務(wù)處理線程一次只能執(zhí)行一個(gè)任務(wù)。
JobThread在創(chuàng)建的時(shí)候就會(huì)啟動(dòng),啟動(dòng)就會(huì)進(jìn)入run方法的死循環(huán),不斷地從隊(duì)列里面拿任務(wù):
triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);
最終調(diào)用到Handler(任務(wù)類)的execute方法:
return handler.execute(triggerParamTmp.getExecutorParams());
在finally方法中調(diào)用了回調(diào)方法,告知調(diào)度器執(zhí)行結(jié)果:
if(triggerParam != null) {
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}
放入一個(gè)隊(duì)列。在TriggerCallbackThread的后臺線程的run方法里面,調(diào)用doCallback方法,連接到調(diào)度器,寫入調(diào)度結(jié)果。
總結(jié)一下整體調(diào)度流程:
1、調(diào)度中心獲取任務(wù)鎖,查詢?nèi)蝿?wù),根據(jù)情況觸發(fā)或者放入時(shí)間輪
2、觸發(fā)任務(wù)需要先獲取路由地址,然后調(diào)用執(zhí)行器接口
3、執(zhí)行器接收到調(diào)用請求,通過JobThread執(zhí)行任務(wù),并且回調(diào)(callback)調(diào)度器的接口
1.3. 任務(wù)分片原理
XxlJobTrigger的trigger方法:
int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
}
然后拿到分片參數(shù):sharding param。這個(gè)sharding param大家還記得是什么么?
最好是設(shè)計(jì)一個(gè)跟業(yè)務(wù)無關(guān)的分片字段,加上索引用它來獲取數(shù)據(jù)的分片信息。
用/來分割。
如果只有兩個(gè),并且都是數(shù)字,把他們轉(zhuǎn)換為整形。
如果是廣播任務(wù),則在所有節(jié)點(diǎn)上processTrigger。
1.4. 手動(dòng)觸發(fā)一次任務(wù)
com.xxl.job.admin.controller.JobInfoController
@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
// force cover job param
if (executorParam == null) {
executorParam = "";
}
JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
return ReturnT.SUCCESS;
}
之后就跟調(diào)度器自動(dòng)觸發(fā)任務(wù)的流程一樣了。
1.5. 任務(wù)H A & Failover
https://blog.csdn.net/Royal_lr/article/details/100113760
這里要解決兩個(gè)關(guān)鍵的問題。第一個(gè),調(diào)度器怎么知道任務(wù)執(zhí)行失敗了。第二個(gè),執(zhí)行失敗以后,怎么處理。
XxlJobTrigger的processTrigger方法:
routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
文章來源:http://www.zghlxwxcb.cn/news/detail-838169.html
4. xxl-job二次開發(fā)
https://blog.csdn.net/m0_37527542/article/details/104468785文章來源地址http://www.zghlxwxcb.cn/news/detail-838169.html
到了這里,關(guān)于分布式定時(shí)任務(wù)調(diào)度xxl-job的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!