国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

分布式定時(shí)任務(wù)調(diào)度xxl-job

這篇具有很好參考價(jià)值的文章主要介紹了分布式定時(shí)任務(wù)調(diào)度xxl-job。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1. xxl-job基本介紹

1.1 Quartz的體系結(jié)構(gòu)

Quartz中最重要的三個(gè)對象:Job(作業(yè))、Trigger(觸發(fā)器)、Scheduler(調(diào)度器)。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

xxl-job的調(diào)度原理:調(diào)度線程在一個(gè)while循環(huán)中不斷地獲取一定數(shù)量的即將觸發(fā)的Trigger,拿到綁定的Job,包裝成工作線程執(zhí)行。

當(dāng)然,不管在任何調(diào)度系統(tǒng)中,底層都是線程模型。如果要自己寫一個(gè)調(diào)度系統(tǒng),一定要對多線程并發(fā)這一塊有比較深入的學(xué)習(xí),比如線程怎么啟動(dòng)怎么wait,怎么notify ,怎么加鎖等等。

1.1. Quartz的不足

Quartz有差不多二十年的歷史,調(diào)度模型已經(jīng)非常成熟了,而且很容易集成到Spring中去,用來執(zhí)行業(yè)務(wù)任務(wù)是一個(gè)很好的選擇。

但是還是會(huì)有一些問題,比如:

1、調(diào)度邏輯(Scheduler)和任務(wù)類耦合在同一個(gè)項(xiàng)目中,隨著調(diào)度任務(wù)數(shù)量逐漸增多,同時(shí)調(diào)度任務(wù)邏輯逐漸加重,調(diào)度系統(tǒng)的整體性能會(huì)受到很大的影響;

2、Quartz集群的節(jié)點(diǎn)之間負(fù)載結(jié)果是隨機(jī)的,誰搶到了數(shù)據(jù)庫鎖就由誰去執(zhí)行任務(wù),這就有可能出現(xiàn)旱的旱死,澇的澇死的情況,發(fā)揮不了機(jī)器的性能。

3、Quartz本身沒有提供動(dòng)態(tài)調(diào)度和管理界面的功能,需要自己根據(jù)API進(jìn)行開發(fā)。

4、Quartz的日志記錄、數(shù)據(jù)統(tǒng)計(jì)、監(jiān)控不是特別完善。

所以xxl-job和Elastic-Job都是對Quartz進(jìn)行了封裝,讓用起來更簡單,功能更強(qiáng)大。

1.2. xxl-job發(fā)展歷史

源碼地址:https://github.com/xuxueli/xxl-job

中文文檔:https://www.xuxueli.com/xxl-job/

2015年開源,一個(gè)大眾點(diǎn)評的程序員的業(yè)余之作。眾所周知,大眾點(diǎn)評因?yàn)楸幻缊F(tuán)收購了,現(xiàn)在是美團(tuán)點(diǎn)評。

xxl是作者名字許雪里的首字母簡寫,除了xxl-job之外作者還開源了很多其他組件,現(xiàn)在一共有11個(gè)開源項(xiàng)目。

到目前為止使用xxl-job的公司有幾百家,算上那些沒有登記的公司,實(shí)際上應(yīng)該有幾千家。

在xxl-job早期的版本中,直接使用了Quartz的調(diào)度模型,直到2019年7月7日發(fā)布的7.27 版本才移除Quartz依賴。

實(shí)際上即使重構(gòu)代碼移除了Quartz的依賴,xxl-job中也到處是Quartz的影子。比如任務(wù)、調(diào)度器、觸發(fā)器的三個(gè)維度設(shè)計(jì),是非常經(jīng)典的。

最新發(fā)布版本是:2.3.1。

但是后面自從2.2.0版本開始,版本更新幾乎沒有什么變化

1.3. xxl-job特性

跟老牌的Quartz相比,xxl-job擁有更加豐富的功能。

總體上可以分成三類:

性能的提升:可以調(diào)度更多的任務(wù)。

可靠性的提升:任務(wù)超時(shí)、失敗、故障轉(zhuǎn)移的處理。

運(yùn)維更加便捷:提供操作界面、有用戶權(quán)限、詳細(xì)的日志、提供通知配置、自動(dòng)生成報(bào)表等等。

2. Xxl-job快速入門

1.1. 下載源碼

1.1.1. release頁面下載

https://github.com/xuxueli/xxl-job/releases

https://gitee.com/xuxueli0323/xxl-job

注意不要直接clone最新的master代碼(SNAPSHOT版本),從發(fā)布界面下載穩(wěn)定版本。

1.1.2. IDEA中打開

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

  • /doc :文檔資料,包括“調(diào)度數(shù)據(jù)庫”建表腳本
  • /xxl-job-admin :調(diào)度中心,項(xiàng)目源碼,Spring Boot工程,可以直接啟動(dòng)
  • /xxl-job-core :公共Jar依賴
  • /xxl-job-executor-samples :執(zhí)行器,Sample示例項(xiàng)目,其中的Spring Boot工程,可以直接啟動(dòng)??梢栽谠擁?xiàng)目上進(jìn)行開發(fā),也可以將現(xiàn)有項(xiàng)目改造生成執(zhí)行器項(xiàng)目。

1.2. 初始化數(shù)據(jù)庫

數(shù)據(jù)庫腳本在doc/db目錄下:
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

生成8張表。

表名 作用
xxl_job_group 執(zhí)行器信息表,維護(hù)任務(wù)執(zhí)行器信息
xxl_job_info 調(diào)度擴(kuò)展信息表:用于保存XXL-JOB調(diào)度任務(wù)的擴(kuò)展信息,如任務(wù)分組、任務(wù)名、機(jī)器地址、執(zhí)行器、執(zhí)行入?yún)⒑蛨?bào)警郵件等等
xxl_job_lock 任務(wù)調(diào)度鎖表
xxl_job_log 調(diào)度日志表:用于保存XXL-JOB任務(wù)調(diào)度的歷史信息,如調(diào)度結(jié)果、執(zhí)行結(jié)果、調(diào)度入?yún)?、調(diào)度機(jī)器和執(zhí)行器等等
xxl_job_log_report 調(diào)度日志報(bào)表:用戶存儲(chǔ)XXL-JOB任務(wù)調(diào)度日志的報(bào)表,調(diào)度中心報(bào)表功能頁面會(huì)用到
xxl_job_logglue 任務(wù)GLUE日志:用于保存GLUE更新歷史,用于支持GLUE的版本回溯功能
xxl_job_registry 執(zhí)行器注冊表,維護(hù)在線的執(zhí)行器和調(diào)度中心機(jī)器地址信息
xxl_job_user 系統(tǒng)用戶表

表初始化好以后,就可以配置代碼工程了。這里先說一下總體概念。

xxl-job的調(diào)度器和業(yè)務(wù)執(zhí)行是獨(dú)立的。調(diào)度器(調(diào)度線程)決定任務(wù)的調(diào)度,并且通過HTTP的方式調(diào)用執(zhí)行器接口執(zhí)行任務(wù)。

所以在這里需要先配置至少一個(gè)調(diào)度中心,運(yùn)行起來,也可以集群部署。然后再配置至少一個(gè)執(zhí)行器,運(yùn)行起來,同樣可以集群部署。

1.3. 配置調(diào)度中心

調(diào)度中心是任務(wù)的指揮中心,可以有多個(gè)實(shí)例。

1.1.1. 修改配置

/xxl-job/xxl-job-admin/src/main/resources/application.properties

檢查各項(xiàng)配置,主要是端口,數(shù)據(jù)庫。

默認(rèn)用戶名admin,密碼 123456。

可以顯式加上一行配置:

xxl.job.login.username=admin
xxl.job.login.password=123456

1.1.2. 編譯打包

如果通過jar包方式部署運(yùn)行,需要先編譯打包。在xxl-job-admin目錄下執(zhí)行命令:

mvn package -Dmaven.test.skip=true

1.1.3. 啟動(dòng)工程

運(yùn)行調(diào)度中心,xxl-job\xxl-job-admin\target目錄下

java -jar xxl-job-admin-2.2.1-SNAPSHOT.jar

或者直接運(yùn)行Spring Boot根目錄下的XxlJobAdminApplication啟動(dòng)類。調(diào)度器是帶界面的,訪問:http://127.0.0.1:8080/xxl-job-admin

主要功能:

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

為了保證可用性,調(diào)度中心可以做集群部署,需要滿足幾個(gè)條件:

· DB配置保持一致;

· 集群機(jī)器時(shí)鐘保持一致(單機(jī)集群忽視);

· 建議:推薦通過nginx為調(diào)度中心集群做負(fù)載均衡,分配域名。調(diào)度中心訪問、執(zhí)行器回調(diào)配置、調(diào)用API服務(wù)等操作均通過該域名進(jìn)行。

