国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【源碼分析】XXL-JOB的執(zhí)行器的注冊流程

這篇具有很好參考價值的文章主要介紹了【源碼分析】XXL-JOB的執(zhí)行器的注冊流程。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

目的:分析xxl-job執(zhí)行器的注冊過程

流程:

  1. 獲取執(zhí)行器中所有被注解(@xxlJjob)修飾的handler
  2. 執(zhí)行器注冊過程
  3. 執(zhí)行器中任務(wù)執(zhí)行過程

版本:xxl-job 2.3.1

建議:下載xxl-job源碼,按流程圖debug調(diào)試,看堆棧信息并按文章內(nèi)容理解執(zhí)行流程。

完整流程圖:

查找Handler任務(wù)

部分流程圖:

首先啟動管理臺界面(服務(wù)XxlJobAdminApplication),然后啟動項目中給的執(zhí)行器實例(SpringBoot);

這個方法是掃描項目中使用@xxlJob注解的所有handler方法。接著往下走

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
    if (applicationContext == null) {
        return;
    }
    //獲取該項目中所有的bean,然后遍歷
    String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
    for (String beanDefinitionName : beanDefinitionNames) {
        Object bean = applicationContext.getBean(beanDefinitionName);

        Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
        try {
            annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
                    new MethodIntrospector.MetadataLookup<XxlJob>() {
                        //注意點★
                        @Override
                        public XxlJob inspect(Method method) {
                            return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
                        }
                    });
        } catch (Throwable ex) {
            logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
        }
        //沒有跳過本次循環(huán)繼續(xù)
        if (annotatedMethods==null || annotatedMethods.isEmpty()) {
            continue;
        }
    	//獲取了當(dāng)前執(zhí)行器中所有@xxl-job的方法,獲取方法以及對應(yīng)的初始化和銷毀方法
        for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
            Method executeMethod = methodXxlJobEntry.getKey();
            XxlJob xxlJob = methodXxlJobEntry.getValue();
            // regist
            registJobHandler(xxlJob, bean, executeMethod);
        }
    }
}

Spring案例執(zhí)行器中有5個handler:

XxlJobExecutor.registJobHandler()中部分源碼

String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
    throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
    throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}

然后進(jìn)行遍歷注冊;開始進(jìn)行名字判斷:

  1. 判斷bean名字是否為空
  2. 判斷bean是否被注冊了(存在了)

loadJobHandler校驗方式會去該方法中查找:當(dāng)bean注冊完成后時存放到jobHandlerRepository一個map類型中;

private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
    return jobHandlerRepository.get(name);
}

executeMethod.setAccessible(true);它實現(xiàn)了修改對象訪問權(quán)限的功能,參數(shù)為true,則表示允許調(diào)用方在使用反射時忽略Java語言的訪問控制檢查.

往后走會判斷該注解的生命周期方法(init和destroy)

  1. 未設(shè)置生命周期,則直接開始注冊
//注意MethodJobHandler,后面會用到
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
//添加執(zhí)行器名字及對應(yīng)的hob方法信息(當(dāng)前類、方法、init和destroy屬性)
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
    logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
    return jobHandlerRepository.put(name, jobHandler);
}
  1. 有生命周期,設(shè)置init和destroy方法權(quán)限
if (xxlJob.init().trim().length() > 0) {
    try {
        initMethod = clazz.getDeclaredMethod(xxlJob.init());
        initMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
        throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
    }
}
if (xxlJob.destroy().trim().length() > 0) {
    try {
        destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
        destroyMethod.setAccessible(true);
    } catch (NoSuchMethodException e) {
        throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
    }
}

首先檢查@XxlJob注解中的init屬性是否存在且不為空。如果存在,則嘗試獲取該類中名為init的方法,并將其設(shè)置為可訪問狀態(tài),以便后續(xù)調(diào)用。

同理,代碼接下來也檢查了@XxlJob注解中的destroy屬性是否存在且不為空,如果是,則獲取該類中名為destroy的方法,并設(shè)置其為可訪問狀態(tài)。

在這個過程中,如果某個方法不存在或者無法被訪問,則會拋出NoSuchMethodException異常,并且使用throw new RuntimeException將其包裝并拋出一個運(yùn)行時異常。這樣做的目的是為了提醒開發(fā)人員在任務(wù)處理器類中正確地設(shè)置init和destroy屬性,并確保方法名稱與屬性值一致。

