国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

初識輕量級分布式任務調度平臺 xxl-job

這篇具有很好參考價值的文章主要介紹了初識輕量級分布式任務調度平臺 xxl-job。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

前言

大家好,這里是 Rocky 編程日記 ,喜歡后端架構及中間件源碼,目前正在閱讀 xxl-job 源碼。同時也把自己學習該 xxl-job筆記,代碼分享出來,供大家學習交流,如若筆記中有不對的地方,那一定是當時我的理解還不夠,希望你能及時提出。

如果對于該筆記存在很多疑惑,歡迎和我交流討論
最后也感謝您的閱讀,點贊,關注,收藏~

前人述備矣,我只是知識的搬運工

xxl-job 源碼均在個人的開源項目中, 源代碼倉庫地址: https://gitee.com/Rocky-BCRJ/xxl-job.git

初識輕量級分布式任務調度平臺 xxl-job

官方文檔: https://www.xuxueli.com/xxl-job/

xxl-job的目錄結構

  • xxl-job-admin : 是后臺管理頁面
  • xxl-job-core : 項目的核心包
  • xxl-job-executor-sample (項目使用示例)
    • xxl-job-executor-sample-frameless : 不使用框架的接入方式案例
    • xxl-job-executor-sample-springboot : springboot接入方案案例
  • doc : 項目文檔和sql

項目依賴 (父 pom.xml)

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<groupId>com.xuxueli</groupId>
	<artifactId>xxl-job</artifactId>
	<version>2.4.0-SNAPSHOT</version>
	<packaging>pom</packaging>

	<name>${project.artifactId}</name>
	<description>A distributed task scheduling framework.</description>
	<url>https://www.xuxueli.com/</url>

	<modules>
		<module>xxl-job-core</module>
		<module>xxl-job-admin</module>
		<module>xxl-job-executor-samples</module>
    </modules>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<maven.compiler.encoding>UTF-8</maven.compiler.encoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<maven.test.skip>true</maven.test.skip>

		<netty-all.version>4.1.63.Final</netty-all.version>
		<gson.version>2.9.0</gson.version>

		<spring.version>5.3.20</spring.version>
		<spring-boot.version>2.6.7</spring-boot.version>

		<mybatis-spring-boot-starter.version>2.2.2</mybatis-spring-boot-starter.version>
		<mysql-connector-java.version>8.0.29</mysql-connector-java.version>

		<slf4j-api.version>1.7.36</slf4j-api.version>
		<junit-jupiter.version>5.8.2</junit-jupiter.version>
		<javax.annotation-api.version>1.3.2</javax.annotation-api.version>

		<groovy.version>3.0.10</groovy.version>

		<maven-source-plugin.version>3.2.1</maven-source-plugin.version>
		<maven-javadoc-plugin.version>3.4.0</maven-javadoc-plugin.version>
		<maven-gpg-plugin.version>3.0.1</maven-gpg-plugin.version>
	</properties>
	
</project>	

xxl-job-admin 啟動

  • 從 xxl-job 文件 doc 目錄下 執(zhí)行項目中的 SQL, 生成庫表操作
  • 更改該模塊下數(shù)據(jù)庫鏈接,修改日志文件路徑,打包,啟動項目
  • 瀏覽器輸入 http://localhost:8080/xxl-job-admin/
  • 訪問之后登錄,賬號 : admin 密碼: 123456

xxl-job-executor-sample (項目使用示例)

xxl-job-executor-sample-frameless : 不使用框架的接入方式案例

  • 項目只依賴了 xxl-job-core

        <dependencies>
    
            <!-- slf4j -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-log4j12</artifactId>
                <version>${slf4j-api.version}</version>
            </dependency>
            <!-- junit -->
            <dependency>
                <groupId>org.junit.jupiter</groupId>
                <artifactId>junit-jupiter-engine</artifactId>
                <version>${junit-jupiter.version}</version>
                <scope>test</scope>
            </dependency>
    
            <!-- xxl-job-core -->
            <dependency>
                <groupId>com.xuxueli</groupId>
                <artifactId>xxl-job-core</artifactId>
                <version>${project.parent.version}</version>
            </dependency>
    
        </dependencies>
    
  • 關于 xxl-job 的核心配置文件

    ### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
    xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
    
    ### xxl-job, access token
    xxl.job.accessToken=default_token
    
    ### xxl-job executor appname
    xxl.job.executor.appname=xxl-job-executor-sample
    ### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
    xxl.job.executor.address=
    ### xxl-job executor server-info
    xxl.job.executor.ip=
    xxl.job.executor.port=9998
    ### xxl-job executor log-path
    xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
    ### xxl-job executor log-retention-days
    xxl.job.executor.logretentiondays=30
    
    

xxl-job-executor-sample-springboot : springboot接入方案案例

xxl-job執(zhí)行器器啟動流程分析

在項目代碼 FramelessApplication 類的 main 中

FrameLessXxlJobConfig.getInstance().initXxlJobExecutor();