1.4. 創(chuàng)建執(zhí)行器

執(zhí)行器負(fù)責(zé)任務(wù)的具體執(zhí)行,分配線程。執(zhí)行器需要注冊到調(diào)度中心,這樣調(diào)度器才知道怎么選擇執(zhí)行器,或者說做路由。執(zhí)行器的執(zhí)行結(jié)果,也需要通過回調(diào)的方式通知調(diào)度器。

xxl-job提供了6個(gè)執(zhí)行器的demo,非常地貼心,這里選用Spring Boot。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

1.1.1. 修改配置

/xxl-job/xxl-job-executor-samples/xxl-job-executor-sample-springboot/src/main/resources/application.properties

主要是修改了日志目錄,還有admin的端口號。

xxl.job.admin.addresses=http://127.0.0.1:7391/xxl-job-admin
xxl.job.executor.logpath=E:/dev_logs/xxl-job/jobhandler

配置類會(huì)在com.xxl.job.executor.core.config.XxlJobConfig用到。

1.1.1. 編譯打包

如果通過jar包方式部署運(yùn)行,需要先編譯打包。

在xxl-job\xxl-job-executor-samples\xxl-job-executor-sample-springboot目錄下:

mvn package -Dmaven.test.skip=true

如果這個(gè)目錄無法打包,就在根目錄下打包。

1.1.2. 啟動(dòng)工程

運(yùn)行執(zhí)行器,上述路徑的target目錄下:

java -jar xxl-job-admin-2.2.1-SNAPSHOT.jar

或者直接運(yùn)行XxlJobExecutorApplication啟動(dòng)類。

可以做集群部署, 這兩項(xiàng)配置要一致:

xxl.job.admin.addresses=
xxl.job.executor.appname=

1.1. 添加任務(wù)

指揮官有了,工人也有了,下面就可以派事情給工人做了,也就是創(chuàng)建任務(wù)。

登錄調(diào)度中心,打開任務(wù)管理界面,新增任務(wù)。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

1.1.1. 路由策略

路由策略指的是一個(gè)任務(wù)選擇哪個(gè)執(zhí)行器去執(zhí)行,Quartz只能隨機(jī)負(fù)載。當(dāng)執(zhí)行器做集群部署的時(shí)候才有意義。Xxl-job提供了豐富的路由策略,包括:

策略 參數(shù)值 詳細(xì)含義
第一個(gè) FIRST 固定選擇第一個(gè)機(jī)器
最后一個(gè) LAST 固定選擇最后一個(gè)機(jī)器
輪詢 ROUND 依次選擇執(zhí)行
隨機(jī) RANDOM 隨機(jī)選擇在線的機(jī)器
一致性HASH CONSISTENT_HASH 每個(gè)任務(wù)按照Hash算法固定選擇某一臺機(jī)器,且所有任務(wù)均勻散列在不同機(jī)器上
最不經(jīng)常使用 LEAST_FREQUENTLY_USED 使用頻率最低的機(jī)器優(yōu)先被選舉
最近最久未使用 LEAST_RECENTLY_USED 最久未使用的機(jī)器優(yōu)先被選舉
故障轉(zhuǎn)移 FAILOVER 按照順序依次進(jìn)行心跳檢測,第一個(gè)心跳檢測成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度
忙碌轉(zhuǎn)移 BUSYOVER 按照順序依次進(jìn)行空閑檢測,第一個(gè)空閑檢測成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度
分片廣播 SHARDING_BROADCAST 廣播觸發(fā)對應(yīng)集群中所有機(jī)器執(zhí)行一次任務(wù),同時(shí)系統(tǒng)自動(dòng)傳遞分片參數(shù);可根據(jù)分片參數(shù)開發(fā)分片任務(wù)

1.1.2. 運(yùn)行模式

在xxl-job中,不僅支持運(yùn)行預(yù)先編寫好的任務(wù)類,還可以直接輸入代碼或者腳本運(yùn)行(上代碼都不用審核了??)。

運(yùn)行任務(wù)類,這種方式就叫做BEAN模式,需要指定任務(wù)類,這個(gè)任務(wù)類就叫做JobHandler,是在執(zhí)行器端編寫的。

運(yùn)行代碼或者腳本,叫做GLUE模式,支持Java、Shell、Python、PHP、Nodejs、PowerShell,這個(gè)時(shí)候代碼是直接維護(hù)在調(diào)度器這邊的。

1.1.3. 阻塞處理策略

阻塞處理策略,指的是任務(wù)的一次運(yùn)行還沒有結(jié)束的時(shí)候,下一次調(diào)度的時(shí)間又到了,這個(gè)時(shí)候怎么處理。

策略 參數(shù)值 詳細(xì)含義
單機(jī)串行,默認(rèn) SERIAL_EXECUTION 調(diào)度請求進(jìn)入單機(jī)執(zhí)行器后,調(diào)度請求進(jìn)入FIFO隊(duì)列并以串行方式運(yùn)行
丟棄后續(xù)調(diào)度 DISCARD_LATER 調(diào)度請求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),本次請求將會(huì)被丟棄并標(biāo)記為失敗
覆蓋之前調(diào)度 COVER_EARLY 調(diào)度請求進(jìn)入單機(jī)執(zhí)行器后,發(fā)現(xiàn)執(zhí)行器存在運(yùn)行的調(diào)度任務(wù),將會(huì)終止運(yùn)行中的調(diào)度任務(wù)并清空隊(duì)列,然后運(yùn)行本地調(diào)度任務(wù)

1、SERIAL_EXECUTION(單機(jī)串行,默認(rèn)):對當(dāng)前線程不做任何處理,并在當(dāng)前線程的隊(duì)列里增加一個(gè)執(zhí)行任務(wù)(一次只執(zhí)行一個(gè)任務(wù))。

2、DISCARD_LATER(丟棄后續(xù)調(diào)度):如果當(dāng)前線程阻塞,后續(xù)任務(wù)不再執(zhí)行,直接返回失?。ㄗ枞筒辉賵?zhí)行了)。

3、COVER_EARLY(覆蓋之前調(diào)度):創(chuàng)建一個(gè)移除原因,新建一個(gè)線程去執(zhí)行后續(xù)任務(wù)(殺掉當(dāng)前線程)。

1.1.4. 子任務(wù)

如果需要在本任務(wù)執(zhí)行結(jié)束并且執(zhí)行成功的時(shí)候觸發(fā)另外一個(gè)任務(wù),那么就可以把另外的任務(wù)作為本任務(wù)的子任務(wù)運(yùn)行。

因?yàn)槊總€(gè)任務(wù)都擁有一個(gè)唯一的任務(wù)ID(任務(wù)ID可以從任務(wù)列表獲取),只需要把JobId填上就可以了。

比如:下載對賬文件的任務(wù)成功以后,開始解析文件入庫。入庫成功以后,開始對賬。

這樣,多個(gè)任務(wù)就實(shí)現(xiàn)了串行調(diào)度。

1.2. 任務(wù)操作

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

1. Xxl-job 任務(wù)詳解

1.1. Spring Boot任務(wù)類型

com.xxl.job.executor.service.jobhandler.SampleXxlJob

開發(fā)步驟:

1、在Spring Bean實(shí)例中,開發(fā)Job方法,方式格式要求為 “public ReturnT<String> execute(String param)”

2、為Job方法添加注解 “@XxlJob(value=“自定義jobhandler名稱”, init = “JobHandler初始化方法”, destroy = “JobHandler銷毀方法”)”,注解value值對應(yīng)的是調(diào)度中心新建任務(wù)的JobHandler屬性的值。

3、執(zhí)行日志:需要通過 "XxlJobLogger.log" 打印執(zhí)行日志;

**demoJobHandler:**簡單示例任務(wù),任務(wù)內(nèi)部模擬耗時(shí)任務(wù)邏輯,用戶可在線體驗(yàn)Rolling Log等功能;

**shardingJobHandler:**分片示例任務(wù),任務(wù)內(nèi)部模擬處理分片參數(shù),可參考熟悉分片任務(wù);

**commandJobHandler:**命令行任務(wù);

**httpJobHandler:**通用HTTP任務(wù)Handler;業(yè)務(wù)方只需要提供HTTP鏈接等信息即可,不限制語言、平臺。

/**
 * 分片廣播任務(wù)
 * 直接由路由策略--> 分片廣播
 * @author xuxueli 2017-07-25 20:56:50
 * 如果想實(shí)現(xiàn)分片策略可以在此業(yè)務(wù)類中進(jìn)行實(shí)現(xiàn)邏輯
 */