執(zhí)行器的注冊過程

部分流程圖:

public void afterSingletonsInstantiated() {

    // init JobHandler Repository
    /*initJobHandlerRepository(applicationContext);*/

    // init JobHandler Repository (for method)
    initJobHandlerMethodRepository(applicationContext);

    // refresh GlueFactory
    GlueFactory.refreshInstance(1);

    // super start
    try {
        super.start();
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
}

在掃描完執(zhí)行器中所有的任務(wù)后,開始進(jìn)行執(zhí)行器注冊XxlJobSpringExecutor中的super.start() 方法。

在初始化執(zhí)行服務(wù)器啟動之前,進(jìn)行了四種操作,初始化日志、初始化adminBizList地址(可視化管理臺地址)、初始化日志清除、初始化回調(diào)線程等。

這里需要注意的是第二步初始化地址,在初始化服務(wù)器啟動的時候需要用到。

private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {

    // fill ip port
    port = port>0?port: NetUtil.findAvailablePort(9999);
    ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();

    // generate address
    if (address==null || address.trim().length()==0) {
        String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is null
        address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
    }

    // accessToken
    if (accessToken==null || accessToken.trim().length()==0) {
        logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
    }

    // start
    embedServer = new EmbedServer();
    embedServer.start(address, port, appname, accessToken);
}

繼續(xù)到initEmbedServer,開始初始化ip地址和端口等,需要明白的是,這一步的參數(shù)獲取方式其實是第一步讀取**XxlJobConfig**獲得的;進(jìn)行ip的校驗和拼接等操作,開始進(jìn)行真正的注冊。

創(chuàng)建一個嵌入式的HTTP服務(wù)器,將當(dāng)前執(zhí)行器信息(包含應(yīng)用名稱和IP地址端口等)注冊到注冊中心,注冊方式的實現(xiàn)在ExecutorRegistryThread中實現(xiàn)。

校驗名字和注冊中心,如果注冊中心不可用,則等待一段時間后重新嘗試連接。

// registry
while (!toStop) {
    try {
        RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
        for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
            try {
                ReturnT<String> registryResult = adminBiz.registry(registryParam);
                if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
                    registryResult = ReturnT.SUCCESS;
                    logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                    break;
                } else {
                    logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
                }
            } catch (Exception e) {
                logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
            }

        }
    } catch (Exception e) {
        if (!toStop) {
            logger.error(e.getMessage(), e);
        }

    }

    try {
        //心跳檢測,默認(rèn)30s
        if (!toStop) {
            TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
        }
    } catch (InterruptedException e) {
        if (!toStop) {
            logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
        }
    }
}

開啟一個新線程,首先構(gòu)建注冊參數(shù)(包含執(zhí)行器分組、執(zhí)行器名字、執(zhí)行器本地地址及端口號),遍歷注冊中心地址,開始進(jìn)行執(zhí)行器注冊,注冊方式通過發(fā)送http的post請求。

@Override
public ReturnT<String> registry(RegistryParam registryParam) {
    return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}

debug的過程中,XxlJobRemotingUtil 執(zhí)行到int statusCode = connection.getResponseCode();才會跳轉(zhuǎn)到JobApiController.api中的注冊地址.

// services mapping
if ("callback".equals(uri)) {
    List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
    return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
    RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
    return adminBiz.registryRemove(registryParam);
} else {
    return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}

最后進(jìn)入到JobRegistryHelper.registry()方法中完成數(shù)據(jù)庫的入庫和更新操作。

通過更新語句判斷該執(zhí)行器是否注冊,結(jié)果小于1,那么保存注冊器信息,并向注冊中心發(fā)送一個請求,更新當(dāng)前執(zhí)行器所屬的應(yīng)用名稱、執(zhí)行器名稱和 IP 地址等信息,否則跳過。

public ReturnT<String> registry(RegistryParam registryParam) {
	//.......
    // async execute
    registryOrRemoveThreadPool.execute(new Runnable() {
        @Override
        public void run() {
            //更新注冊表信息
            int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
            if (ret < 1) {
                //保存執(zhí)行器注冊信息
                XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());

                // fresh 刷新執(zhí)行器狀態(tài)
                freshGroupRegistryInfo(registryParam);
            }
        }
    });

    return ReturnT.SUCCESS;
}