上述代碼即為啟動任務執(zhí)行器的代碼。進入到 FrameLessXxlJobConfig#initXxlJobExecutor()方法中

    /**
     * init
     * 初始化 XxlJobSimpleExecutor 執(zhí)行器
     */
    public void initXxlJobExecutor() {

        // load executor prop 
        // 從配置文件(xxl-job-executor.properties)中加載配置 放到 Properties
        Properties xxlJobProp = loadProperties("xxl-job-executor.properties");

        // init executor 
        // 創(chuàng)建普通的任務執(zhí)行器
        xxlJobExecutor = new XxlJobSimpleExecutor();
        xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
        xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
        xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
        xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
        xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
        xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
        xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
        xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));

        // registry job bean
        // 注冊定時任務的bean, 將 SampleXxlJob 加入到定時任務里去
        xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SampleXxlJob()));

        // start executor
        try {
            // 啟動執(zhí)行器
            xxlJobExecutor.start();
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

SampleXxlJob 類中有一系列由 @XxlJob注解修飾的方法。這些 @XxlJob注解修飾的方法就是定時任務。我們再來看一下執(zhí)行器的啟動start方法:

    @Override
    public void start() {

        // init JobHandler Repository (for method)
        // 初始化任務處理器
        initJobHandlerMethodRepository(xxlJobBeanList);

        // super start
        try {
            // 調用父類的 start
            super.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

調用父類的 start方法啟動執(zhí)行器,父類的start方法如下:

 /**
     * 開始
     * <p>
     *     1.初始化日志路徑
     *     2.初始化admin的客戶端
     *     3.初始化日志清理線程
     *     4.初始化回調線程池
     *     5.初始化執(zhí)行器服務
     * </p>
     * @throws Exception 異常
     */
    public void start() throws Exception {

        // init logpath
        // 初始化日志路徑
        XxlJobFileAppender.initLogPath(logPath);

        // init invoker, admin-client
        // 初始化admin的客戶端
        initAdminBizList(adminAddresses, accessToken);

        // init JobLogFileCleanThread
        // 初始化日志清理線程
        JobLogFileCleanThread.getInstance().start(logRetentionDays);

        // init TriggerCallbackThread
        // 初始化回調線程池
        TriggerCallbackThread.getInstance().start();

        // init executor-server
        // 初始化執(zhí)行器服務
        initEmbedServer(address, ip, port, appname, accessToken);
    }

初始化日志路徑

/**
	 * 初始化日志路徑
	 * <p>
	 *     首先創(chuàng)建了保存日志的文件目錄
	 *     然后在創(chuàng)建保存腳本的文件目錄
	 * </p>
	 * @param logPath 日志路徑
	 */
	public static void initLogPath(String logPath){
		// init
		if (logPath!=null && logPath.trim().length()>0) {
			logBasePath = logPath;
		}
		// mk base dir
		// 創(chuàng)建父類目錄
		File logPathDir = new File(logBasePath);
		if (!logPathDir.exists()) {
			logPathDir.mkdirs();
		}
		logBasePath = logPathDir.getPath();

		// mk glue dir
		// 創(chuàng)建腳本代碼目錄
		File glueBaseDir = new File(logPathDir, "gluesource");
		if (!glueBaseDir.exists()) {
			glueBaseDir.mkdirs();
		}
		glueSrcPath = glueBaseDir.getPath();
	}

初始化admin的客戶端

/**
     * 初始化admin的客戶端
     *
     * @param adminAddresses 管理地址
     * @param accessToken    訪問令牌
     * @throws Exception 異常
     */
    private void initAdminBizList(String adminAddresses, String accessToken) throws Exception {
        // 遍歷 調度器 的地址
        if (adminAddresses!=null && adminAddresses.trim().length()>0) {
            // 以逗號分隔
            for (String address: adminAddresses.trim().split(",")) {
                if (address!=null && address.trim().length()>0) {
                    // 初始化admin客戶端
                    AdminBiz adminBiz = new AdminBizClient(address.trim(), accessToken);

                    if (adminBizList == null) {
                        adminBizList = new ArrayList<AdminBiz>();
                    }
                    // 保存到list中
                    adminBizList.add(adminBiz);
                }
            }
        }
    }

初始化日志清理線程

啟動一個線程localThread,用來清理過期的日志文件。localThread的run方法一直執(zhí)行,首先獲取所有的日志文件目錄,日志文件形式如logPath/yyyy-MM-dd/9999.log,獲取logPath/yyyy-MM-dd/目錄下的所有日志文件,然后判斷日志文件是否已經過期,過期時間是配置的,如果當前時間減去日志文件創(chuàng)建時間(yyyy-MM-dd)大于配置的日志清理天數(shù),說明日志文件已經過期,一般配置只保存30天的日志,30天以前的日志都刪除掉。

初始化執(zhí)行器服務

啟動了一個netty服務器,用于執(zhí)行器接收admin的http請求。主要接收admin發(fā)送的空閑檢測請求、運行定時任務的請求、停止運行定時任務的請求、獲取日志的請求。最后a還向dmin注冊了執(zhí)行器,注冊執(zhí)行器是調用AdminBizClient的registry方法注冊的,AdminBizClient的registry方法通過http將注冊請求轉發(fā)給admin服務的AdminBizImpl類的registry方法,AdminBizImpl類的registry方法將注冊請求保存在數(shù)據(jù)庫中。

執(zhí)行器服務接收admin服務的請求, 交給ExecutorBiz接口處理,ExecutorBiz接口有五個方法,分別是beat(心跳檢測)、idleBeat(空閑檢測)、run(運行定時任務)、kill(停止運行任務)、log(獲取日志)。ExecutorBiz接口有兩個實現(xiàn):ExecutorBizClient和ExecutorBizImpl,ExecutorBizClient是執(zhí)行器客戶端,ExecutorBizImpl執(zhí)行器服務端。admin服務通過ExecutorBizClient類的方法通過http將請求轉發(fā)給執(zhí)行器服務的ExecutorBizImpl對應的方法。

調度中心啟動流程分析

創(chuàng)建調度器以及對調度器進行初始化

    /**
     * 創(chuàng)建調度器以及對調度器進行初始化
     * @throws Exception 異常
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        adminConfig = this;
        // 新建調度器
        xxlJobScheduler = new XxlJobScheduler();
        // 調度器初始化
        xxlJobScheduler.init();
    }

調度器初始化

 /**
     * 調度器初始化
     * <p>
     *     1.國際化初始化
     *     2.觸發(fā)器線程池創(chuàng)建
     *     3.注冊監(jiān)控器啟動
     *     4.失敗監(jiān)控器啟動
     *     5.丟失監(jiān)控器啟動
     *     6.日志任務啟動
     *     7.調度啟動
     * </p>
     *
     * @throws Exception 異常
     */
    public void init() throws Exception {
        // init i18n 初始化國際化
        initI18n();

        // admin trigger pool start 觸發(fā)器線程池創(chuàng)建
        JobTriggerPoolHelper.toStart();

        // admin registry monitor run 注冊監(jiān)控器啟動
        JobRegistryHelper.getInstance().start();

        // admin fail-monitor run 失敗監(jiān)控器啟動
        JobFailMonitorHelper.getInstance().start();

        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        // 丟失監(jiān)控器啟動
        JobCompleteHelper.getInstance().start();

        // admin log report start
        // 日志報告啟動
        JobLogReportHelper.getInstance().start();

        // start-schedule  ( depend on JobTriggerPoolHelper )
        // 調度啟動
        JobScheduleHelper.getInstance().start();

        logger.info(">>>>>>>>> init xxl-job admin success.");
    }

國際化初始化

    /**
     * init i18n
     * 國際化初始化
     */
    private void initI18n(){
        for (ExecutorBlockStrategyEnum item:ExecutorBlockStrategyEnum.values()) {
            item.setTitle(I18nUtil.getString("jobconf_block_".concat(item.name())));
        }
    }

ExecutorBlockStrategyEnum是執(zhí)行阻塞策略枚舉,主要有單機串行、丟棄后續(xù)調度、覆蓋之前調度三種策略,initI18n方法就是設置執(zhí)行策略的title值。

I18nUtil.getString方法就是根據(jù)配置讀取resources/il8n/目錄下的其中一個文件,該目錄下有message_en.properties、message_zh_CN.properties、message_zh_TC.properties三個文件,分別為英語、中文簡體、中文繁體是屬性文件。

I18nUtil.getString方法獲取到執(zhí)行阻塞策略的值賦值給title.

觸發(fā)器線程池創(chuàng)建

public static void toStart() {
        helper.start();
    }

    public void start(){
        // 快速觸發(fā)線程
        fastTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(1000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode());
                    }
                });
        // 慢速觸發(fā)線程池
        slowTriggerPool = new ThreadPoolExecutor(
                10,
                XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax(),
                60L,
                TimeUnit.SECONDS,
                new LinkedBlockingQueue<Runnable>(2000),
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode());
                    }
                });
    }