public class ShardingJobHandler extends IJobHandler {
	@Override
	public ReturnT<String> execute(String param) throws Exception {

		// 分片參數(shù)
		ShardingUtil.ShardingVO shardingVO = ShardingUtil.getShardingVo();
		XxlJobLogger.log("分片參數(shù):當(dāng)前分片序號 = {}, 總分片數(shù) = {}", shardingVO.getIndex(), shardingVO.getTotal());
		// 業(yè)務(wù)邏輯
		for (int i = 0; i < shardingVO.getTotal(); i++) {
			if (i == shardingVO.getIndex()) {
				XxlJobLogger.log("第 {} 片, 命中分片開始處理", i);
			} else {
				XxlJobLogger.log("第 {} 片, 忽略", i);
			}
		}
		return SUCCESS;
	}
}
/**
 * 命令行任務(wù)
 *
 * @author xuxueli 2018-09-16 03:48:34
 */
public class CommandJobHandler extends IJobHandler {

    @Override
    public ReturnT<String> execute(String param) throws Exception {
        String command = param;
        int exitValue = -1;

        BufferedReader bufferedReader = null;
        try {
            // command process
            Process process = Runtime.getRuntime().exec(command);
            BufferedInputStream bufferedInputStream = new BufferedInputStream(process.getInputStream());
            bufferedReader = new BufferedReader(new InputStreamReader(bufferedInputStream));

            // command log
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                XxlJobLogger.log(line);
            }

            // command exit
            process.waitFor();
            exitValue = process.exitValue();
        } catch (Exception e) {
            XxlJobLogger.log(e);
        } finally {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        }

        if (exitValue == 0) {
            return IJobHandler.SUCCESS;
        } else {
            return new ReturnT<String>(IJobHandler.FAIL.getCode(), "command exit value("+exitValue+") is failed");
        }
    }
}
/**
 * 跨平臺Http任務(wù)
 *
 * @author xuxueli 2018-09-16 03:48:34
 */
public class HttpJobHandler extends IJobHandler {

	@Override
	public ReturnT<String> execute(String param) throws Exception {

		// param parse
		if (param==null || param.trim().length()==0) {
			XxlJobLogger.log("param["+ param +"] invalid.");
			return ReturnT.FAIL;
		}
		String[] httpParams = param.split("\n");
		String url = null;
		String method = null;
		String data = null;
		for (String httpParam: httpParams) {
			if (httpParam.startsWith("url:")) {
				url = httpParam.substring(httpParam.indexOf("url:") + 4).trim();
			}
			if (httpParam.startsWith("method:")) {
				method = httpParam.substring(httpParam.indexOf("method:") + 7).trim().toUpperCase();
			}
			if (httpParam.startsWith("data:")) {
				data = httpParam.substring(httpParam.indexOf("data:") + 5).trim();
			}
		}

		// param valid
		if (url==null || url.trim().length()==0) {
			XxlJobLogger.log("url["+ url +"] invalid.");
			return ReturnT.FAIL;
		}
		if (method==null || !Arrays.asList("GET", "POST").contains(method)) {
			XxlJobLogger.log("method["+ method +"] invalid.");
			return ReturnT.FAIL;
		}

		// request
		HttpURLConnection connection = null;
		BufferedReader bufferedReader = null;
		try {
			// connection
			URL realUrl = new URL(url);
			connection = (HttpURLConnection) realUrl.openConnection();

			// connection setting
			connection.setRequestMethod(method);
			connection.setDoOutput(true);
			connection.setDoInput(true);
			connection.setUseCaches(false);
			connection.setReadTimeout(5 * 1000);
			connection.setConnectTimeout(3 * 1000);
			connection.setRequestProperty("connection", "Keep-Alive");
			connection.setRequestProperty("Content-Type", "application/json;charset=UTF-8");
			connection.setRequestProperty("Accept-Charset", "application/json;charset=UTF-8");

			// do connection
			connection.connect();

			// data
			if (data!=null && data.trim().length()>0) {
				DataOutputStream dataOutputStream = new DataOutputStream(connection.getOutputStream());
				dataOutputStream.write(data.getBytes("UTF-8"));
				dataOutputStream.flush();
				dataOutputStream.close();
			}

			// valid StatusCode
			int statusCode = connection.getResponseCode();
			if (statusCode != 200) {
				throw new RuntimeException("Http Request StatusCode(" + statusCode + ") Invalid.");
			}

			// result
			bufferedReader = new BufferedReader(new InputStreamReader(connection.getInputStream(), "UTF-8"));
			StringBuilder result = new StringBuilder();
			String line;
			while ((line = bufferedReader.readLine()) != null) {
				result.append(line);
			}
			String responseMsg = result.toString();

			XxlJobLogger.log(responseMsg);
			return ReturnT.SUCCESS;
		} catch (Exception e) {
			XxlJobLogger.log(e);
			return ReturnT.FAIL;
		} finally {
			try {
				if (bufferedReader != null) {
					bufferedReader.close();
				}
				if (connection != null) {
					connection.disconnect();
				}
			} catch (Exception e2) {
				XxlJobLogger.log(e2);
			}
		}
	}
}

2. xxl-job架構(gòu)設(shè)計(jì)

xxl-job跟Quartz特性和部署方式的不同,本質(zhì)上是因?yàn)榧軜?gòu)設(shè)計(jì)有著很大的區(qū)別。

1.1. 設(shè)計(jì)思想

1.1.1. 調(diào)度與任務(wù)解耦

在Quartz中,調(diào)度邏輯和任務(wù)代碼是耦合在一起的。

而xxl-job把調(diào)度的動(dòng)作抽象和獨(dú)立出來,形成“調(diào)度中心”公共平臺。調(diào)度中心只負(fù)責(zé)發(fā)起調(diào)度請求,平臺自身并不承擔(dān)業(yè)務(wù)邏輯。

將任務(wù)抽象成分散的JobHandler,交由“執(zhí)行器”統(tǒng)一管理,“執(zhí)行器”負(fù)責(zé)接收調(diào)度請求并執(zhí)行對應(yīng)的JobHandler中業(yè)務(wù)邏輯。

因此,“調(diào)度”和“任務(wù)”兩部分可以相互解耦,提高系統(tǒng)整體穩(wěn)定性和擴(kuò)展性。

1.1.2. 全異步化& 輕量級

全異步化設(shè)計(jì):XXL-JOB系統(tǒng)中業(yè)務(wù)邏輯在遠(yuǎn)程執(zhí)行器執(zhí)行,觸發(fā)流程全異步化設(shè)計(jì)。相比直接在調(diào)度中心內(nèi)部執(zhí)行業(yè)務(wù)邏輯,極大的降低了調(diào)度線程占用時(shí)間;

異步調(diào)度:調(diào)度中心每次任務(wù)觸發(fā)時(shí)僅發(fā)送一次調(diào)度請求,該調(diào)度請求首先推送“異步調(diào)度隊(duì)列”,然后異步推送給遠(yuǎn)程執(zhí)行器。

異步執(zhí)行:執(zhí)行器會(huì)將請求存入“異步執(zhí)行隊(duì)列”并且立即響應(yīng)調(diào)度中心,異步運(yùn)行。

輕量級設(shè)計(jì):XXL-JOB調(diào)度中心中每個(gè)JOB邏輯非常 “輕”,在全異步化的基礎(chǔ)上,單個(gè)JOB一次運(yùn)行平均耗時(shí)基本在 “10ms” 之內(nèi)(基本為一次請求的網(wǎng)絡(luò)開銷);因此,可以保證使用有限的線程支撐大量的JOB并發(fā)運(yùn)行;

得益于上述兩點(diǎn)優(yōu)化,理論上默認(rèn)配置下的調(diào)度中心,單機(jī)能夠支撐5000 任務(wù)并發(fā)運(yùn)行穩(wěn)定運(yùn)行;

實(shí)際場景中,由于調(diào)度中心與執(zhí)行器網(wǎng)絡(luò)ping延遲不同、DB讀寫耗時(shí)不同、任務(wù)調(diào)度密集程度不同,會(huì)導(dǎo)致任務(wù)量上限會(huì)上下波動(dòng)。

如若需要支撐更多的任務(wù)量,可以通過調(diào)大調(diào)度線程數(shù)、降低調(diào)度中心與執(zhí)行器ping延遲和提升機(jī)器配置幾種方式優(yōu)化。

1.1.3. 均衡調(diào)度

