作者:Mars醬
聲明:本文章由Mars醬編寫,部分內(nèi)容來源于網(wǎng)絡(luò),如有疑問請聯(lián)系本人。
轉(zhuǎn)載:歡迎轉(zhuǎn)載,轉(zhuǎn)載前先請聯(lián)系我!
前言
ElasticJob 是面向互聯(lián)網(wǎng)生態(tài)和海量任務(wù)的分布式調(diào)度解決方案。 它通過彈性調(diào)度、資源管控、以及任務(wù)治理的功能,打造一個(gè)適用于互聯(lián)網(wǎng)場景的分布式調(diào)度解決方案,并通過開放的架構(gòu)設(shè)計(jì),提供多元化的任務(wù)生態(tài)。 它的各個(gè)產(chǎn)品使用統(tǒng)一的任務(wù) API,開發(fā)者僅需一次開發(fā),即可隨意部署。
架構(gòu)
elasticjob由兩個(gè)相互獨(dú)立的子項(xiàng)目 ElasticJob-Lite 和 ElasticJob-Cloud 組成組成,這是ElasticJob-Lite 的架構(gòu)圖:
從架構(gòu)圖可以看到,左上角App1和App2兩個(gè)業(yè)務(wù)模塊中的Elastic-Job往zk中注冊了信息,右邊的Elastic-Job-Lite是監(jiān)聽了zk的,因此,整個(gè)任務(wù)的調(diào)度是由zk來完成的。下面的console通過Rest API去獲取zk中的信息,得到調(diào)度數(shù)據(jù)和日志,并存盤。
這是ElasticJob-Cloud的架構(gòu)圖:
ElasticJob-Cloud的調(diào)度是依賴Mesos的,從架構(gòu)圖的理解,Mesos和zk結(jié)合做好任務(wù)調(diào)度,再分發(fā)給Mesos的代理并執(zhí)行。
功能和特性
以下是ElasticJob的特性優(yōu)點(diǎn)
- 支持任務(wù)在分布式場景下的分片和高可用
- 能夠水平擴(kuò)展任務(wù)的吞吐量和執(zhí)行效率
- 任務(wù)處理能力隨資源配備彈性伸縮
- 優(yōu)化任務(wù)和資源調(diào)度
- 相同任務(wù)聚合至相同的執(zhí)行器統(tǒng)一處理
- 動(dòng)態(tài)調(diào)配追加資源至新分配的任務(wù)
- 失效轉(zhuǎn)移
- 錯(cuò)過任務(wù)重新執(zhí)行
- 分布式環(huán)境下任務(wù)自動(dòng)診斷和修復(fù)
- 基于有向無環(huán)圖 (DAG) 的任務(wù)依賴
- 基于有向無環(huán)圖 (DAG) 的任務(wù)項(xiàng)目依賴
- 可擴(kuò)展的任務(wù)類型統(tǒng)一接口
- 支持豐富的任務(wù)類型庫–包括數(shù)據(jù)流、腳本、HTTP、文件、大數(shù)據(jù)
- 易于對接業(yè)務(wù)任務(wù)–兼容 Spring IOC
- 任務(wù)管控端
- 任務(wù)事件追蹤
- 注冊中心管理
入門角色
既然這么多優(yōu)點(diǎn),我們就入門試試吧。入門elasticjob-lite也繼承了Quartz框架,同樣的很簡單,只要三個(gè)角色:
SimpleJob
:任務(wù)主體。如果用過Quartz,那么應(yīng)該能夠理解這個(gè),基本上和Quartz的Job接口類似,只要實(shí)現(xiàn)一個(gè)execute方法就行了,入門用這個(gè)就行;
JobConfiguration
:任務(wù)配置。同樣的可以理解為類似Quartz框架中的Trigger,最重要的就是配置任務(wù)的執(zhí)行頻率;
ScheduleJobBootstrap
:調(diào)度主體。這個(gè)一樣,參考Quartz框架中的Scheduler對象,它把任務(wù)和配置結(jié)合起來,任務(wù)按照配置中的頻率執(zhí)行。
寫個(gè)例子
我們創(chuàng)建這三種角色,首先創(chuàng)建任務(wù)主體:
import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;
/**
* (這個(gè)類的說明)
*
* @author mars醬
*/
public class MarsSimpleJob implements SimpleJob {
@Override
public void execute(final ShardingContext shardingContext) {
System.out.printf("Item: %s | Time: %s | Thread: %s | %s%n",
shardingContext.getShardingItem(),
new SimpleDateFormat("HH:mm:ss").format(new Date()),
Thread.currentThread().getId(),
"就是這么簡單~");
}
}
再創(chuàng)建任務(wù)配置:
import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import javax.sql.DataSource;
import java.util.Objects;
/**
* (這個(gè)類的說明)
*
* @author mars醬
*/
public class JobConfigurationBuilder {
public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) {
JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3)
.cron(cronExpression)
.shardingItemParameters("0=a,1=b,2=c");
if (Objects.nonNull(tracingConfig)) {
builder.addExtraConfigurations(tracingConfig);
}
return builder.build();
}
}
最后創(chuàng)建調(diào)度器,并執(zhí)行:
import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;
import javax.sql.DataSource;
/**
* (這個(gè)類的說明)
*
* @author mars醬
*/
public final class SchedulerMain {
private static final int EMBED_ZOOKEEPER_PORT = 4181;
private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;
private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java";
// CHECKSTYLE:OFF
public static void main(final String[] args) {
// 內(nèi)嵌zk服務(wù)
EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
// 簡單作業(yè)
setUpSimpleJob(regCenter, null);
}
private static CoordinatorRegistryCenter setUpRegistryCenter() {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
result.init();
return result;
}
private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) {
new ScheduleJobBootstrap(regCenter,
new MarsSimpleJob(),
JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule();
}
}
運(yùn)行的效果:
截圖中Item
是處理的分片項(xiàng),Thread
是當(dāng)前線程的id,看到了Quartz框架的影子…。
任務(wù)執(zhí)行流程
既然能成功運(yùn)行,我們看看內(nèi)部的處理邏輯吧。Mars醬本機(jī)并沒有安裝zk,所以copy了官方的例子,在程序運(yùn)行前先啟用了一個(gè)內(nèi)嵌的zk服務(wù):
EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
這個(gè)只能在模擬的時(shí)候使用,千萬不能拿去放生產(chǎn)環(huán)境。接下來就是注冊中心的配置了,我們需要的是CoordinatorRegistryCenter對象:
private static CoordinatorRegistryCenter setUpRegistryCenter() {
ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
result.init();
return result;
}
好了,zk的部分處理完成,下面就是直接SchedulerJobBootstrap的部分了。
ScheduleJobBootstrap初始化
ScheduleJobBootstrap的初始化在例子中需要三個(gè)參數(shù):
CoordinatorRegistryCenter
:這個(gè)是協(xié)調(diào)用的注冊中心。是一個(gè)接口類,它的實(shí)現(xiàn)在ElasticJob里面只有一個(gè)ZookeeperRegisterCenter對象,未來是不是會(huì)支持其他的注冊中心呢?
ElasticJob
: Mars醬理解為任務(wù)對象。但是ElasticJob這個(gè)對象本身是個(gè)空接口,有兩個(gè)子接口SimpleJob
和DataflowJob
,前者M(jìn)ars醬的理解是和Quartz中的Job對象類似,只要實(shí)現(xiàn)execute函數(shù)就行,后者有需要實(shí)現(xiàn)兩個(gè)接口,一個(gè)fetchData
獲取數(shù)據(jù),一個(gè)processData
處理數(shù)據(jù)。所以,ElasticJob這個(gè)接口留空,是為了還有其他擴(kuò)展吧?
JobConfiguration
:彈性任務(wù)配置項(xiàng)。構(gòu)建這個(gè)對象不能直接設(shè)置,只能用buider的方式構(gòu)建。需要配置的屬性很多,但是核心屬性大致就是幾個(gè):任務(wù)名稱、分片數(shù)、執(zhí)行頻率、分片參數(shù)。JobConfiguration的所有屬性如下:
屬性名 | 說明 |
---|---|
String jobName | 任務(wù)名稱 |
String cron | cron表達(dá)式 |
String timeZone | 任務(wù)運(yùn)行的時(shí)區(qū) |
int shardingTotalCount | 任務(wù)分片總數(shù) |
String shardingItemParameters | 分片序號和參數(shù),多個(gè)鍵值對之間用逗號分隔,從0開始,但是不能大于或等于任務(wù)分片的總數(shù) |
String jobParameter | 任務(wù)自定義任務(wù)參數(shù) |
boolean monitorExecution | 是否監(jiān)聽執(zhí)行 |
boolean failover | 是否啟用故障轉(zhuǎn)移。開啟表示如果任務(wù)在一次任務(wù)執(zhí)行中途宕機(jī),允許將該次未完成的任務(wù)在另一任務(wù)節(jié)點(diǎn)上補(bǔ)償執(zhí)行 |
boolean misfire | 不發(fā)火。哈哈,其實(shí)是是否開啟錯(cuò)過任務(wù)重新執(zhí)行 |
int maxTimeDiffSeconds | 最大時(shí)差 |
int reconcileIntervalMinutes | 間隔時(shí)長 |
String jobShardingStrategyType | 任務(wù)分片策略類型,總共三種 |
String jobExecutorServiceHandlerType | 任務(wù)執(zhí)行程序服務(wù)處理程序類型 |
String jobErrorHandlerType | 任務(wù)錯(cuò)誤處理類型 |
Collection jobListenerTypes | 任務(wù)監(jiān)聽類型 |
Collection extraConfigurations | 附加配置信息 |
String description | 任務(wù)描述 |
Properties props | 擴(kuò)展用屬性值 |
boolean disabled | 是否禁用 |
boolean overwrite | 是否覆蓋 |
String label | 標(biāo)簽 |
boolean staticSharding | 是否支持靜態(tài)分片 |
ScheduleJobBootstrap執(zhí)行
同樣的,例子中的MarsSimpleJob的execute函數(shù),最終會(huì)被ElasticJob框架調(diào)用,我們按照被執(zhí)行的反向順序往上找。MarsSimpleJob是繼承SimpleJob
的, 而SimpleJob
的execute函數(shù)是被SimpleJobExecutor
所調(diào)用:
/**
* Simple job executor.
*/
public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {
@Override
public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
// 這里調(diào)用execute函數(shù)
elasticJob.execute(shardingContext);
}
@Override
public Class<SimpleJob> getElasticJobClass() {
return SimpleJob.class;
}
}
再繼續(xù)往上找,process的核心流程就是在ElasticJobExecutor
里面了,調(diào)用process的部分在ElasticJobExcutor
中幾個(gè)重載的process方法調(diào)用的,兩個(gè)process函數(shù)完成不同的功能,調(diào)用SimpleExecutor的process部分是這樣:
@SuppressWarnings("unchecked")
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
jobFacade.postJobExecutionEvent(startEvent);
log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
JobExecutionEvent completeEvent;
try {
// 這里調(diào)用SimpleJobExecutor的process
jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item));
completeEvent = startEvent.executionSuccess();
log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item);
jobFacade.postJobExecutionEvent(completeEvent);
// CHECKSTYLE:OFF
} catch (final Throwable cause) {
// CHECKSTYLE:ON
completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
jobFacade.postJobExecutionEvent(completeEvent);
itemErrorMessages.put(item, ExceptionUtils.transform(cause));
JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
上面這個(gè)process負(fù)責(zé)最終任務(wù)的執(zhí)行部分,由JobItemExecutor對象調(diào)用,SimpleJobExecutor被JobItemExecutor接口定義。整個(gè)這個(gè)proces由guava包的EventBus處理消息事件,執(zhí)行之前有startEvent,執(zhí)行完成有completeEvent,異常也有對應(yīng)的失敗event,方面架構(gòu)圖中存盤事件日志、ELK日志收集動(dòng)作。
調(diào)用這個(gè)process的部分,由另一個(gè)process完成,長這樣的:
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
if (1 == items.size()) {
int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
process(jobConfig, shardingContexts, item, jobExecutionEvent);
return;
}
CountDownLatch latch = new CountDownLatch(items.size());
for (int each : items) {
JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
ExecutorService executorService = executorContext.get(ExecutorService.class);
if (executorService.isShutdown()) {
return;
}
// 提交給線程池執(zhí)行
executorService.submit(() -> {
try {
process(jobConfig, shardingContexts, each, jobExecutionEvent);
} finally {
latch.countDown();
}
});
}
try {
latch.await();
} catch (final InterruptedException ex) {
Thread.currentThread().interrupt();
}
}
上面這個(gè)process負(fù)責(zé)把分片參數(shù)依次組裝好,設(shè)置好JobExecutionEvent中的ip、主機(jī)名等參數(shù),然后放入線程池中去執(zhí)行。再往上,看現(xiàn)在這個(gè)process被調(diào)用的部分:
private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
if (shardingContexts.getShardingItemParameters().isEmpty()) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
return;
}
// 往注冊中心注冊ShardingContexts信息
jobFacade.registerJobBegin(shardingContexts);
String taskId = shardingContexts.getTaskId();
// 發(fā)送跟蹤日志,標(biāo)記任務(wù)正在運(yùn)行
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
try {
// 調(diào)用process
process(jobConfig, shardingContexts, executionSource);
} finally {
// TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
// 告知注冊中心任務(wù)完成
jobFacade.registerJobCompleted(shardingContexts);
if (itemErrorMessages.isEmpty()) {
// 沒有失敗信息,通知任務(wù)完成
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
} else {
// 否則通知失敗
jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
itemErrorMessages.clear();
}
}
}
方法execute從注冊中心注冊ShardingContext信息,并發(fā)送跟蹤日志事件,然后調(diào)用process,最后發(fā)送跟蹤消息標(biāo)記任務(wù)完成。再有一個(gè)重載的execute方法調(diào)用上面這個(gè)execute方法,如下:
public void execute() {
// job的配置信息
JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
executorContext.reloadIfNecessary(jobConfig);
JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
try {
jobFacade.checkJobExecutionEnvironment();
} catch (final JobExecutionEnvironmentException cause) {
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
// 這里有玄機(jī)
ShardingContexts shardingContexts = jobFacade.getShardingContexts();
// 發(fā)送時(shí)間消息總線
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(),
State.TASK_FINISHED,
String.format(
"Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.",
jobConfig.getJobName(),
shardingContexts.getShardingItemParameters().keySet()));
return;
}
try {
// 任務(wù)執(zhí)行的前置流程
jobFacade.beforeJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
// 調(diào)用上面的execute方法
execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
}
// 故障轉(zhuǎn)移
jobFacade.failoverIfNecessary();
try {
// 任務(wù)執(zhí)行的后置流程
jobFacade.afterJobExecuted(shardingContexts);
//CHECKSTYLE:OFF
} catch (final Throwable cause) {
//CHECKSTYLE:ON
jobErrorHandler.handleException(jobConfig.getJobName(), cause);
}
}
這個(gè)execute就由Quartz的JobRunShell調(diào)用了,Quartz的調(diào)用的過程在 Java | 一分鐘掌握定時(shí)任務(wù) | 6 - Quartz定時(shí)任務(wù) - 掘金 (juejin.cn) 里面還好Mars醬分析過了。
執(zhí)行流程總結(jié)
那么,追蹤完源代碼,大致的流程就應(yīng)該是如下:
1.組裝基本參數(shù)(任務(wù)、頻率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任務(wù)屬性 -> 4.設(shè)置各種facade -> 5.初始化ElasticJobExecutor -> 6.調(diào)用scheduler執(zhí)行任務(wù) -> 7.獲取任務(wù)執(zhí)行器(SimpleJobExecutor) -> 8.各種校驗(yàn)邏輯 -> 9. 處理分片參數(shù) -> 10. 設(shè)置任務(wù)為運(yùn)行狀態(tài) -> 11. 提交任務(wù)到線程池 -> 12.執(zhí)行任務(wù) -> 13.處理任務(wù)后續(xù)邏輯
任務(wù)的調(diào)度過程由zk完成,取決于zk的任務(wù)調(diào)度策略吧?如果一臺機(jī)器的定時(shí)運(yùn)行時(shí)掛了,zk會(huì)轉(zhuǎn)移到另一臺運(yùn)行中的機(jī)器中去。-- Mars醬
分片的策略
任務(wù)的分片策略,用于將任務(wù)在分布式環(huán)境下分解成為任務(wù)使用。
SPI 名稱 | 詳細(xì)說明 |
---|---|
JobShardingStrategy | 作業(yè)分片策略接口 |
已知實(shí)現(xiàn)類 | 詳細(xì)說明 |
---|---|
AverageAllocationJobShardingStrategy | 根據(jù)分片項(xiàng)平均分片 |
OdevitySortByNameJobShardingStrategy | 根據(jù)任務(wù)名稱哈希值的奇偶數(shù)決定按照任務(wù)服務(wù)器 IP 升序或是降序的方式分片 |
RoundRobinByNameJobShardingStrategy | 根據(jù)任務(wù)名稱輪詢分片 |
那么任務(wù)的分片策略在哪里使用的呢?就在代碼中注釋的“這里有玄機(jī)”那行。在getShardingContexts的方法中會(huì)調(diào)用ShardingService,它會(huì)去獲取JobConfiguration
中配置的分片策略方式:
public void shardingIfNecessary() {
List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
if (!isNeedSharding() || availableJobInstances.isEmpty()) {
return;
}
if (!leaderService.isLeaderUntilBlock()) {
blockUntilShardingCompleted();
return;
}
waitingOtherShardingItemCompleted();
JobConfiguration jobConfig = configService.load(false);
int shardingTotalCount = jobConfig.getShardingTotalCount();
log.debug("Job '{}' sharding begin.", jobName);
jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
resetShardingInfo(shardingTotalCount);
// 獲取任務(wù)分片策略
JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
log.debug("Job '{}' sharding complete.", jobName);
}
如果不設(shè)置,默認(rèn)使用的是平均分片策略。文章來源:http://www.zghlxwxcb.cn/news/detail-462374.html
總結(jié)
這,大抵就是ElasticJob的工作原理了吧。文章來源地址http://www.zghlxwxcb.cn/news/detail-462374.html
到了這里,關(guān)于Java | 一分鐘掌握定時(shí)任務(wù) | 7 - ElasticJob分布式定時(shí)任務(wù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!