觸發(fā)器線程池創(chuàng)建調用了JobTriggerPoolHelper類的start方法,start方法創(chuàng)建了兩個線程池、fastTriggerPool為快速線程池、slowTriggerPool為慢速線程池,都是采用阻塞隊列LinkedBlockingQueue,快速線程池的阻塞隊列大小為1000,慢速線程池的阻塞隊列大小為2000。

快速線程池、慢速線程池在什么時候被用來調度任務呢?
默認是用快速調度器調度任務的,當緩存中等待被調度的同一個任務的數(shù)量大于10的時候,就用慢速調度器調度任務。

注冊監(jiān)控器啟動

public void start(){

		// for registry or remove
		// 調度任務注冊線程池
		registryOrRemoveThreadPool = new ThreadPoolExecutor(
				2,
				10,
				30L,
				TimeUnit.SECONDS,
				new LinkedBlockingQueue<Runnable>(2000),
				new ThreadFactory() {
					@Override
					public Thread newThread(Runnable r) {
						return new Thread(r, "xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool-" + r.hashCode());
					}
				},
				new RejectedExecutionHandler() {
					@Override
					public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
						r.run();
						logger.warn(">>>>>>>>>>> xxl-job, registry or remove too fast, match threadpool rejected handler(run now).");
					}
				});

		// for monitor
		// 注冊監(jiān)控器線程
		registryMonitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// auto registry group
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// remove dead address (admin/executor)
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// fresh online address (admin/executor)
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										String appname = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appname);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										appAddressMap.put(appname, registryList);
									}
								}
							}

							// fresh group address
							for (XxlJobGroup group: groupList) {
								List<String> registryList = appAddressMap.get(group.getAppname());
								String addressListStr = null;
								if (registryList!=null && !registryList.isEmpty()) {
									Collections.sort(registryList);
									StringBuilder addressListSB = new StringBuilder();
									for (String item:registryList) {
										addressListSB.append(item).append(",");
									}
									addressListStr = addressListSB.toString();
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								group.setAddressList(addressListStr);
								group.setUpdateTime(new Date());

								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});
		registryMonitorThread.setDaemon(true);
		registryMonitorThread.setName("xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread");
		registryMonitorThread.start();
	}

JobRegistryHelper的 start 創(chuàng)建了一個調度任務注冊線程池

registryOrRemoveThreadPool以及注冊監(jiān)控器線程registryMonitorThread,調度任務注冊線程池用來執(zhí)行調度任務的注冊,注冊監(jiān)控器線程用來監(jiān)控執(zhí)行器的機器是否下線。

然后將registryMonitorThread設置為守護線程,最后啟動registryMonitorThread線程,開始監(jiān)控執(zhí)行器的機器。

registryMonitorThread線程的run方法的代碼被省略,接下來分析下run方法的具體邏輯:

// 注冊監(jiān)控器線程
		registryMonitorThread = new Thread(new Runnable() {
			@Override
			public void run() {
				while (!toStop) {
					try {
						// auto registry group
						// 自動注冊的執(zhí)行器列表
						List<XxlJobGroup> groupList = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().findByAddressType(0);
						if (groupList!=null && !groupList.isEmpty()) {

							// remove dead address (admin/executor)
							// 獲取已經下線的機器地址記錄
							List<Integer> ids = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findDead(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (ids!=null && ids.size()>0) {
								// 刪除已經下線的注冊
								XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().removeDead(ids);
							}

							// fresh online address (admin/executor) 刷新在線的機器
							HashMap<String, List<String>> appAddressMap = new HashMap<String, List<String>>();
							// 查詢存活的執(zhí)行器注冊機器
							List<XxlJobRegistry> list = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().findAll(RegistryConfig.DEAD_TIMEOUT, new Date());
							if (list != null) {
								for (XxlJobRegistry item: list) {
									//如果是執(zhí)行器,將同一個應用的調度任務放在list中
									if (RegistryConfig.RegistType.EXECUTOR.name().equals(item.getRegistryGroup())) {
										String appname = item.getRegistryKey();
										List<String> registryList = appAddressMap.get(appname);
										if (registryList == null) {
											registryList = new ArrayList<String>();
										}

										if (!registryList.contains(item.getRegistryValue())) {
											registryList.add(item.getRegistryValue());
										}
										appAddressMap.put(appname, registryList);
									}
								}
							}

							// fresh group address
							// 遍歷自動注冊的執(zhí)行器列表
							for (XxlJobGroup group: groupList) {
								List<String> registryList = appAddressMap.get(group.getAppname());
								String addressListStr = null;
								if (registryList!=null && !registryList.isEmpty()) {
									Collections.sort(registryList);
									StringBuilder addressListSB = new StringBuilder();
									// 執(zhí)行器地址拼接
									for (String item:registryList) {
										addressListSB.append(item).append(",");
									}
									addressListStr = addressListSB.toString();
									addressListStr = addressListStr.substring(0, addressListStr.length()-1);
								}
								// 設置地址
								group.setAddressList(addressListStr);
								// 設置注冊更新時間
								group.setUpdateTime(new Date());
								// 更新注冊的執(zhí)行器地址
								XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().update(group);
							}
						}
					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
					try {
						TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
					} catch (InterruptedException e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job registry monitor thread error:{}", e);
						}
					}
				}
				logger.info(">>>>>>>>>>> xxl-job, job registry monitor thread stop");
			}
		});

run()方法一直執(zhí)行,直到服務停止,主要做了兩件事:

  1. 將已經下線的執(zhí)行器的記錄從數(shù)據(jù)庫中刪除
  2. 將還在線的執(zhí)行器機器記錄重新設置執(zhí)行器地址以及更新執(zhí)行器的時間,然后更新數(shù)據(jù)庫的記錄。

怎么判定執(zhí)行器已經下線了?如果數(shù)據(jù)庫中的update_time字段小于當前時間減去死亡期限,那么說明已經執(zhí)行器在死亡期限沒有進行更新時間,就判定已經下線了。

執(zhí)行器在啟動的時候,會啟動一個執(zhí)行器線程不斷的執(zhí)行注冊任務,執(zhí)行器任務會更新update_time字段。

失敗監(jiān)控器啟動

public void start(){
        monitorThread = new Thread(new Runnable() {

            @Override
            public void run() {
                //代碼省略
            }
        });
        monitorThread.setDaemon(true);
        monitorThread.setName("xxl-job, admin JobFailMonitorHelper");
        monitorThread.start();
}

上述代碼創(chuàng)建了一個名字為monitorThread的線程,并設為守護線程,然后啟動這個線程。線程的run方法的代碼被省略,run方法的代碼如下:

			@Override
			public void run() {

				// monitor
				while (!toStop) {
					try {
						// 獲取失敗任務日志, 最多1000條
						List<Long> failLogIds = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().findFailJobLogIds(1000);
						if (failLogIds!=null && !failLogIds.isEmpty()) {
							// 遍歷失敗日志
							for (long failLogId: failLogIds) {
								// lock log
								// 將默認(0)告警狀態(tài)設置為鎖定狀態(tài)(-1)
								int lockRet = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, 0, -1);
								if (lockRet < 1) {
									continue;
								}
								XxlJobLog log = XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().load(failLogId);
								// 獲取任務信息
								XxlJobInfo info = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(log.getJobId());

								// 1、fail retry monitor
								// 如果失敗重試次數(shù)大于0
								if (log.getExecutorFailRetryCount() > 0) {
									// 觸發(fā)任務執(zhí)行
									JobTriggerPoolHelper.trigger(log.getJobId(), TriggerTypeEnum.RETRY, (log.getExecutorFailRetryCount()-1), log.getExecutorShardingParam(), log.getExecutorParam(), null);
									String retryMsg = "<br><br><span style=\"color:#F39C12;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_type_retry") +"<<<<<<<<<<< </span><br>";
									log.setTriggerMsg(log.getTriggerMsg() + retryMsg);
									// 更新觸發(fā)日志
									XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(log);
								}

								// 2、fail alarm monitor
								int newAlarmStatus = 0;		// 告警狀態(tài):0-默認、-1=鎖定狀態(tài)、1-無需告警、2-告警成功、3-告警失敗
								// 如果告警右鍵不為null
								if (info != null) {
									// 告警
									boolean alarmResult = XxlJobAdminConfig.getAdminConfig().getJobAlarmer().alarm(info, log);
									newAlarmStatus = alarmResult?2:3;
								} else {
									newAlarmStatus = 1;
								}
								// 將鎖定(-1)的日志更新為新的告警狀態(tài)
								XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateAlarmStatus(failLogId, -1, newAlarmStatus);
							}
						}

					} catch (Exception e) {
						if (!toStop) {
							logger.error(">>>>>>>>>>> xxl-job, job fail monitor thread error:{}", e);
						}
					}

                    try {
                        TimeUnit.SECONDS.sleep(10);
                    } catch (Exception e) {
                        if (!toStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }

                }

				logger.info(">>>>>>>>>>> xxl-job, job fail monitor thread stop");

			}