調(diào)度中心在集群部署時(shí)會(huì)自動(dòng)進(jìn)行任務(wù)平均分配,觸發(fā)組件每次獲取與線程池?cái)?shù)量(調(diào)度中心支持自定義調(diào)度線程池大?。┫嚓P(guān)數(shù)量的任務(wù),避免大量任務(wù)集中在單個(gè)調(diào)度中心集群節(jié)點(diǎn)。

1.2. 系統(tǒng)組成

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

整體上分為兩個(gè)模塊:

1.1.4. 調(diào)度模塊(調(diào)度中心)

負(fù)責(zé)管理調(diào)度信息,按照調(diào)度配置發(fā)出調(diào)度請求,自身不承擔(dān)業(yè)務(wù)代碼。調(diào)度系統(tǒng)與任務(wù)解耦,提高了系統(tǒng)可用性和穩(wěn)定性,同時(shí)調(diào)度系統(tǒng)性能不再受限于任務(wù)模塊;

調(diào)度中心支持可視化、簡單且動(dòng)態(tài)的管理調(diào)度信息,包括任務(wù)新建,更新,刪除,GLUE開發(fā)和任務(wù)報(bào)警等,所有上述操作都會(huì)實(shí)時(shí)生效,同時(shí)支持監(jiān)控調(diào)度結(jié)果以及執(zhí)行日志,支持執(zhí)行器Failover。

1.1.5. 執(zhí)行模塊(執(zhí)行器)

負(fù)責(zé)接收調(diào)度請求并執(zhí)行任務(wù)邏輯。任務(wù)模塊專注于任務(wù)的執(zhí)行等操作,開發(fā)和維護(hù)更加簡單和高效;

接收“調(diào)度中心”的執(zhí)行請求、終止請求和日志請求等。

從整體來看,xxl-job架構(gòu)依賴較少,功能強(qiáng)大,簡約而不簡單,方便部署,易于使用。

3. xxl-job原理分析

源碼部分

當(dāng)執(zhí)行器集群部署的時(shí)候,調(diào)度器需要為任務(wù)執(zhí)行選擇執(zhí)行器。所以,執(zhí)行器在啟動(dòng)的時(shí)候,必須先注冊到調(diào)度中心,保存在數(shù)據(jù)庫。

1.1. 執(zhí)行器啟動(dòng)與注冊

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

https://blog.csdn.net/oushitian/article/details/87938682

https://blog.csdn.net/RabbitInTheGrass/article/details/106918236

執(zhí)行器的注冊與發(fā)現(xiàn)有兩種方式:

1、一種是執(zhí)行器啟動(dòng)的時(shí)候,主動(dòng)到調(diào)度中心注冊,并定時(shí)發(fā)送心跳,保持續(xù)約。執(zhí)行器正常關(guān)閉時(shí),也主動(dòng)告知調(diào)度中心注銷掉。這種方式叫做主動(dòng)注冊。(調(diào)度中心類似于注冊中心eureka,nacos,rocketmq的namesrv

2、如果執(zhí)行器宕機(jī)或者網(wǎng)絡(luò)出問題了,調(diào)度中心就不知道執(zhí)行器的情況,如果把任務(wù)路由給一個(gè)不可用的執(zhí)行器執(zhí)行,就會(huì)導(dǎo)致任務(wù)執(zhí)行失敗。

所以,調(diào)度中心本身也需要不斷地對執(zhí)行器進(jìn)行探活。調(diào)度中心會(huì)啟動(dòng)一個(gè)專門的后臺線程,定時(shí)調(diào)用執(zhí)行器接口,如果發(fā)現(xiàn)異常就下線掉。
下面從執(zhí)行器的源碼去驗(yàn)證一下。

首先,一個(gè)Spring Boot的項(xiàng)目啟動(dòng)從哪里入手?

從配置類XxlJobConfig出發(fā),這里用到了配置的參數(shù)。

配置類定義了一個(gè)XxlJobSpringExecutor,會(huì)在啟動(dòng)掃描配置類的時(shí)候創(chuàng)建執(zhí)行器。XxlJobSpringExecutor繼承了XxlJobExecutor

父類實(shí)現(xiàn)了SmartInitializingSingleton接口,在對象初始化的時(shí)候會(huì)調(diào)用afterSingletonsInstantiated()方法,這里面父類的start()方法。
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

這里面做了幾件事:

// 初始化日志路徑
XxlJobFileAppender.initLogPath(logPath);

// 創(chuàng)建調(diào)度器的客戶端
initAdminBizList(adminAddresses, accessToken);

// 初始化日志清理線程
JobLogFileCleanThread.getInstance().start(logRetentionDays);

// 初始化Trigger回調(diào)線程
TriggerCallbackThread.getInstance().start();

// 初始化執(zhí)行器服務(wù)器
initEmbedServer(address, ip, port, appname, accessToken);

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

initAdminBizList創(chuàng)建調(diào)度器客戶端,是執(zhí)行器用來連接調(diào)度器的。

Trigger回調(diào)線程用來處理任務(wù)執(zhí)行完畢后的回調(diào),為什么需要回調(diào)。

從initEmbedServer方法進(jìn)入執(zhí)行器的創(chuàng)建,到embedServer.start。叫做embedServer是因?yàn)镾pring Boot里面是用的內(nèi)置的Tomcat啟動(dòng)的。

embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

在這個(gè)start方法里面,最后有一個(gè)thread.start(),也就是調(diào)用了線程的run方法。線程是上面new出來的。在run方法里面,創(chuàng)建了一個(gè)名字叫bizThreadPool 的ThreadPoolExecutor,也就是業(yè)務(wù)線程的線程池。

ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(

然后啟動(dòng)了一個(gè)Netty包的ServerBootstrap,然后啟動(dòng)服務(wù)器。

ServerBootstrap bootstrap = new ServerBootstrap();

在這里面要把執(zhí)行器注冊到調(diào)度中心。

startRegistry(appname, address);

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

到了ExecutorRegistryThread,在start方法里面最后啟動(dòng)了這個(gè)線程:

ExecutorRegistryThread.getInstance().start(appname, address);

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

registryThread.start();也就是執(zhí)行了這個(gè)創(chuàng)建的線程的run方法。

首先拿到調(diào)度器的列表,它有可能是集群部署的。

for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

然后挨個(gè)注冊上去,調(diào)用的是AdminBizClient的registry方法(這個(gè)類是core包里面的):

public ReturnT<String> registry(RegistryParam registryParam) {
	return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

調(diào)用了HTTP的接口,實(shí)際地址是:

http://127.0.0.1:7391/xxl-job-admin/ api/registry

在舊的版本中用的是XXL-RPC,后來改成了Restful的API。

請求的是com.xxl.job.admin.controller.JobApiController的api方法,這里有一個(gè)分支:

if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
	return adminBiz.registry(registryParam);
}
這個(gè)時(shí)候會(huì)調(diào)用到AdminBizImpl的registryUpdate方法:

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

//注冊中心也是類似這么設(shè)計(jì)的
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
        
//小于1說明記錄不存在,插入記錄
xxlJobRegistryDao.registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

這個(gè)接口方法是沒有實(shí)現(xiàn)類的——其實(shí)就是MyBatis的Mapper,把執(zhí)行器保存到數(shù)據(jù)庫。

XxlJobRegistryMapper.xml

<update id="registryUpdate" >
  UPDATE xxl_job_registry SET `update_time` = #{updateTime}
        WHERE `registry_group` = #{registryGroup}
          AND `registry_key` = #{registryKey}
          AND `registry_value` = #{registryValue}
    </update>

<insert id="registrySave" >
	INSERT INTO xxl_job_registry( `registry_group` , `registry_key` , `registry_value`, `update_time`)
VALUES( #{registryGroup} , #{registryKey} , #{registryValue}, #{updateTime})
</insert>

后臺線程探活這一塊,在調(diào)度器的代碼中:

JobRegistryMonitorHelper.getInstance().start();

1.2. 調(diào)度器啟動(dòng)與任務(wù)執(zhí)行(調(diào)度中心如何啟動(dòng)的)

執(zhí)行器啟動(dòng)好以后,工人就準(zhǔn)備干活了,接下來就看一下指揮官上崗以后是怎么指揮工人的。實(shí)際上是先啟動(dòng)調(diào)度器再啟動(dòng)執(zhí)行器,但是因?yàn)檎{(diào)度的流程涉及到執(zhí)行器,所以先分析執(zhí)行器。

下面看看調(diào)度器是如何啟動(dòng)的,任務(wù)是如何得到執(zhí)行的。

1.1.1. 從配置類入手

Spring Boot的工程,一樣從配置類XxlJobAdminConfig入手。它實(shí)現(xiàn)了InitializingBean接口,會(huì)在初始化的時(shí)候調(diào)用afterPropertiesSet方法:

public void afterPropertiesSet() throws Exception {
adminConfig = this;
xxlJobScheduler = new XxlJobScheduler();
xxlJobScheduler.init();
}

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

init方法里面做了幾件事情:
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

// 任務(wù)注冊監(jiān)控器
JobRegistryMonitorHelper.getInstance().start();

// 任務(wù)調(diào)度失敗的監(jiān)控器,失敗重試,失敗郵件發(fā)送
JobFailMonitorHelper.getInstance().start();

// 任務(wù)結(jié)果丟失處理
JobLosedMonitorHelper.getInstance().start();

// trigger pool啟動(dòng)
JobTriggerPoolHelper.toStart();

// log report啟動(dòng)
JobLogReportHelper.getInstance().start();

// start-schedule
JobScheduleHelper.getInstance().start();

JobRegistryMonitorHelper做的事情是不停地更新注冊表,把超時(shí)的執(zhí)行器剔除。每隔30秒執(zhí)行一次:
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);

JobTriggerPoolHelper創(chuàng)建了兩個(gè)線程池,一個(gè)快的線程池,一個(gè)慢的線程池,作用后面就會(huì)講到。

這里主要關(guān)注的是調(diào)度器(指揮官)如何啟動(dòng)的,進(jìn)入JobScheduleHelper的start方法,這段方法總體上看起來是這樣的:
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

public void start(){
// schedule thread
scheduleThread = new Thread(...);
scheduleThread.setDaemon(true);
scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
scheduleThread.start();

// ring thread
ringThread = new Thread(...);
ringThread.setDaemon(true);
ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
ringThread.start();
}

也就是創(chuàng)建并且啟動(dòng)了兩個(gè)后臺線程,一個(gè)是調(diào)度線程,一個(gè)是時(shí)間輪線程。先從第一個(gè)線程開始說起。

1.1.2. 調(diào)度器線程

這里創(chuàng)建了一個(gè)scheduleThread線程,后面調(diào)用了start方法,也就是會(huì)進(jìn)入run方法。
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

scheduleThread的run方法中,先隨機(jī)睡眠4-5秒,為什么?為了防止執(zhí)行器集中啟動(dòng)出現(xiàn)過多的資源競爭。

TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );

然后計(jì)算預(yù)讀取的任務(wù)數(shù),這里默認(rèn)是6000個(gè)。

int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;

后面是一個(gè)while循環(huán),也就是調(diào)度器重復(fù)不斷地在做的事情。
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

1.1.3. 獲取任務(wù)鎖

第一步是獲取數(shù)據(jù)庫的排他鎖,因?yàn)樗械墓?jié)點(diǎn)連接到的數(shù)據(jù)庫是同一個(gè)實(shí)例,所以這里是一個(gè)分布式環(huán)境的鎖。也就是后面的過程是互斥的,如果有多個(gè)調(diào)度器的服務(wù),同一時(shí)間只能有一個(gè)調(diào)度器在獲取任務(wù)信息:

preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );

獲取的是job_lock表的lock_name=schedule_lock這一行數(shù)據(jù)的行鎖。

如果加鎖沒有成功,說明其他調(diào)度中心在加載任務(wù)了,只能等其他節(jié)點(diǎn)提交事務(wù)或者回滾事務(wù),釋放鎖以后才能獲取鎖。

獲取鎖成功后查詢?nèi)蝿?wù):

<select id="scheduleJobQuery" parameterType="java.util.HashMap" resultMap="XxlJobInfo">
SELECT <include refid="Base_Column_List" />
FROM xxl_job_info AS t
WHERE t.trigger_status = 1
and t.trigger_next_time <![CDATA[ <= ]]> #{maxNextTime}
ORDER BY id ASC
LIMIT #{pagesize}
</select>

這個(gè)SQL的含義:從任務(wù)表查詢狀態(tài)是1,并且下次觸發(fā)時(shí)間小于{maxNextTime}的任務(wù);{maxNextTime}=nowTime(當(dāng)前時(shí)間) + PRE_READ_MS(5秒),也就是說查詢5秒鐘之內(nèi)需要觸發(fā)的任務(wù):

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

1.1.4. 調(diào)度任務(wù)

這里根據(jù)任務(wù)的觸發(fā)時(shí)間分成了三種情況。

這里假設(shè)任務(wù)的下次觸發(fā)時(shí)間(TriggerNextTime)是9點(diǎn)0分30秒,2秒鐘觸發(fā)一次。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

第一種情況就是當(dāng)前時(shí)間已經(jīng)是9點(diǎn)0分35秒以后了。

如果nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS,也就是觸發(fā)時(shí)間已經(jīng)過期5秒以上,那就不能調(diào)度了(misfire了),讓它到下次觸發(fā)的時(shí)間再跑,這里只需要更新下次觸發(fā)時(shí)間。

什么時(shí)候會(huì)超時(shí)?比如你的查詢非常慢,或者你查詢到等待觸發(fā)的任務(wù)以后,debug停在上面很久才走到時(shí)間判斷。

第二種情況(正常情況):nowTime > jobInfo.getTriggerNextTime(),已經(jīng)過了觸發(fā)時(shí)間,但是沒有超過5秒,時(shí)間是9點(diǎn)0分30秒到9點(diǎn)0分35秒之間。

這里要做的事情有四步:

1、觸發(fā)任務(wù)

2、更新下次觸發(fā)時(shí)間

一次觸發(fā)完成之后,來一次預(yù)讀,看看再下次的觸發(fā)時(shí)間是不是滿足:

nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()

下次觸發(fā)時(shí)間是9點(diǎn)0分32秒,現(xiàn)在時(shí)間大于9點(diǎn)0分27秒(距離下次觸發(fā)時(shí)間不足5秒了):

3、丟入時(shí)間輪

4、觸發(fā)完了再把時(shí)間更新為下次更新時(shí)間

這里重點(diǎn)有兩個(gè),觸發(fā)的時(shí)候做了什么。丟入時(shí)間輪做了什么。

第三種情況:還沒到9點(diǎn)0分30秒。

1、丟入時(shí)間輪

2、刷新一下下次觸發(fā)時(shí)間,因?yàn)檫€沒觸發(fā),實(shí)際上時(shí)間沒變

所以,這里要重點(diǎn)關(guān)注一下,任務(wù)觸發(fā)的時(shí)候,是怎么觸發(fā)的。丟入時(shí)間輪,又是一個(gè)什么操作。
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

1.1.5. 任務(wù)觸發(fā)

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

從JobTriggerPoolHelper的trigger方法進(jìn)入,又到了JobTriggerPoolHelper的addTrigger方法。
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

這里設(shè)計(jì)了兩個(gè)線程池:fastTriggerPool和slowTriggerPool,如果1分鐘內(nèi)過期了10次,就使用慢的線程池。
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

ThreadPoolExecutor triggerPool_ = fastTriggerPool;
AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId);
if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) {      // job-timeout 10 times in 1 min
triggerPool_ = slowTriggerPool;
}

這里相當(dāng)于做了一個(gè)線程池隔離,即使有很多慢的任務(wù),也只能把慢任務(wù)的線程池耗光(秒?。。?。

什么樣的任務(wù)會(huì)使用慢的線程池來執(zhí)行呢?JobTriggerPoolHelper中addTrigger的末尾:如果這次執(zhí)行超過了500ms,就給它標(biāo)記一下,超過10次,它就要被丟到下等艙了。

if (cost > 500) {       // ob-timeout threshold 500ms
AtomicInteger timeoutCount = jobTimeoutCountMap.putIfAbsent(jobId, new AtomicInteger(1));
if (timeoutCount != null) {
timeoutCount.incrementAndGet();
}
}

線程池選擇好以后,execute一下,也就是分配線程來執(zhí)行觸發(fā)任務(wù)。
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

進(jìn)入XxlJobTrigger的trigger方法:
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam, addressList);

在XxlJobTrigger的trigger方法中,先拿到任務(wù)信息,如果方法參數(shù)failRetryCount>0,就用參數(shù)值,否則用Job定義的failRetryCount。這里傳進(jìn)來的是-1。

XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);

拿到失敗重試次數(shù)和組別:

int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());

先不考慮廣播分片的情況,分片的原理后面再分析。直接走到末尾的else,processTrigger:

processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

前面是獲取一些參數(shù),然后記錄日志,初始化Trigger參數(shù)。

然后獲取路由,把結(jié)果放入routeAddressResult。如果是廣播分片,所有的節(jié)點(diǎn)都要參與負(fù)載,否則要根據(jù)策略獲取執(zhí)行器地址。

不同的路由策略,獲取路由的方式也不一樣,這里是典型的策略模式:
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

回顧一下:

路由參數(shù) 翻譯 詳細(xì)含義
FIRST 第一個(gè) 固定選擇第一個(gè)機(jī)器
LAST 最后一個(gè) 固定選擇最后一個(gè)機(jī)器
ROUND 輪詢 依次選擇執(zhí)行
RANDOM 隨機(jī) 隨機(jī)選擇在線的機(jī)器
CONSISTENT_HASH 一致性HASH 每個(gè)任務(wù)按照Hash算法固定選擇某一臺機(jī)器,且所有任務(wù)均勻散列在不同機(jī)器上
LEAST_FREQUENTLY_USED LRU最不經(jīng)常使用 使用頻率最低的機(jī)器優(yōu)先被選舉
LEAST_RECENTLY_USED LRU最近最久未使用 最久未使用的機(jī)器優(yōu)先被選舉
FAILOVER 故障轉(zhuǎn)移 按照順序依次進(jìn)行心跳檢測,第一個(gè)心跳檢測成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度
BUSYOVER 忙碌轉(zhuǎn)移 按照順序依次進(jìn)行空閑檢測,第一個(gè)空閑檢測成功的機(jī)器選定為目標(biāo)執(zhí)行器并發(fā)起調(diào)度
SHARDING_BROADCAST 分片廣播 廣播觸發(fā)對應(yīng)集群中所有機(jī)器執(zhí)行一次任務(wù),同時(shí)系統(tǒng)自動(dòng)傳遞分片參數(shù);可根據(jù)分片參數(shù)開發(fā)分片任務(wù)

如果沒有啟動(dòng)執(zhí)行器,那就拿不到執(zhí)行器地址。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

拿到執(zhí)行器地址以后,runExecutor觸發(fā)遠(yuǎn)程的執(zhí)行器:

triggerResult = runExecutor(triggerParam, address);

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

這里調(diào)用的是ExecutorBizClient的run方法:

public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

這里就調(diào)用了執(zhí)行器的遠(yuǎn)程接口(http://192.168.44.1:9999/run),執(zhí)行器接收到調(diào)用請求怎么處理后面再說。

參數(shù)內(nèi)容:
TriggerParam{jobId=2, executorHandler='', executorParams='', executorBlockStrategy='SERIAL_EXECUTION', executorTimeout=0, logId=10337, logDateTime=1601783382002, glueType='GLUE_GROOVY', glueSource='package com.xxl.job.service.handler;

import com.xxl.job.core.log.XxlJobLogger;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.handler.IJobHandler;

public class DemoGlueJobHandler extends IJobHandler {

@Override
public ReturnT<String> execute(String param) throws Exception {
XxlJobLogger.log("qingshan job, Hello World.");
return ReturnT.SUCCESS;
}

}
', glueUpdatetime=1601359733000, broadcastIndex=0, broadcastTotal=1}

回顧一下,這里說的是第二種情況:

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

一共四步,第一步結(jié)束了:

1、觸發(fā)任務(wù)

2、更新下次觸發(fā)時(shí)間

3、丟入時(shí)間輪

4、觸發(fā)完了再把時(shí)間更新為下次更新時(shí)間

第二步和第四步非常簡單,都是操作數(shù)據(jù)庫。

第三步,丟入時(shí)間輪,什么是時(shí)間輪?為什么要丟入時(shí)間輪?

1.1.6. 時(shí)間輪

要回答這個(gè)問題,先從Java中最原始的任務(wù)調(diào)度的方法說起。

給你一批任務(wù)(假設(shè)有1000個(gè)任務(wù)),都是不同的時(shí)間執(zhí)行的,時(shí)間精確到秒,你怎么實(shí)現(xiàn)對所有的任務(wù)的調(diào)度?

第一種思路是啟動(dòng)一個(gè)線程,每秒鐘對所有的任務(wù)進(jìn)行遍歷,找出執(zhí)行時(shí)間跟當(dāng)前時(shí)間匹配的,執(zhí)行它。如果任務(wù)數(shù)量太大,遍歷和比較所有任務(wù)會(huì)比較浪費(fèi)時(shí)間。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

第二個(gè)思路,把這些任務(wù)進(jìn)行排序,執(zhí)行時(shí)間近(先觸發(fā))的放在前面。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

用Java代碼怎么實(shí)現(xiàn)呢?

JDK包里面自帶了一個(gè)Timer工具類(java.util包下),可以實(shí)現(xiàn)延時(shí)任務(wù)(例如30分鐘以后觸發(fā)),也可以實(shí)現(xiàn)周期性任務(wù)(例如每1小時(shí)觸發(fā)一次)。

它的本質(zhì)是一個(gè)優(yōu)先隊(duì)列(TaskQueue),和一個(gè)執(zhí)行任務(wù)的線程(TimerThread)。

public class Timer {
	private final TaskQueue queue = new TaskQueue();
	private final TimerThread thread = new TimerThread(queue);
	public Timer(String name, boolean isDaemon) {
		thread.setName(name);
		thread.setDaemon(isDaemon);
		thread.start();
	}
}

在這個(gè)優(yōu)先隊(duì)列中,最先需要執(zhí)行的任務(wù)排在優(yōu)先隊(duì)列的第一個(gè)。然后 TimerThread 不斷地拿第一個(gè)任務(wù)的執(zhí)行時(shí)間和當(dāng)前時(shí)間做對比。如果時(shí)間到了先看看這個(gè)任務(wù)是不是周期性執(zhí)行的任務(wù),如果是則修改當(dāng)前任務(wù)時(shí)間為下次執(zhí)行的時(shí)間,如果不是周期性任務(wù)則將任務(wù)從優(yōu)先隊(duì)列中移除。最后執(zhí)行任務(wù)。

但是Timer是單線程的,在很多場景下不能滿足業(yè)務(wù)需求。

在JDK1.5之后,引入了一個(gè)支持多線程的任務(wù)調(diào)度工具ScheduledThreadPoolExecutor用來替代TImer,它是幾種常用的線程池之一??纯礃?gòu)造函數(shù),里面是一個(gè)延遲隊(duì)列DelayedWorkQueue,也是一個(gè)優(yōu)先隊(duì)列。

public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

優(yōu)先隊(duì)列的插入和刪除的時(shí)間復(fù)雜度是O(logn),當(dāng)數(shù)據(jù)量大的時(shí)候,頻繁的入堆出堆性能不是很好。

這里先考慮對所有的任務(wù)進(jìn)行分組,把相同執(zhí)行時(shí)刻的任務(wù)放在一起。比如這里,數(shù)組里面的一個(gè)下標(biāo)就代表1秒鐘。它就會(huì)變成一個(gè)數(shù)組加鏈表的數(shù)據(jù)結(jié)構(gòu)。分組以后遍歷和比較的時(shí)間會(huì)減少一些。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

但是還是有問題,如果任務(wù)數(shù)量非常大,而且時(shí)間都不一樣,或者有執(zhí)行時(shí)間非常遙遠(yuǎn)的任務(wù),那這個(gè)數(shù)組長度是不是要非常地長?比如有個(gè)任務(wù)2個(gè)月之后執(zhí)行,從現(xiàn)在開始計(jì)算,它的下標(biāo)是5253120。

所以長度肯定不能是無限的,只能是固定長度的。比如固定長度是60,一個(gè)格子代表1秒(現(xiàn)在叫做一個(gè)bucket槽),一圈可以表示60秒。遍歷的線程只要一個(gè)格子一個(gè)格子的獲取任務(wù),并且執(zhí)行就OK了。

固定長度的數(shù)組怎么用來表示超出最大長度的時(shí)間呢?可以用循環(huán)數(shù)組。

比如一個(gè)循環(huán)數(shù)組長度60,可以表示60秒。60秒以后執(zhí)行的任務(wù)怎么放進(jìn)去?只要除以60,用得到的余數(shù),放到對應(yīng)的格子就OK了。比如90%60=30,它放在第30個(gè)格子。這里就有了輪次的概念,第90秒的任務(wù)是第二輪的時(shí)候才執(zhí)行。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

這時(shí)候,時(shí)間輪的概念已經(jīng)出來了。

如果任務(wù)數(shù)量太多,相同時(shí)刻執(zhí)行的任務(wù)很多,會(huì)導(dǎo)致鏈表變得非常長。這里可以進(jìn)一步對這個(gè)時(shí)間輪做一個(gè)改造,做一個(gè)多層的時(shí)間輪。

比如:最內(nèi)層60個(gè)格子,每個(gè)格子1秒;外層60個(gè)格子,每個(gè)格子1分;再外層24個(gè)格子,每個(gè)格子1小時(shí)。最內(nèi)層走一圈,外層走一格。這時(shí)候時(shí)間輪就跟時(shí)鐘更像了。隨著時(shí)間流動(dòng),任務(wù)會(huì)降級,外層的任務(wù)會(huì)慢慢地向內(nèi)層移動(dòng)。

時(shí)間輪任務(wù)插入和刪除時(shí)間復(fù)雜度都為O(1),應(yīng)用范圍非常廣泛,更適合任務(wù)數(shù)很大的延時(shí)場景。Dubbo、Netty、Kafka中都有實(shí)現(xiàn)。

xxl-job中的時(shí)間輪是怎么實(shí)現(xiàn)的?回到JobScheduleHelper的start方法:

放入時(shí)間輪有這么兩步:
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
pushTimeRing(ringSecond, jobInfo.getId());

ringSecond是0-59的秒數(shù)值(millionSeconds是毫秒數(shù))。

把它想象成一個(gè)表盤的秒針指數(shù)。放入時(shí)間輪的這一段代碼:

// push async ring
List<Integer> ringItemData = ringData.get(ringSecond);
	if (ringItemData == null) {
		ringItemData = new ArrayList<Integer>();
		ringData.put(ringSecond, ringItemData);
	}
ringItemData.add(jobId);

這個(gè)ringData是一個(gè)ConcurrentHashMap,key是Integer,放的是ringSecond(0-59)。Value是List<Integer>,里面放的是jobId。

到這里為止,JobScheduleHelper的start方法的前一半就分析完了。接下來是ringThread線程,看看時(shí)間輪的任務(wù)是怎么拿出來執(zhí)行的。

1.1.7. 時(shí)間輪線程ringThread

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

在初始化的時(shí)候先對齊秒數(shù):休眠當(dāng)前秒數(shù)模以1000的余數(shù),意思是下一個(gè)正秒運(yùn)行。

TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis()%1000 );