至此執(zhí)行器的注冊流程分析完成。

執(zhí)行器中的任務(wù)執(zhí)行過程

部分流程圖:

執(zhí)行器中的任務(wù)流程比較簡單,如果執(zhí)行器啟動的話,那么每次執(zhí)行任務(wù)是通過JobThread通過Cron 表達(dá)式進(jìn)行操作的。

通過handler.execute()進(jìn)行執(zhí)行,是在框架內(nèi)部通過反射機(jī)制調(diào)用作業(yè)處理器對象 handler 中的 execute() 方法實現(xiàn)的。在這個過程中,handler 對象表示被加載的作業(yè)處理器,并且已經(jīng)調(diào)用了init()方法進(jìn)行初始化。

method.invoke() 方法使用反射機(jī)制調(diào)用指定對象 target 中的方法 method。在這個方法中,target 表示作業(yè)處理器對象,method 表示作業(yè)處理器中的 execute() 方法。

通過上述方法,獲取到SampleXxlJob.demoJobHandler的任務(wù),然后開始進(jìn)行任務(wù)邏輯操作。文章來源地址http://www.zghlxwxcb.cn/news/detail-421596.html

到了這里,關(guān)于【源碼分析】XXL-JOB的執(zhí)行器的注冊流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【2023】XXL-Job 具體通過docker 配置安裝容器,再通過springboot執(zhí)行注冊實現(xiàn)完整流程

    【2023】XXL-Job 具體通過docker 配置安裝容器,再通過springboot執(zhí)行注冊實現(xiàn)完整流程

    在平時的業(yè)務(wù)場景中,經(jīng)常有一些場景需要使用定時任務(wù),比如: 時間驅(qū)動的場景:某個時間點發(fā)送優(yōu)惠券,發(fā)送短信等等。 批量處理數(shù)據(jù):批量統(tǒng)計上個月的賬單,統(tǒng)計上個月銷售數(shù)據(jù)等等。 固定頻率的場景:每隔5分鐘需要執(zhí)行一次。 在Java中,傳統(tǒng)的定時任務(wù)實現(xiàn)方案

    2024年02月14日
    瀏覽(17)
  • 分布式定時任務(wù)系列8:XXL-job源碼分析之遠(yuǎn)程調(diào)用

    分布式定時任務(wù)系列8:XXL-job源碼分析之遠(yuǎn)程調(diào)用

    分布式定時任務(wù)系列1:XXL-job安裝 分布式定時任務(wù)系列2:XXL-job使用 分布式定時任務(wù)系列3:任務(wù)執(zhí)行引擎設(shè)計 分布式定時任務(wù)系列4:任務(wù)執(zhí)行引擎設(shè)計續(xù) 分布式定時任務(wù)系列5:XXL-job中blockingQueue的應(yīng)用 分布式定時任務(wù)系列6:XXL-job觸發(fā)日志過大引發(fā)的CPU告警 分布式定時任

    2024年01月21日
    瀏覽(79)
  • xxl-job重復(fù)執(zhí)行問題

    xxl-job重復(fù)執(zhí)行問題

    一,xxl-job服務(wù) 集群部署(兩臺) 二,問題描述 有一個job1min執(zhí)行一次,查詢一個中間表數(shù)據(jù),循環(huán)發(fā)生esb,得到結(jié)果,更新表。當(dāng)數(shù)據(jù)量比較大時,會出現(xiàn)數(shù)據(jù)重復(fù)發(fā)送了esb,推測job重復(fù)執(zhí)行了,業(yè)務(wù)耗時比較長,超過了1min,下一個job時間到了,再次查詢,取到了同樣的數(shù)

    2023年04月09日
    瀏覽(41)
  • xxl-job遠(yuǎn)程命令執(zhí)行漏洞復(fù)現(xiàn)

    xxl-job遠(yuǎn)程命令執(zhí)行漏洞復(fù)現(xiàn)

    XXL-JOB是一個分布式任務(wù)調(diào)度平臺,其核心設(shè)計目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡單、輕量級、易擴(kuò)展?,F(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。XXL-JOB分為admin和executor兩端,前者為后臺管理頁面,后者是任務(wù)執(zhí)行的客戶端。executor默認(rèn)沒有配置認(rèn)證,未授權(quán)的攻擊者可

    2024年02月11日
    瀏覽(16)
  • XXL-JOB 任務(wù)調(diào)度中心 后臺任意命令執(zhí)行漏洞

    XXL-JOB 任務(wù)調(diào)度中心 后臺任意命令執(zhí)行漏洞

    在日常開發(fā)中,經(jīng)常會用定時任務(wù)執(zhí)行某些不緊急又非常重要的事情,例如批量結(jié)算,計算當(dāng)日的訂單量,當(dāng)日的成本收入等。當(dāng)存在大量定時任務(wù)的時候,任務(wù)的管理也會成為一個比較頭痛的問題。xxl-job,就是一個比較成熟的分布式任務(wù)調(diào)度平臺。XXL-JOB 任務(wù)調(diào)度中心系統(tǒng)

    2024年02月08日
    瀏覽(20)
  • 機(jī)械臂末端執(zhí)行器匯總

    水果采摘機(jī)器人能夠提高作業(yè)效率,降低生產(chǎn)成本,實現(xiàn)自動化采摘。末端執(zhí)行器是采摘機(jī)器人實現(xiàn)采摘的關(guān)鍵部件,與采摘目標(biāo)直接接觸,能夠把果實從作物上分離,負(fù)責(zé)引導(dǎo)采摘機(jī)器人定位目標(biāo)果實進(jìn)行采摘,直接影響采摘機(jī)器人的作業(yè)效率和果實損傷率,在提高采摘機(jī)

    2024年02月08日
    瀏覽(23)
  • Mybatis執(zhí)行器(Executor)

    Mybatis執(zhí)行器(Executor)

    Executor Executor是MyBatis的核心接口之一,其中定義了數(shù)據(jù)庫操作的基本方法。在實際應(yīng)用中經(jīng)常涉及的SqlSession接口的功能,都是基于Executor接口實現(xiàn)的。 BaseExecutor BaseExecutor是一個實現(xiàn)了Executor接口的抽象類,它實現(xiàn)了Executor 接口的大部分方法。BaseExecutor 中主要提供了緩存管理和

    2024年04月16日
    瀏覽(19)
  • 造個輪子-任務(wù)調(diào)度執(zhí)行小框架-任務(wù)清單執(zhí)行器實現(xiàn)

    造個輪子-任務(wù)調(diào)度執(zhí)行小框架-任務(wù)清單執(zhí)行器實現(xiàn)

    okey,上一篇文章我們提到了,如何實現(xiàn)它的一個清單的一個代理。這里的話我們來捋一捋我們的這個執(zhí)行流程是啥: 所以的話,我們的我們這里今天要做的是這個執(zhí)行器的一個執(zhí)行。當(dāng)然這里的話,我們也是分兩個部分,因為這個執(zhí)行器的話,是分兩個部分的,一個是正常的

    2024年02月13日
    瀏覽(25)
  • Junit執(zhí)行器Runner探索之旅

    單元測試是每個程序員必備的技能,而Runner是每個單元測試類必有屬性。本文通過解讀Junit源碼,介紹junit中每個執(zhí)行器的使用方法,讓讀者在單元測試時,可以靈活的使用Runner執(zhí)行器。 在今年的敏捷團(tuán)隊建設(shè)中,京東物流通過Suite執(zhí)行器實現(xiàn)了一鍵自動化單元測試。Juint除了

    2024年02月08日
    瀏覽(24)
  • PgSQL-執(zhí)行器機(jī)制-Unique算子

    PgSQL-執(zhí)行器機(jī)制-Unique算子

    PgSQL-執(zhí)行器機(jī)制-Unique算子 PgSQL中輸出去重的元組有多種方法,比如通過HashAgg或者GroupAgg。這里我們介紹第三種方法,通過Unique算子來完成這個功能。當(dāng)然語句上可以是:select distinct(id1) from t; 執(zhí)行器執(zhí)行算子的函數(shù)都是ExecXXX,其中XXX代表某個算子。Unique算子的執(zhí)行是由函數(shù)

    2024年02月07日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包