run方法一直運行,直到線程停止。run方法的首先從數(shù)據(jù)庫中獲取失敗的調度任務日志列表,每次最多一千條。遍歷失敗的調度任務日志列表,首先將失敗的調度任務日志進行鎖定,暫停給告警郵件發(fā)送告警信息。如果調度任務的失敗重試次數(shù)大于0,觸發(fā)任務執(zhí)行,更新任務日志信息。當郵件不為空時,觸發(fā)告警信息,最后將鎖定的日志狀態(tài)更新為告警狀態(tài)。

日志任務啟動

主要做了兩件事:

  1. 統(tǒng)計當前時間前三天的觸發(fā)任務的數(shù)量、運行中的任務的數(shù)量、成功的任務數(shù)量、任務失敗的數(shù)量,然后保存在數(shù)據(jù)庫中。
  2. 根據(jù)配置的保存日志的過期時間,將已經過期的日志從數(shù)據(jù)庫中查出來,然后清理過期的日志。日志任務啟動是創(chuàng)意了一個線程,然后一直在后臺運行。

調度啟動

調度啟動創(chuàng)建了兩個線程,一個線程是用于不斷從數(shù)據(jù)庫把5秒內要執(zhí)行的任務讀出,立即觸發(fā)或者放到時間輪等待觸發(fā),一個是用于觸發(fā)任務。

總結

調用中心啟動就是啟動springboot項目。

在啟動的過程中加載XxlJobAdminConfig配置類,在配置類中,會進行一系列的初始化工作,加載配置信息,創(chuàng)建以及初始化一系列化線程在后臺一直異步運行,提高了性能。

定時任務執(zhí)行流程分析-客戶端觸發(fā)

客戶端觸發(fā)是記錄觸發(fā)的日志、準備觸發(fā)參數(shù)觸發(fā)遠程服務器的執(zhí)行。

trigger方法準備觸發(fā)任務

    /**
     * trigger job
     *  <p>
     *      1.根據(jù)任務id從數(shù)據(jù)庫中獲取執(zhí)行的任務
     *      2.根據(jù)任務組名字從數(shù)據(jù)庫中獲取任務組,如果地址不為空,覆蓋原來的地址列表,設置觸發(fā)類型為手動觸發(fā)。
     *      3.判斷路由策略,如果是分片廣播,遍歷地址列表,觸發(fā)所有的機器,否則只觸發(fā)一臺機器。
     *      4.分片廣播是要觸發(fā)所有的機器并行處理任務。
     *  </p>
     *  
     * @param jobId                 工作id
     * @param triggerType           觸發(fā)類型
     * @param failRetryCount        失敗重試計數(shù)
     * 			>=0: use this param
     * 			<0: use param from job info config         
     * @param executorShardingParam 遺囑執(zhí)行人切分參數(shù)
     * @param executorParam         執(zhí)行器參數(shù)
     *          null: use job param
     *          not null: cover job param
     * @param addressList           地址列表
     *          null: use executor addressList
     *          not null: cover
     */
    public static void trigger(int jobId,
                               TriggerTypeEnum triggerType,
                               int failRetryCount,
                               String executorShardingParam,
                               String executorParam,
                               String addressList) {

        // load data
        // 從數(shù)據(jù)庫中獲取任務
        XxlJobInfo jobInfo = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().loadById(jobId);
        if (jobInfo == null) {
            logger.warn(">>>>>>>>>>>> trigger fail, jobId invalid,jobId={}", jobId);
            return;
        }
        // 設置執(zhí)行參數(shù)
        if (executorParam != null) {
            jobInfo.setExecutorParam(executorParam);
        }
        // 重試次數(shù)
        int finalFailRetryCount = failRetryCount>=0?failRetryCount:jobInfo.getExecutorFailRetryCount();
        XxlJobGroup group = XxlJobAdminConfig.getAdminConfig().getXxlJobGroupDao().load(jobInfo.getJobGroup());
        // 如果地址不為空,覆蓋原來的地址列表
        // cover addressList
        if (addressList!=null && addressList.trim().length()>0) {
            group.setAddressType(1);
            group.setAddressList(addressList.trim());
        }

        // sharding param
        int[] shardingParam = null;
        // executorShardingParam不等于null
        if (executorShardingParam!=null){
            String[] shardingArr = executorShardingParam.split("/");
            if (shardingArr.length==2 && isNumeric(shardingArr[0]) && isNumeric(shardingArr[1])) {
                shardingParam = new int[2];
                shardingParam[0] = Integer.valueOf(shardingArr[0]);
                shardingParam[1] = Integer.valueOf(shardingArr[1]);
            }
        }
        // 分片廣播
        if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null)
                && group.getRegistryList()!=null && !group.getRegistryList().isEmpty()
                && shardingParam==null) {
            // 并行處理
            for (int i = 0; i < group.getRegistryList().size(); i++) {
                // 處理觸發(fā)
                processTrigger(group, jobInfo, finalFailRetryCount, triggerType, i, group.getRegistryList().size());
            }
        } else {
            if (shardingParam == null) {
                shardingParam = new int[]{0, 1};
            }
            processTrigger(group, jobInfo, finalFailRetryCount, triggerType, shardingParam[0], shardingParam[1]);
        }

    }