然后進(jìn)入一個(gè)while循環(huán)。獲取當(dāng)前秒數(shù):

// 避免處理耗時(shí)太長,跨過刻度,向前校驗(yàn)一個(gè)刻度;

int nowSecond = Calendar.getInstance().get(Calendar.SECOND);

注釋:根據(jù)當(dāng)前秒數(shù)刻度和前一個(gè)刻度進(jìn)行時(shí)間輪的任務(wù)獲取

for (int i = 0; i < 2; i++) {
	List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
	if (tmpData != null) {
		ringItemData.addAll(tmpData);
	}
}

(nowSecond+60-k)%60跟nowSecond-k的結(jié)果一模一樣,也就是當(dāng)前秒數(shù),和前一秒。比如當(dāng)前秒數(shù)是40,就獲取40和39的任務(wù)。從ringData里面拿出來,放進(jìn)ringItemData,這里面存的是這兩秒需要觸發(fā)的所有任務(wù)的jobId。

接下來就是觸發(fā)任務(wù)了。

JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);

又調(diào)用了JobTriggerPoolHelper的addTrigger。

在XxlJobTrigger的trigger方法中,調(diào)用了processTrigger,又調(diào)用了runExecutor

runResult = executorBiz.run(triggerParam);

這里實(shí)現(xiàn)類是ExecutorBizClient,發(fā)起了一個(gè)HTTP的請求。

public ReturnT<String> run(TriggerParam triggerParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

最終的URL地址是執(zhí)行器的9999端口:http://192.168.44.1:9999/run

跟上面一樣。也就是說,放進(jìn)時(shí)間輪等待觸發(fā)的任務(wù),也會(huì)通過遠(yuǎn)程請求,讓執(zhí)行器執(zhí)行任務(wù)。

1.1.8. 執(zhí)行器處理遠(yuǎn)程調(diào)用,回調(diào)

在業(yè)務(wù)實(shí)例這邊,執(zhí)行器啟動(dòng)9999端口監(jiān)聽的時(shí)候,在EmbedHttpServerHandler的channelRead0方法中,會(huì)創(chuàng)建線程池bizThreadPool,process方法處理URI的訪問。

if ("/run".equals(uri)) {
TriggerParam triggerParam = GsonTool.fromJson(requestData, TriggerParam.class);
return executorBiz.run(triggerParam);
}

這個(gè)時(shí)候調(diào)用的是core包的ExecutorBizImpl的run方法。

第一步,先拿到任務(wù)的JobThread(表示有沒有線程正在執(zhí)行這個(gè)JobId的任務(wù)):

JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;

如果有線程,再拿到j(luò)obHandler。什么是jobHandler?

在SpringBoot的工程里面,jobHandler就是加了@XxlJob注解的任務(wù)方法(一個(gè)任務(wù)一個(gè)方法)。其他的四個(gè)框架中的使用,需要自己編寫Handler(任務(wù)類)繼承IJobHandler。這個(gè)IjobHandler接口意思跟Quartz里面的Job接口是一樣的,這里面必須要覆蓋父類的execute方法。

中間是對于jobThread和jobHandler的判斷。對于bean、GROOVY、其他腳本類型的任務(wù),處理不一樣?;驹瓌t就是必須要有一個(gè)Handler,而且跟之前的Handler必須相同。

如果當(dāng)前任務(wù)正在運(yùn)行(根據(jù)JobId能夠找到JobThread),需要根據(jù)配置的策略采取不同的措施,比如:

1、DISCARD_LATER(丟棄后續(xù)調(diào)度):如果當(dāng)前線程阻塞,后續(xù)任務(wù)不再執(zhí)行,直接返回失敗(阻塞就不再執(zhí)行了)。

if (jobThread.isRunningOrHasQueue()) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
}

2、COVER_EARLY(覆蓋之前調(diào)度):創(chuàng)建一個(gè)移除原因,新建一個(gè)線程去執(zhí)行后續(xù)任務(wù)(殺掉當(dāng)前線程)。

if (jobThread.isRunningOrHasQueue()) {
removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

jobThread = null;
}

3、SERIAL_EXECUTION(單機(jī)串行,默認(rèn)):對當(dāng)前線程不做任何處理,并在當(dāng)前線程的隊(duì)列里增加一個(gè)執(zhí)行任務(wù)(一次只執(zhí)行一個(gè)任務(wù))。

最后,調(diào)用JobThread的pushTriggerQueue方法把Trigger放入隊(duì)列。

ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);

這個(gè)隊(duì)列是什么?TriggerQueue是什么?LinkedBlockingQueue。

執(zhí)行器中單個(gè)任務(wù)處理線程一次只能執(zhí)行一個(gè)任務(wù)。

JobThread在創(chuàng)建的時(shí)候就會(huì)啟動(dòng),啟動(dòng)就會(huì)進(jìn)入run方法的死循環(huán),不斷地從隊(duì)列里面拿任務(wù):

triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS);

最終調(diào)用到Handler(任務(wù)類)的execute方法:

return handler.execute(triggerParamTmp.getExecutorParams());

在finally方法中調(diào)用了回調(diào)方法,告知調(diào)度器執(zhí)行結(jié)果:

if(triggerParam != null) {
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [job running, killed]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogDateTime(), stopResult));
}
}

放入一個(gè)隊(duì)列。在TriggerCallbackThread的后臺線程的run方法里面,調(diào)用doCallback方法,連接到調(diào)度器,寫入調(diào)度結(jié)果。

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式
分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

總結(jié)一下整體調(diào)度流程:

1、調(diào)度中心獲取任務(wù)鎖,查詢?nèi)蝿?wù),根據(jù)情況觸發(fā)或者放入時(shí)間輪

2、觸發(fā)任務(wù)需要先獲取路由地址,然后調(diào)用執(zhí)行器接口

3、執(zhí)行器接收到調(diào)用請求,通過JobThread執(zhí)行任務(wù),并且回調(diào)(callback)調(diào)度器的接口

1.3. 任務(wù)分片原理

XxlJobTrigger的trigger方法:

int[] shardingParam = null;
if (executorShardingParam!=null){
String[] shardingArr = executorShardingParam.split("/");
if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
shardingParam = new int[2];
shardingParam[0] = Integer.valueOf(shardingArr[0]);
shardingParam[1] = Integer.valueOf(shardingArr[1]);
}
}
if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
&& group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
&& shardingParam==null) {
for (int i = 0; i < group.getRegistryList().size(); i++) {
processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
}
}

然后拿到分片參數(shù):sharding param。這個(gè)sharding param大家還記得是什么么?

最好是設(shè)計(jì)一個(gè)跟業(yè)務(wù)無關(guān)的分片字段,加上索引用它來獲取數(shù)據(jù)的分片信息。

用/來分割。

如果只有兩個(gè),并且都是數(shù)字,把他們轉(zhuǎn)換為整形。

如果是廣播任務(wù),則在所有節(jié)點(diǎn)上processTrigger。

1.4. 手動(dòng)觸發(fā)一次任務(wù)

com.xxl.job.admin.controller.JobInfoController

@RequestMapping("/trigger")
@ResponseBody
//@PermissionLimit(limit = false)
public ReturnT<String> triggerJob(int id, String executorParam, String addressList) {
// force cover job param
if (executorParam == null) {
executorParam = "";
}

JobTriggerPoolHelper.trigger(id, TriggerTypeEnum.MANUAL, -1, null, executorParam, addressList);
return ReturnT.SUCCESS;
}
之后就跟調(diào)度器自動(dòng)觸發(fā)任務(wù)的流程一樣了。

