目的:分析xxl-job執(zhí)行器的注冊過程
流程:
- 獲取執(zhí)行器中所有被注解(
@xxlJjob
)修飾的handler
- 執(zhí)行器注冊過程
- 執(zhí)行器中任務(wù)執(zhí)行過程
版本:xxl-job 2.3.1
建議:下載xxl-job
源碼,按流程圖debug
調(diào)試,看堆棧信息并按文章內(nèi)容理解執(zhí)行流程。
完整流程圖:
查找Handler任務(wù)
部分流程圖:
首先啟動管理臺界面(服務(wù)XxlJobAdminApplication
),然后啟動項目中給的執(zhí)行器實例(SpringBoot)
;
這個方法是掃描項目中使用@xxlJob
注解的所有handler方法。接著往下走
private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {
if (applicationContext == null) {
return;
}
//獲取該項目中所有的bean,然后遍歷
String[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);
for (String beanDefinitionName : beanDefinitionNames) {
Object bean = applicationContext.getBean(beanDefinitionName);
Map<Method, XxlJob> annotatedMethods = null; // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBean
try {
annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),
new MethodIntrospector.MetadataLookup<XxlJob>() {
//注意點★
@Override
public XxlJob inspect(Method method) {
return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);
}
});
} catch (Throwable ex) {
logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);
}
//沒有跳過本次循環(huán)繼續(xù)
if (annotatedMethods==null || annotatedMethods.isEmpty()) {
continue;
}
//獲取了當(dāng)前執(zhí)行器中所有@xxl-job的方法,獲取方法以及對應(yīng)的初始化和銷毀方法
for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {
Method executeMethod = methodXxlJobEntry.getKey();
XxlJob xxlJob = methodXxlJobEntry.getValue();
// regist
registJobHandler(xxlJob, bean, executeMethod);
}
}
}
在Spring
案例執(zhí)行器中有5個handler
:
XxlJobExecutor.registJobHandler()中部分源碼
String name = xxlJob.value();
//make and simplify the variables since they'll be called several times later
Class<?> clazz = bean.getClass();
String methodName = executeMethod.getName();
if (name.trim().length() == 0) {
throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + clazz + "#" + methodName + "] .");
}
if (loadJobHandler(name) != null) {
throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");
}
然后進(jìn)行遍歷注冊;開始進(jìn)行名字判斷:
- 判斷bean名字是否為空
- 判斷bean是否被注冊了(存在了)
loadJobHandler
校驗方式會去該方法中查找:當(dāng)bean注冊完成后時存放到jobHandlerRepository
一個map
類型中;
private static ConcurrentMap<String, IJobHandler> jobHandlerRepository = new ConcurrentHashMap<String, IJobHandler>();
public static IJobHandler loadJobHandler(String name){
return jobHandlerRepository.get(name);
}
executeMethod.setAccessible(true);
它實現(xiàn)了修改對象訪問權(quán)限的功能,參數(shù)為true,則表示允許調(diào)用方在使用反射時忽略Java語言的訪問控制檢查.
往后走會判斷該注解的生命周期方法(init和destroy
)
- 未設(shè)置生命周期,則直接開始注冊
//注意MethodJobHandler,后面會用到
registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));
//添加執(zhí)行器名字及對應(yīng)的hob方法信息(當(dāng)前類、方法、init和destroy屬性)
public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){
logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);
return jobHandlerRepository.put(name, jobHandler);
}
- 有生命周期,設(shè)置init和destroy方法權(quán)限
if (xxlJob.init().trim().length() > 0) {
try {
initMethod = clazz.getDeclaredMethod(xxlJob.init());
initMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
if (xxlJob.destroy().trim().length() > 0) {
try {
destroyMethod = clazz.getDeclaredMethod(xxlJob.destroy());
destroyMethod.setAccessible(true);
} catch (NoSuchMethodException e) {
throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + clazz + "#" + methodName + "] .");
}
}
首先檢查@XxlJob
注解中的init
屬性是否存在且不為空。如果存在,則嘗試獲取該類中名為init
的方法,并將其設(shè)置為可訪問狀態(tài),以便后續(xù)調(diào)用。
同理,代碼接下來也檢查了@XxlJob
注解中的destroy
屬性是否存在且不為空,如果是,則獲取該類中名為destroy
的方法,并設(shè)置其為可訪問狀態(tài)。
在這個過程中,如果某個方法不存在或者無法被訪問,則會拋出NoSuchMethodException
異常,并且使用throw new RuntimeException
將其包裝并拋出一個運(yùn)行時異常。這樣做的目的是為了提醒開發(fā)人員在任務(wù)處理器類中正確地設(shè)置init和destroy
屬性,并確保方法名稱與屬性值一致。
執(zhí)行器的注冊過程
部分流程圖:
public void afterSingletonsInstantiated() {
// init JobHandler Repository
/*initJobHandlerRepository(applicationContext);*/
// init JobHandler Repository (for method)
initJobHandlerMethodRepository(applicationContext);
// refresh GlueFactory
GlueFactory.refreshInstance(1);
// super start
try {
super.start();
} catch (Exception e) {
throw new RuntimeException(e);
}
}
在掃描完執(zhí)行器中所有的任務(wù)后,開始進(jìn)行執(zhí)行器注冊XxlJobSpringExecutor中的super.start()
方法。
在初始化執(zhí)行服務(wù)器啟動之前,進(jìn)行了四種操作,初始化日志、初始化adminBizList
地址(可視化管理臺地址)、初始化日志清除、初始化回調(diào)線程等。
這里需要注意的是第二步初始化地址,在初始化服務(wù)器啟動的時候需要用到。
private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {
// fill ip port
port = port>0?port: NetUtil.findAvailablePort(9999);
ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();
// generate address
if (address==null || address.trim().length()==0) {
String ip_port_address = IpUtil.getIpPort(ip, port); // registry-address:default use address to registry , otherwise use ip:port if address is null
address = "http://{ip_port}/".replace("{ip_port}", ip_port_address);
}
// accessToken
if (accessToken==null || accessToken.trim().length()==0) {
logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");
}
// start
embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);
}
繼續(xù)到initEmbedServer
,開始初始化ip地址和端口等,需要明白的是,這一步的參數(shù)獲取方式其實是第一步讀取**XxlJobConfig**
獲得的;進(jìn)行ip的校驗和拼接等操作,開始進(jìn)行真正的注冊。
創(chuàng)建一個嵌入式的HTTP服務(wù)器,將當(dāng)前執(zhí)行器信息(包含應(yīng)用名稱和IP地址端口等)注冊到注冊中心,注冊方式的實現(xiàn)在ExecutorRegistryThread
中實現(xiàn)。
校驗名字和注冊中心,如果注冊中心不可用,則等待一段時間后重新嘗試連接。
// registry
while (!toStop) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);
for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {
try {
ReturnT<String> registryResult = adminBiz.registry(registryParam);
if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {
registryResult = ReturnT.SUCCESS;
logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
break;
} else {
logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});
}
} catch (Exception e) {
logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);
}
}
} catch (Exception e) {
if (!toStop) {
logger.error(e.getMessage(), e);
}
}
try {
//心跳檢測,默認(rèn)30s
if (!toStop) {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
}
} catch (InterruptedException e) {
if (!toStop) {
logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());
}
}
}
開啟一個新線程,首先構(gòu)建注冊參數(shù)(包含執(zhí)行器分組、執(zhí)行器名字、執(zhí)行器本地地址及端口號),遍歷注冊中心地址,開始進(jìn)行執(zhí)行器注冊,注冊方式通過發(fā)送http的post請求。
@Override
public ReturnT<String> registry(RegistryParam registryParam) {
return XxlJobRemotingUtil.postBody(addressUrl + "api/registry", accessToken, timeout, registryParam, String.class);
}
在debug
的過程中,XxlJobRemotingUtil
執(zhí)行到int statusCode = connection.getResponseCode();
才會跳轉(zhuǎn)到JobApiController.api
中的注冊地址.
// services mapping
if ("callback".equals(uri)) {
List<HandleCallbackParam> callbackParamList = GsonTool.fromJson(data, List.class, HandleCallbackParam.class);
return adminBiz.callback(callbackParamList);
} else if ("registry".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registry(registryParam);
} else if ("registryRemove".equals(uri)) {
RegistryParam registryParam = GsonTool.fromJson(data, RegistryParam.class);
return adminBiz.registryRemove(registryParam);
} else {
return new ReturnT<String>(ReturnT.FAIL_CODE, "invalid request, uri-mapping("+ uri +") not found.");
}
最后進(jìn)入到JobRegistryHelper.registry()
方法中完成數(shù)據(jù)庫的入庫和更新操作。
通過更新語句判斷該執(zhí)行器是否注冊,結(jié)果小于1,那么保存注冊器信息,并向注冊中心發(fā)送一個請求,更新當(dāng)前執(zhí)行器所屬的應(yīng)用名稱、執(zhí)行器名稱和 IP 地址等信息,否則跳過。
public ReturnT<String> registry(RegistryParam registryParam) {
//.......
// async execute
registryOrRemoveThreadPool.execute(new Runnable() {
@Override
public void run() {
//更新注冊表信息
int ret = XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registryUpdate(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
if (ret < 1) {
//保存執(zhí)行器注冊信息
XxlJobAdminConfig.getAdminConfig().getXxlJobRegistryDao().registrySave(registryParam.getRegistryGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue(), new Date());
// fresh 刷新執(zhí)行器狀態(tài)
freshGroupRegistryInfo(registryParam);
}
}
});
return ReturnT.SUCCESS;
}
至此執(zhí)行器的注冊流程分析完成。
執(zhí)行器中的任務(wù)執(zhí)行過程
部分流程圖:
執(zhí)行器中的任務(wù)流程比較簡單,如果執(zhí)行器啟動的話,那么每次執(zhí)行任務(wù)是通過JobThread
通過Cron
表達(dá)式進(jìn)行操作的。
通過handler.execute()
進(jìn)行執(zhí)行,是在框架內(nèi)部通過反射機(jī)制調(diào)用作業(yè)處理器對象 handler
中的 execute()
方法實現(xiàn)的。在這個過程中,handler 對象表示被加載的作業(yè)處理器,并且已經(jīng)調(diào)用了init()
方法進(jìn)行初始化。
method.invoke()
方法使用反射機(jī)制調(diào)用指定對象 target
中的方法 method
。在這個方法中,target
表示作業(yè)處理器對象,method
表示作業(yè)處理器中的 execute()
方法。文章來源:http://www.zghlxwxcb.cn/news/detail-421596.html
通過上述方法,獲取到SampleXxlJob.demoJobHandler
的任務(wù),然后開始進(jìn)行任務(wù)邏輯操作。文章來源地址http://www.zghlxwxcb.cn/news/detail-421596.html
到了這里,關(guān)于【源碼分析】XXL-JOB的執(zhí)行器的注冊流程的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!