processTrigger觸發(fā)任務

    /**
     * <p>
     *      獲取執(zhí)行阻塞策略
     *      獲取路由策略
     *      保存任務日志
     *      初始化觸發(fā)參數(shù)
     *      初始化執(zhí)行器的地址:如果路由策略是分片廣播,執(zhí)行地址就為第index的地址,否則從通過路由獲取執(zhí)行地址。
     *      觸發(fā)遠程執(zhí)行器,即觸發(fā)遠程的定時任務
     *      設置觸發(fā)信息并保存觸發(fā)日志
     * </p>
     * @param group                     job group, registry list may be empty
     * @param jobInfo
     * @param finalFailRetryCount
     * @param triggerType
     * @param index                     sharding index
     * @param total                     sharding index
     */
    private static void processTrigger(XxlJobGroup group, XxlJobInfo jobInfo, int finalFailRetryCount, TriggerTypeEnum triggerType, int index, int total){

        // param
        // 執(zhí)行阻塞策略
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(jobInfo.getExecutorBlockStrategy(), ExecutorBlockStrategyEnum.SERIAL_EXECUTION);  // block strategy
        // 路由策略
        ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);    // route strategy
        // 分片廣播
        String shardingParam = (ExecutorRouteStrategyEnum.SHARDING_BROADCAST==executorRouteStrategyEnum)?String.valueOf(index).concat("/").concat(String.valueOf(total)):null;

        // 1、save log-id 保存日志
        XxlJobLog jobLog = new XxlJobLog();
        jobLog.setJobGroup(jobInfo.getJobGroup());
        jobLog.setJobId(jobInfo.getId());
        jobLog.setTriggerTime(new Date());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().save(jobLog);
        logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());

        // 2、init trigger-param 初始化觸發(fā)參數(shù)
        TriggerParam triggerParam = new TriggerParam();
        triggerParam.setJobId(jobInfo.getId());
        triggerParam.setExecutorHandler(jobInfo.getExecutorHandler());
        triggerParam.setExecutorParams(jobInfo.getExecutorParam());
        triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy());
        triggerParam.setExecutorTimeout(jobInfo.getExecutorTimeout());
        triggerParam.setLogId(jobLog.getId());
        triggerParam.setLogDateTime(jobLog.getTriggerTime().getTime());
        triggerParam.setGlueType(jobInfo.getGlueType());
        triggerParam.setGlueSource(jobInfo.getGlueSource());
        triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
        triggerParam.setBroadcastIndex(index);
        triggerParam.setBroadcastTotal(total);

        // 3、init address 初始化地址
        String address = null;
        ReturnT<String> routeAddressResult = null;
        if (group.getRegistryList()!=null && !group.getRegistryList().isEmpty()) {
            if (ExecutorRouteStrategyEnum.SHARDING_BROADCAST == executorRouteStrategyEnum) {
                if (index < group.getRegistryList().size()) {
                    address = group.getRegistryList().get(index);
                } else {
                    address = group.getRegistryList().get(0);
                }
            } else {
                // 路由獲取地址
                routeAddressResult = executorRouteStrategyEnum.getRouter().route(triggerParam, group.getRegistryList());
                if (routeAddressResult.getCode() == ReturnT.SUCCESS_CODE) {
                    address = routeAddressResult.getContent();
                }
            }
        } else {
            routeAddressResult = new ReturnT<String>(ReturnT.FAIL_CODE, I18nUtil.getString("jobconf_trigger_address_empty"));
        }

        // 4、trigger remote executor 觸發(fā)遠程執(zhí)行器
        ReturnT<String> triggerResult = null;
        if (address != null) {
            triggerResult = runExecutor(triggerParam, address);
        } else {
            triggerResult = new ReturnT<String>(ReturnT.FAIL_CODE, null);
        }

        // 5、collection trigger info 觸發(fā)信息
        StringBuffer triggerMsgSb = new StringBuffer();
        triggerMsgSb.append(I18nUtil.getString("jobconf_trigger_type")).append(":").append(triggerType.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_admin_adress")).append(":").append(IpUtil.getIp());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regtype")).append(":")
                .append( (group.getAddressType() == 0)?I18nUtil.getString("jobgroup_field_addressType_0"):I18nUtil.getString("jobgroup_field_addressType_1") );
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobconf_trigger_exe_regaddress")).append(":").append(group.getRegistryList());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorRouteStrategy")).append(":").append(executorRouteStrategyEnum.getTitle());
        if (shardingParam != null) {
            triggerMsgSb.append("("+shardingParam+")");
        }
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorBlockStrategy")).append(":").append(blockStrategy.getTitle());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_timeout")).append(":").append(jobInfo.getExecutorTimeout());
        triggerMsgSb.append("<br>").append(I18nUtil.getString("jobinfo_field_executorFailRetryCount")).append(":").append(finalFailRetryCount);

        triggerMsgSb.append("<br><br><span style=\"color:#00c0ef;\" > >>>>>>>>>>>"+ I18nUtil.getString("jobconf_trigger_run") +"<<<<<<<<<<< </span><br>")
                .append((routeAddressResult!=null&&routeAddressResult.getMsg()!=null)?routeAddressResult.getMsg()+"<br><br>":"").append(triggerResult.getMsg()!=null?triggerResult.getMsg():"");

        // 6、save log trigger-info 保存觸發(fā)日志
        jobLog.setExecutorAddress(address);
        jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
        jobLog.setExecutorParam(jobInfo.getExecutorParam());
        jobLog.setExecutorShardingParam(shardingParam);
        jobLog.setExecutorFailRetryCount(finalFailRetryCount);
        //jobLog.setTriggerTime();
        jobLog.setTriggerCode(triggerResult.getCode());
        jobLog.setTriggerMsg(triggerMsgSb.toString());
        XxlJobAdminConfig.getAdminConfig().getXxlJobLogDao().updateTriggerInfo(jobLog);

        logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
    }

    public static ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
        ReturnT<String> runResult = null;
        try {
            // 獲取執(zhí)行器
            ExecutorBiz executorBiz = XxlJobScheduler.getExecutorBiz(address);
            // 執(zhí)行任務
            runResult = executorBiz.run(triggerParam);
        } catch (Exception e) {
            logger.error(">>>>>>>>>>> xxl-job trigger error, please check if the executor[{}] is running.", address, e);
            runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ThrowableUtil.toString(e));
        }
        // 返回執(zhí)行結果
        StringBuffer runResultSB = new StringBuffer(I18nUtil.getString("jobconf_trigger_run") + ":");
        runResultSB.append("<br>address:").append(address);
        runResultSB.append("<br>code:").append(runResult.getCode());
        runResultSB.append("<br>msg:").append(runResult.getMsg());

        runResult.setMsg(runResultSB.toString());
        return runResult;
    }