1.5. 任務(wù)H A & Failover

https://blog.csdn.net/Royal_lr/article/details/100113760

這里要解決兩個(gè)關(guān)鍵的問題。第一個(gè),調(diào)度器怎么知道任務(wù)執(zhí)行失敗了。第二個(gè),執(zhí)行失敗以后,怎么處理。

XxlJobTrigger的processTrigger方法:

routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());

分布式定時(shí)任務(wù)調(diào)度xxl-job,學(xué)習(xí)筆記,分布式

4. xxl-job二次開發(fā)

https://blog.csdn.net/m0_37527542/article/details/104468785文章來源地址http://www.zghlxwxcb.cn/news/detail-838169.html

到了這里,關(guān)于分布式定時(shí)任務(wù)調(diào)度xxl-job的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • xxl-Job分布式任務(wù)調(diào)度 入門

    xxl-Job分布式任務(wù)調(diào)度 入門

    我們可以先思考一下業(yè)務(wù)場景的解決方案: 某電商系統(tǒng)需要在每天上午10點(diǎn),下午3點(diǎn),晚上8點(diǎn)發(fā)放一批優(yōu)惠券。 某銀行系統(tǒng)需要在信用卡到期還款日的前三天進(jìn)行短信提醒。 某財(cái)務(wù)系統(tǒng)需要在每天凌晨0:10結(jié)算前一天的財(cái)務(wù)數(shù)據(jù),統(tǒng)計(jì)匯總。 12306會(huì)根據(jù)車次的不同,設(shè)置某

    2024年02月03日
    瀏覽(22)
  • 【分布式任務(wù)調(diào)度】XXL-JOB的任務(wù)調(diào)度實(shí)現(xiàn)原理(四)

    【分布式任務(wù)調(diào)度】XXL-JOB的任務(wù)調(diào)度實(shí)現(xiàn)原理(四)

    XXL-JOB專題歷史文章列表: XXL-JOB調(diào)度中心集群部署配置(一) XXL-JOB執(zhí)行器配置及定時(shí)任務(wù)的創(chuàng)建(二) XXL-JOB調(diào)度中心對執(zhí)行器的上下線感知實(shí)現(xiàn)原理(三) 本篇的主要內(nèi)容是XXL-JOB的任務(wù)調(diào)度流程及其實(shí)現(xiàn)原理,包含了兩個(gè)部分: 調(diào)度中心如何進(jìn)行任務(wù)調(diào)度 執(zhí)行器執(zhí)行任

    2024年02月16日
    瀏覽(27)
  • Java -- XXL-JOB分布式任務(wù)調(diào)度平臺

    Java -- XXL-JOB分布式任務(wù)調(diào)度平臺

    XXL-JOB是一個(gè)分布式任務(wù)調(diào)度平臺,其核心設(shè)計(jì)目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴(kuò)展?,F(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用 xxl是xxl-job的開發(fā)者大眾點(diǎn)評的【許雪里】名稱的拼音開頭 官網(wǎng)地址 分布式任務(wù)調(diào)度平臺XXL-JOB 文檔地址 中文文檔 English Docu

    2024年02月11日
    瀏覽(21)
  • 【分布式任務(wù)調(diào)度】(一)XXL-JOB調(diào)度中心集群部署配置

    【分布式任務(wù)調(diào)度】(一)XXL-JOB調(diào)度中心集群部署配置

    XXL-JOB是一款輕量級的分布式任務(wù)調(diào)度中間件,默認(rèn)支持6000個(gè)定時(shí)任務(wù),如果生產(chǎn)環(huán)境的任務(wù)數(shù)量在這個(gè)范圍內(nèi),可以選擇使用 XXL-JOB。 XXL-JOB由Quartz這款老牌的任務(wù)調(diào)度中間件演化而來,相對來說,具備以下優(yōu)勢: 操作更簡單,學(xué)習(xí)成本更低 使用異步化調(diào)度,性能更好 有配

    2024年02月16日
    瀏覽(24)
  • XXL-JOB中間件【實(shí)現(xiàn)分布式任務(wù)調(diào)度】

    XXL-JOB中間件【實(shí)現(xiàn)分布式任務(wù)調(diào)度】

    目錄 1:XXL-JOB介紹 2:搭建XXL-JOB 2.1:調(diào)度中心 2.2:執(zhí)行器 2.3:執(zhí)行任務(wù) 3:分片廣播 XXL-JOB是一個(gè)輕量級分布式任務(wù)調(diào)度平臺,其核心設(shè)計(jì)目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴(kuò)展?,F(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。 官網(wǎng):https://www.xuxueli.com/xxl-

    2024年02月03日
    瀏覽(20)
  • spring boot + xxl-job 分布式任務(wù)調(diào)度

    spring boot + xxl-job 分布式任務(wù)調(diào)度

    1、任務(wù)調(diào)度 1.1、什么是任務(wù)調(diào)度 我們可以先思考一下下面業(yè)務(wù)場景的解決方案: 某電商系統(tǒng)需要在每天上午10點(diǎn),下午3點(diǎn),晚上8點(diǎn)發(fā)放一批優(yōu)惠券。 某財(cái)務(wù)系統(tǒng)需要在每天上午10點(diǎn)前結(jié)算前一天的賬單數(shù)據(jù),統(tǒng)計(jì)匯總。 某電商平臺每天凌晨3點(diǎn),要對訂單中的無效訂單進(jìn)行

    2024年02月09日
    瀏覽(29)
  • 分布式定時(shí)任務(wù)-XXL-JOB-教程+實(shí)戰(zhàn)

    分布式定時(shí)任務(wù)-XXL-JOB-教程+實(shí)戰(zhàn)

    1.定時(shí)任務(wù)認(rèn)識 1.1.什么是定時(shí)任務(wù) 定時(shí)任務(wù)是按照指定時(shí)間周期運(yùn)行任務(wù)。使用場景為在某個(gè)固定時(shí)間點(diǎn)執(zhí)行,或者周期性的去執(zhí)行某個(gè)任務(wù),比如:每天晚上24點(diǎn)做數(shù)據(jù)匯總,定時(shí)發(fā)送短信等。 1.2.常見定時(shí)任務(wù)方案 While + Sleep : 通過循環(huán)加休眠的方式定時(shí)執(zhí)行 Timer和Time

    2024年02月16日
    瀏覽(31)
  • 初識輕量級分布式任務(wù)調(diào)度平臺 xxl-job

    初識輕量級分布式任務(wù)調(diào)度平臺 xxl-job

    大家好,這里是 Rocky 編程日記 ,喜歡后端架構(gòu)及中間件源碼,目前正在閱讀 xxl-job 源碼。同時(shí)也把自己學(xué)習(xí)該 xxl-job 筆記,代碼分享出來,供大家學(xué)習(xí)交流,如若筆記中有不對的地方,那一定是當(dāng)時(shí)我的理解還不夠,希望你能及時(shí)提出。 如果對于該筆記存在很多疑惑,歡迎

    2024年02月10日
    瀏覽(2161)
  • 基于docker的分布式任務(wù)調(diào)度系統(tǒng)xxl-job搭建

    基于docker的分布式任務(wù)調(diào)度系統(tǒng)xxl-job搭建

    本文所使用的操作系統(tǒng)為: CentOS-7-x86_64-DVD-2009 xxl-job 依賴 mysql,所以必須要安裝mysql才行! 訪問以下鏈接:https://hub.docker.com/_/mysql/ 尋找自己需要的MySQL版本拉取即可 1.下載鏡像 這里未指定版本號,默認(rèn)拉取的是最新MySQL鏡像 2.導(dǎo)入zip包 下載xxljob項(xiàng)目,查看releases版本 https:

    2024年02月20日
    瀏覽(21)
  • 分布式任務(wù)調(diào)度平臺XXL-JOB學(xué)習(xí)筆記-helloworld運(yùn)行

    分布式任務(wù)調(diào)度平臺XXL-JOB學(xué)習(xí)筆記-helloworld運(yùn)行

    環(huán)境:win10 eclipse java17 mysql8.0.17 xxl-job 2.4 源碼:https://github.com/xuxueli/xxl-job/ 導(dǎo)入時(shí)按Existing Maven Projects導(dǎo)入,先導(dǎo)入xxl-job-admin(管理平臺)和xxl-job-executor-sample-springboot(通過springboot管理的執(zhí)行器實(shí)例)。 如果導(dǎo)入時(shí)速度非常慢,或者報(bào)錯(cuò)如 Plugin ‘org.apache.maven.plugins:maven-

    2024年02月13日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包