runExecutor方法通過 XxlJobScheduler.getExecutorBiz方法獲取執(zhí)行器ExecutorBiz,然后調用執(zhí)行器ExecutorBiz的run方法執(zhí)行任務。getExecutorBiz方法首先通過地址從executorBizRepository(map)獲取ExecutorBiz,如果獲取的ExecutorBiz不為null,則直接返回,否則,創(chuàng)建一個ExecutorBizClient保存在executorBizRepository中,然后將創(chuàng)建的ExecutorBizClient返回。

ExecutorBiz接口有兩個實現(xiàn),分別是ExecutorBizClient(執(zhí)行器客戶端)、ExecutorBizImpl(執(zhí)行器服務端),ExecutorBizClien類就是客戶端操作任務的類,ExecutorBizImpl就是服務端操作任務的類。ExecutorBiz接口有beat(心跳檢測)、idleBeat(空閑檢測)、run(執(zhí)行任務)、kill(停止任務)、log(打印日志)這些方法。

我們看看ExecutorBizClien的run方法:

public ReturnT<String> run(TriggerParam triggerParam) {
        return XxlJobRemotingUtil.postBody(addressUrl + "run", accessToken, timeout, triggerParam, String.class);
}

ExecutorBizClien的run方法比較簡單,就是調用http請求發(fā)送觸發(fā)參數(shù)觸發(fā)服務端的任務執(zhí)行,然后將結果返回給客戶端。請求的地址為addressUrl + “run”,當客戶端發(fā)送請求以后,ExecutorBizImpl的run方法將會接收請求處理,然后將處理的結果返回,這篇文章就講到這里,服務端執(zhí)行定時任務放到下一篇文章進行講解。

總結

客戶端觸發(fā)任務執(zhí)行,首先從數(shù)據(jù)庫中查詢出需要執(zhí)行的任務,然后做好任務執(zhí)行的準備,如日志的記錄、觸發(fā)參數(shù)的初始化、獲取執(zhí)行的地址等,然后發(fā)送http請求給服務端執(zhí)行任務,服務器將處理任務的結果返回給客戶端。客戶端觸發(fā)任務執(zhí)行,是通過http請求觸發(fā)任務執(zhí)行,如果請求丟失,那么就會錯過任務的執(zhí)行。

定時任務執(zhí)行流程分析-服務端執(zhí)行

執(zhí)行器啟動時,會初始化一個EmbedServer類,該類的start方法會啟動netty服務器。netty服務器會接收客戶端發(fā)送過來的http請求,當接收到觸發(fā)請求(請求路徑是/run)會交給EmbedServer類的process方法處理,process方法將會調用ExecutorBizImpl的run方法處理客戶端發(fā)送的觸發(fā)請求。

ExecutorBizImpl的run方法處理流程大致如下:

 /**
     * <p>
     *     1.加載任務處理器與任務執(zhí)行線程,校驗任務處理器與任務執(zhí)行線程
     *     2.執(zhí)行阻塞策略
     *     3.注冊任務
     *     4.保存觸發(fā)參數(shù)到緩存
     *
     * </p>
     *
     * @param triggerParam 觸發(fā)參數(shù)
     * @return {@link ReturnT}<{@link String}>
     */
    @Override
    public ReturnT<String> run(TriggerParam triggerParam) {
        // load old:jobHandler + jobThread
        // 加載舊的任務處理器和任務線程
        JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
        IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
        String removeOldReason = null;

        // valid:jobHandler + jobThread
        GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
        if (GlueTypeEnum.BEAN == glueTypeEnum) {

            // new jobhandler
            // new jobhandler 從緩存中加載任務處理器,根據(jù)處理器名字
            IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());

            // valid old jobThread
            // valid old jobThread 如果新的任務處理器與舊的任務處理器不同,將舊的任務處理器以及舊的任務線程gc
            if (jobThread!=null && jobHandler != newJobHandler) {
                // change handler, need kill old thread
                removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = newJobHandler;
                if (jobHandler == null) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
                }
            }

        } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof GlueJobHandler
                        && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change handler or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                try {
                    IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                    jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                    return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
                }
            }
        } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {

            // valid old jobThread
            if (jobThread != null &&
                    !(jobThread.getHandler() instanceof ScriptJobHandler
                            && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
                // change script or gluesource updated, need kill old thread
                removeOldReason = "change job source or glue type, and terminate the old job thread.";

                jobThread = null;
                jobHandler = null;
            }

            // valid handler
            if (jobHandler == null) {
                jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
            }
        } else {
            return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
        }

        // executor block strategy
        if (jobThread != null) {
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

        // replace thread (new or exists invalid)
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

        // push data to queue
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;
    }

run方法首先根據(jù)任務id從緩存jobThreadRepository(map)中獲取任務執(zhí)行線程jobThread,任務執(zhí)行線程jobThread保存著任務處理器jobHandler,然后進行校驗任務執(zhí)行線程以及任務處理器。在了解校驗過程之前,我們先了解下xxl-job定時任務的種類,xxll0job支持java、groovy、腳本(Shell、Python、PHP、NodeJs、PowerShell)的定時任務。

接下來檢驗任務執(zhí)行線程以及任務處理器,就是按照Java、groovy、腳本分別進行校驗。

當任務的種類是java時,根據(jù)任務處理器的名字從jobHandlerRepository(map)中獲取任務處理器,如果新的任務處理器與舊的任務處理器不同,將舊的任務處理器以及舊的任務線程設置為null,等待被java虛擬機gc掉,這樣做的目的是,如果已經重新設置了新的任務執(zhí)行線程和任務處理器,那么就舊的gc掉,不至于一直存在內存中。

如果任務的種類是groovy時,判斷任務執(zhí)行線程不等于null、任務處理器已經更改和groovy的代碼被更新了,那么就將舊的任務執(zhí)行線程和任務執(zhí)行器設置為null,等待被gc,如果任務處理器還是為null,那么新創(chuàng)建GlueJobHandler任務處理器。

如果是任務的種類是腳本類型,判斷任務執(zhí)行線程不等于null、任務處理器已經更改和腳本的代碼被更新了,那么就將舊的任務執(zhí)行線程和任務執(zhí)行器設置為null,等待被gc,如果任務處理器還是為null,那么新創(chuàng)建ScriptJobHandler任務處理器。

執(zhí)行阻塞策略

// 執(zhí)行阻塞策略
        if (jobThread != null) {
            // 阻塞策略
            ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
            if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
                // discard when running 如果任務正在執(zhí)行,直接返回結果,不再往下執(zhí)行任務
                if (jobThread.isRunningOrHasQueue()) {
                    return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
                }
            } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
                // kill running jobThread 覆蓋之前的
                if (jobThread.isRunningOrHasQueue()) {
                    removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();

                    jobThread = null;
                }
            } else {
                // just queue trigger
            }
        }

xxl-job 有三種阻塞隊列,分別為SERIAL_EXECUTION(單機串行)、DISCARD_LATER(丟棄)、COVER_EARLY(覆蓋之前的)。當阻塞策略為丟棄,則判斷該執(zhí)行線程是否正在執(zhí)行,如果是則直接返回結果,不再往下執(zhí)行任務了。當阻塞策略為覆蓋之前的,則判斷執(zhí)行線程是否正在執(zhí)行,如果是則殺掉原來的執(zhí)行線程。如果阻塞策略是這倆種之外,則不做什么。

注冊任務

        // replace thread (new or exists invalid) 
        // 如果任務線程等于null,注冊任務線程并啟動線程
        if (jobThread == null) {
            jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
        }

    public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){
        JobThread newJobThread = new JobThread(jobId, handler);
        newJobThread.start();
        logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler});

        JobThread oldJobThread = jobThreadRepository.put(jobId, newJobThread);	// putIfAbsent | oh my god, map's put method return the old value!!!
        if (oldJobThread != null) {
            oldJobThread.toStop(removeOldReason);
            oldJobThread.interrupt();
        }

        return newJobThread;
    }

如果任務線程等于null,注冊任務線程并啟動線程。registJobThread方法首先新建一個任務線程,并調用newJobThread的start方法啟動任務線程。然后加入jobThreadRepository進行緩存,當舊的oldJobThread不等于null,則停止掉舊的任務線程。

保存觸發(fā)參數(shù)到緩存

        // push data to queue
        // 保存觸發(fā)參數(shù)到緩存
        ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
        return pushResult;

	public ReturnT<String> pushTriggerQueue(TriggerParam triggerParam) {
		// avoid repeat
		if (triggerLogIdSet.contains(triggerParam.getLogId())) {
			logger.info(">>>>>>>>>>> repeate trigger job, logId:{}", triggerParam.getLogId());
			return new ReturnT<String>(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId());
		}

		triggerLogIdSet.add(triggerParam.getLogId());
		triggerQueue.add(triggerParam);
        return ReturnT.SUCCESS;
	}

pushTriggerQueue方法判斷任務id是否已經存在triggerLogIdSet中,如果存在就直接返回結果,如果不存在就添加到triggerLogIdSet中,然后將觸發(fā)參數(shù)保存在triggerQueue隊列中。

xxl-job 執(zhí)行器路由選擇

現(xiàn)在的服務大部分都是微服務,在分布式環(huán)境中,服務在多臺服務器上部署。

xxl-job為了防止定時任務在同一時間內多臺服務運行定時任務,利用數(shù)據(jù)庫的悲觀鎖保證同一時間內只有一臺服務運行定時任務,在運行定時任務之前首先獲取到鎖(select lock for update),然后才運行定時任務,當任務運行完成時,釋放悲觀鎖,其他服務就可以去嘗試獲取鎖而執(zhí)行定時任務。

上述是xxl-job在分布式環(huán)境中如何保證同一時間只有一臺服務運行定時任務,那么如何從多臺服務中選出一臺服務來運行定時任務,這就設計到xxl-job執(zhí)行路由選擇的問題,接下來分析xxl-job是如何選擇執(zhí)行器的。

執(zhí)行器路由抽象類 ExecutorRouterroute 方法是選擇服務器地址的,決定哪一臺服務器來執(zhí)行定時任務。子列展示如下:

// com/xxl/job/admin/core/route/ExecutorRouteStrategyEnum.java
    /**
     * 執(zhí)行器地址列表的第一個列表
     */
    FIRST(I18nUtil.getString("jobconf_route_first"), new ExecutorRouteFirst()),
    /**
     * 執(zhí)行器地址列表的最后一個地址
     */
    LAST(I18nUtil.getString("jobconf_route_last"), new ExecutorRouteLast()),
    /**
     * 輪詢路由,輪詢選擇一個執(zhí)行器地址
     */
    ROUND(I18nUtil.getString("jobconf_route_round"), new ExecutorRouteRound()),
    /**
     * 隨機路由,隨機選擇一個執(zhí)行器地址
     */
    RANDOM(I18nUtil.getString("jobconf_route_random"), new ExecutorRouteRandom()),
    /**
     * 哈希一致性路由,通過哈希一致性算法選擇執(zhí)行器地址
     */
    CONSISTENT_HASH(I18nUtil.getString("jobconf_route_consistenthash"), new ExecutorRouteConsistentHash()),
    /**
     * 最不經常使用路由,使用頻率最低的執(zhí)行器地址
     */
    LEAST_FREQUENTLY_USED(I18nUtil.getString("jobconf_route_lfu"), new ExecutorRouteLFU()),
    /**
     * 最近最少使用(最近最久未使用路由,選擇最近最久未被使用的執(zhí)行器地址)
     */
    LEAST_RECENTLY_USED(I18nUtil.getString("jobconf_route_lru"), new ExecutorRouteLRU()),
    /**
     * 故障轉移路由,查找心跳正常的執(zhí)行器地址
     */
    FAILOVER(I18nUtil.getString("jobconf_route_failover"), new ExecutorRouteFailover()),
    /**
     * 忙碌轉移路由,從執(zhí)行器地址列表查找心跳正常的執(zhí)行器地址
     */
    BUSYOVER(I18nUtil.getString("jobconf_route_busyover"), new ExecutorRouteBusyover()),
    /**
     * 分片廣播
     */
    SHARDING_BROADCAST(I18nUtil.getString("jobconf_route_shard"), null);

Xxl-job 執(zhí)行器路由選擇可以去xxl-job的源碼倉庫觀看。

xxl-job定時任務執(zhí)行流程分析-任務執(zhí)行

在服務端執(zhí)行的流程中,將任務交給任務線程池JobThread執(zhí)行,JobThread的run方法主要做了幾件事:

  • 處理器的初始化
  • 任務的執(zhí)行
  • 銷毀清理工作

處理器的初始化

    	// init
    	try {
			handler.init();
		} catch (Throwable e) {
    		logger.error(e.getMessage(), e);
		}

處理器的初始化比較簡單,調用IJobHandler的init方法,IJobHandler是接口類型,有三種方法,分別是init(初始化方法)、execute(執(zhí)行方法)、destroy(銷毀)方法。IJobHandler接口將在下面具體分析。文章來源地址http://www.zghlxwxcb.cn/news/detail-495615.html

任務的執(zhí)行

銷毀清理工作

// callback trigger request in queue
		// 如果任務停止了,需要將隊列中的所有觸發(fā)刪除(所有定時任務刪除)
		while(triggerQueue !=null && triggerQueue.size()>0){
			// 從隊列中獲取觸發(fā)參數(shù)
			TriggerParam triggerParam = triggerQueue.poll();
			if (triggerParam!=null) {
				// is killed
				TriggerCallbackThread.pushCallBack(new HandleCallbackParam(
						triggerParam.getLogId(),
						triggerParam.getLogDateTime(),
						XxlJobContext.HANDLE_CODE_FAIL,
						stopReason + " [job not executed, in the job queue, killed.]")
				);
			}
		}

		// destroy
		// 執(zhí)行器的銷毀方法
		try {
			handler.destroy();
		} catch (Throwable e) {
			logger.error(e.getMessage(), e);
		}

到了這里,關于初識輕量級分布式任務調度平臺 xxl-job的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 初識Flask:Python輕量級Web框架入門教程

    Flask是一個用Python編寫的輕量級Web應用框架。由于其“微”性質,F(xiàn)lask在提供核心服務的同時,仍然提供了許多擴展的可能性。在這篇文章中,我們將從最基礎開始,學習如何使用Flask構建一個Web應用。 首先,你需要安裝Flask庫。使用pip進行安裝是最簡單的方式: 接著,我們來

    2024年02月14日
    瀏覽(432)
  • 深入了解IdleHandler,用來做優(yōu)化或者輕量級任務都是極好的

    深入了解IdleHandler,用來做優(yōu)化或者輕量級任務都是極好的

    關于作者:CSDN內容合伙人、技術專家, 從零開始做日活千萬級APP。 專注于分享各領域原創(chuàng)系列文章 ,擅長java后端、移動開發(fā)、人工智能等,希望大家多多支持。 我們繼續(xù)總結學習 Android 基礎知識 ,溫故知新。 IdleHandler 是 Handler 提供的一種充分利用CPU的機制, 主要是在

    2024年02月15日
    瀏覽(98)
  • git輕量級服務器gogs、gitea,非輕量級gitbucket

    git輕量級服務器gogs、gitea,非輕量級gitbucket

    本文來源:git輕量級服務器gogs、gitea,非輕量級gitbucket, 或 gitcode/gogs,gitea.md 結論: gogs、gitea很相似 確實輕, gitbucket基于java 不輕, 這三者都不支持組織樹(嵌套組織 nested group) 只能一層組織。 個人用,基于gogs、gitea,兩層結構樹 簡易辦法: 把用戶當成第一層節(jié)點、該用戶的

    2024年02月07日
    瀏覽(140)
  • 輕量靈動: 革新輕量級服務開發(fā)

    輕量靈動: 革新輕量級服務開發(fā)

    從 JDK 8 升級到 JDK 17 可以讓你的應用程序受益于新的功能、性能改進和安全增強。下面是一些 JDK 8 升級到 JDK 17 的最佳實戰(zhàn): 1.1、確定升級的必要性:首先,你需要評估你的應用程序是否需要升級到 JDK 17。查看 JDK 17 的新特性、改進和修復的 bug,以確定它們對你的應用程序

    2024年02月07日
    瀏覽(99)
  • 初識Redis之分布式

    初識Redis之分布式

    一.簡單介紹: ????????Redis是用來在 內存中 , 存儲數(shù)據(jù) 的, 他的初心是用來搞\\\' 消息中間件 \\\'(或者說消息隊列 很熟悉了吧~~),但是呢用的不多,他現(xiàn)在主要是用來做 數(shù)據(jù)庫,緩存 ????????用來存儲數(shù)據(jù), 為什么不直接存儲呢? ? Redis的優(yōu)勢就在于\\\' 分布式系統(tǒng) \\\' 二.分布式系統(tǒng)

    2024年02月10日
    瀏覽(92)
  • 輕量級 HTTP 請求組件

    Apache HttpClient 是著名的 HTTP 客戶端請求工具——現(xiàn)在我們模擬它打造一套簡單小巧的請求工具庫, 封裝 Java 類庫里面的 HttpURLConnection 對象來完成日常的 HTTP 請求,諸如 GET、HEAD、POST 等等,并嘗試應用 Java 8 函數(shù)式風格來制定 API。 組件源碼在:https://gitee.com/sp42_admin/ajaxjs/tr

    2024年02月01日
    瀏覽(100)
  • Kotlin 輕量級Android開發(fā)

    Kotlin 輕量級Android開發(fā)

    Kotlin 是一門運行在 JVM 之上的語言。 它由 Jetbrains 創(chuàng)建,而 Jetbrains 則是諸多強大的工具(如知名的 Java IDE IntelliJ IDEA )背后的公司。 Kotlin 是一門非常簡單的語言,其主要目標之一就是提供強大語言的同時又保持簡單且精簡的語法。 其主要特性如下所示: 輕量級:這一點對

    2024年02月07日
    瀏覽(903)
  • 108中超輕量級的加載動畫!

    大家好,我是【程序視點】小二哥! 今天要上的菜不是 Animate.js,也不是 Move.js,而是能提供108種加載動畫的庫: Whirl . 話不多說,直接來看例子。 以上只是冰山一角。whirl的CSS加載動畫集合中有108種選項供你挑選。選中喜歡的動畫后,點擊“Grab the CSS on Github!”。 將跳轉到

    2024年02月03日
    瀏覽(94)
  • Tomcat輕量級服務器

    Tomcat輕量級服務器

    目錄 1.常見系統(tǒng)架構? C-S架構 B-S架構 2.B-S架構系統(tǒng)的通信步驟 3.常見WEB服服務器軟件 4.Tomcat服務器的配置 下載安裝 環(huán)境變量配置 測試環(huán)境變量是否配置成功 測試Tomcat服務器是否配置成功? Tomcat窗口一閃而過的解決步驟 Tomcat解決亂碼 介紹: C-S架構即Client/Server(客戶端/服務

    2023年04月14日
    瀏覽(102)
  • C++輕量級單元測試框架

    單元測試是構建穩(wěn)定、高質量的程序、服務或系統(tǒng)的必不可少的一環(huán)。通過單元測試,我們可以在開發(fā)過程中及時發(fā)現(xiàn)和修復代碼中的問題,提高代碼的質量和可維護性。同時,單元測試也可以幫助我們更好地理解代碼的功能和實現(xiàn)細節(jié),從而更好地進行代碼重構和優(yōu)化。

    2023年04月25日
    瀏覽(89)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包