国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Java | 一分鐘掌握定時(shí)任務(wù) | 7 - ElasticJob分布式定時(shí)任務(wù)

這篇具有很好參考價(jià)值的文章主要介紹了Java | 一分鐘掌握定時(shí)任務(wù) | 7 - ElasticJob分布式定時(shí)任務(wù)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

作者:Mars醬

聲明:本文章由Mars醬編寫,部分內(nèi)容來源于網(wǎng)絡(luò),如有疑問請聯(lián)系本人。

轉(zhuǎn)載:歡迎轉(zhuǎn)載,轉(zhuǎn)載前先請聯(lián)系我!

前言

ElasticJob 是面向互聯(lián)網(wǎng)生態(tài)和海量任務(wù)的分布式調(diào)度解決方案。 它通過彈性調(diào)度、資源管控、以及任務(wù)治理的功能,打造一個(gè)適用于互聯(lián)網(wǎng)場景的分布式調(diào)度解決方案,并通過開放的架構(gòu)設(shè)計(jì),提供多元化的任務(wù)生態(tài)。 它的各個(gè)產(chǎn)品使用統(tǒng)一的任務(wù) API,開發(fā)者僅需一次開發(fā),即可隨意部署。

架構(gòu)

elasticjob由兩個(gè)相互獨(dú)立的子項(xiàng)目 ElasticJob-Lite 和 ElasticJob-Cloud 組成組成,這是ElasticJob-Lite 的架構(gòu)圖:

Java | 一分鐘掌握定時(shí)任務(wù) | 7 - ElasticJob分布式定時(shí)任務(wù)

從架構(gòu)圖可以看到,左上角App1和App2兩個(gè)業(yè)務(wù)模塊中的Elastic-Job往zk中注冊了信息,右邊的Elastic-Job-Lite是監(jiān)聽了zk的,因此,整個(gè)任務(wù)的調(diào)度是由zk來完成的。下面的console通過Rest API去獲取zk中的信息,得到調(diào)度數(shù)據(jù)和日志,并存盤。

這是ElasticJob-Cloud的架構(gòu)圖:

Java | 一分鐘掌握定時(shí)任務(wù) | 7 - ElasticJob分布式定時(shí)任務(wù)

ElasticJob-Cloud的調(diào)度是依賴Mesos的,從架構(gòu)圖的理解,Mesos和zk結(jié)合做好任務(wù)調(diào)度,再分發(fā)給Mesos的代理并執(zhí)行。

功能和特性

以下是ElasticJob的特性優(yōu)點(diǎn)

  • 支持任務(wù)在分布式場景下的分片和高可用
  • 能夠水平擴(kuò)展任務(wù)的吞吐量和執(zhí)行效率
  • 任務(wù)處理能力隨資源配備彈性伸縮
  • 優(yōu)化任務(wù)和資源調(diào)度
  • 相同任務(wù)聚合至相同的執(zhí)行器統(tǒng)一處理
  • 動(dòng)態(tài)調(diào)配追加資源至新分配的任務(wù)
  • 失效轉(zhuǎn)移
  • 錯(cuò)過任務(wù)重新執(zhí)行
  • 分布式環(huán)境下任務(wù)自動(dòng)診斷和修復(fù)
  • 基于有向無環(huán)圖 (DAG) 的任務(wù)依賴
  • 基于有向無環(huán)圖 (DAG) 的任務(wù)項(xiàng)目依賴
  • 可擴(kuò)展的任務(wù)類型統(tǒng)一接口
  • 支持豐富的任務(wù)類型庫–包括數(shù)據(jù)流、腳本、HTTP、文件、大數(shù)據(jù)
  • 易于對接業(yè)務(wù)任務(wù)–兼容 Spring IOC
  • 任務(wù)管控端
  • 任務(wù)事件追蹤
  • 注冊中心管理

入門角色

既然這么多優(yōu)點(diǎn),我們就入門試試吧。入門elasticjob-lite也繼承了Quartz框架,同樣的很簡單,只要三個(gè)角色:

SimpleJob:任務(wù)主體。如果用過Quartz,那么應(yīng)該能夠理解這個(gè),基本上和Quartz的Job接口類似,只要實(shí)現(xiàn)一個(gè)execute方法就行了,入門用這個(gè)就行;

JobConfiguration:任務(wù)配置。同樣的可以理解為類似Quartz框架中的Trigger,最重要的就是配置任務(wù)的執(zhí)行頻率;

ScheduleJobBootstrap:調(diào)度主體。這個(gè)一樣,參考Quartz框架中的Scheduler對象,它把任務(wù)和配置結(jié)合起來,任務(wù)按照配置中的頻率執(zhí)行。

寫個(gè)例子

我們創(chuàng)建這三種角色,首先創(chuàng)建任務(wù)主體:

import org.apache.shardingsphere.elasticjob.api.ShardingContext;
import org.apache.shardingsphere.elasticjob.simple.job.SimpleJob;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.List;

/**
 * (這個(gè)類的說明)
 *
 * @author mars醬
 */

public class MarsSimpleJob implements SimpleJob {
    
    @Override
    public void execute(final ShardingContext shardingContext) {
        System.out.printf("Item: %s | Time: %s | Thread: %s | %s%n",
               			  shardingContext.getShardingItem(), 
                          new SimpleDateFormat("HH:mm:ss").format(new Date()), 
                          Thread.currentThread().getId(), 
                          "就是這么簡單~");
    }
}

再創(chuàng)建任務(wù)配置:

import org.apache.shardingsphere.elasticjob.api.JobConfiguration;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;

import javax.sql.DataSource;
import java.util.Objects;

/**
 * (這個(gè)類的說明)
 *
 * @author mars醬
 */

public class JobConfigurationBuilder {
    public static JobConfiguration buildJobConfiguration(String jobName, String cronExpression, TracingConfiguration<DataSource> tracingConfig) {

        JobConfiguration.Builder builder = JobConfiguration.newBuilder(jobName, 3)
                .cron(cronExpression)
                .shardingItemParameters("0=a,1=b,2=c");
        if (Objects.nonNull(tracingConfig)) {
            builder.addExtraConfigurations(tracingConfig);
        }
        return builder.build();
    }
}

最后創(chuàng)建調(diào)度器,并執(zhí)行:

import org.apache.shardingsphere.elasticjob.lite.api.bootstrap.impl.ScheduleJobBootstrap;
import org.apache.shardingsphere.elasticjob.lite.example.job.simple.JavaSimpleJob;
import org.apache.shardingsphere.elasticjob.reg.base.CoordinatorRegistryCenter;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperConfiguration;
import org.apache.shardingsphere.elasticjob.reg.zookeeper.ZookeeperRegistryCenter;
import org.apache.shardingsphere.elasticjob.tracing.api.TracingConfiguration;

import javax.sql.DataSource;


/**
 * (這個(gè)類的說明)
 *
 * @author mars醬
 */

public final class SchedulerMain {

    private static final int EMBED_ZOOKEEPER_PORT = 4181;

    private static final String ZOOKEEPER_CONNECTION_STRING = "localhost:" + EMBED_ZOOKEEPER_PORT;

    private static final String JOB_NAMESPACE = "elasticjob-marsz-lite-java";

    // CHECKSTYLE:OFF
    public static void main(final String[] args) {
        // 內(nèi)嵌zk服務(wù)
        EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);
        CoordinatorRegistryCenter regCenter = setUpRegistryCenter();
        // 簡單作業(yè)
        setUpSimpleJob(regCenter, null);
    }

    private static CoordinatorRegistryCenter setUpRegistryCenter() {
        ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
        CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
        result.init();
        return result;
    }

    private static void setUpSimpleJob(final CoordinatorRegistryCenter regCenter, final TracingConfiguration<DataSource> tracingConfig) {
        new ScheduleJobBootstrap(regCenter,
                new MarsSimpleJob(),
                JobConfigurationBuilder.buildJobConfiguration("marsSimpleJob", "0/5 * * * * ?", tracingConfig)).schedule();

    }

}

運(yùn)行的效果:

Java | 一分鐘掌握定時(shí)任務(wù) | 7 - ElasticJob分布式定時(shí)任務(wù)

截圖中Item是處理的分片項(xiàng),Thread是當(dāng)前線程的id,看到了Quartz框架的影子…。

任務(wù)執(zhí)行流程

既然能成功運(yùn)行,我們看看內(nèi)部的處理邏輯吧。Mars醬本機(jī)并沒有安裝zk,所以copy了官方的例子,在程序運(yùn)行前先啟用了一個(gè)內(nèi)嵌的zk服務(wù):

EmbedZookeeperServer.start(EMBED_ZOOKEEPER_PORT);

這個(gè)只能在模擬的時(shí)候使用,千萬不能拿去放生產(chǎn)環(huán)境。接下來就是注冊中心的配置了,我們需要的是CoordinatorRegistryCenter對象:

private static CoordinatorRegistryCenter setUpRegistryCenter() {
	ZookeeperConfiguration zkConfig = new ZookeeperConfiguration(ZOOKEEPER_CONNECTION_STRING, JOB_NAMESPACE);
    CoordinatorRegistryCenter result = new ZookeeperRegistryCenter(zkConfig);
    result.init();
    return result;
}

好了,zk的部分處理完成,下面就是直接SchedulerJobBootstrap的部分了。

ScheduleJobBootstrap初始化

ScheduleJobBootstrap的初始化在例子中需要三個(gè)參數(shù):

CoordinatorRegistryCenter:這個(gè)是協(xié)調(diào)用的注冊中心。是一個(gè)接口類,它的實(shí)現(xiàn)在ElasticJob里面只有一個(gè)ZookeeperRegisterCenter對象,未來是不是會(huì)支持其他的注冊中心呢?

ElasticJob: Mars醬理解為任務(wù)對象。但是ElasticJob這個(gè)對象本身是個(gè)空接口,有兩個(gè)子接口SimpleJobDataflowJob,前者M(jìn)ars醬的理解是和Quartz中的Job對象類似,只要實(shí)現(xiàn)execute函數(shù)就行,后者有需要實(shí)現(xiàn)兩個(gè)接口,一個(gè)fetchData獲取數(shù)據(jù),一個(gè)processData處理數(shù)據(jù)。所以,ElasticJob這個(gè)接口留空,是為了還有其他擴(kuò)展吧?

JobConfiguration:彈性任務(wù)配置項(xiàng)。構(gòu)建這個(gè)對象不能直接設(shè)置,只能用buider的方式構(gòu)建。需要配置的屬性很多,但是核心屬性大致就是幾個(gè):任務(wù)名稱、分片數(shù)、執(zhí)行頻率、分片參數(shù)。JobConfiguration的所有屬性如下:

屬性名 說明
String jobName 任務(wù)名稱
String cron cron表達(dá)式
String timeZone 任務(wù)運(yùn)行的時(shí)區(qū)
int shardingTotalCount 任務(wù)分片總數(shù)
String shardingItemParameters 分片序號和參數(shù),多個(gè)鍵值對之間用逗號分隔,從0開始,但是不能大于或等于任務(wù)分片的總數(shù)
String jobParameter 任務(wù)自定義任務(wù)參數(shù)
boolean monitorExecution 是否監(jiān)聽執(zhí)行
boolean failover 是否啟用故障轉(zhuǎn)移。開啟表示如果任務(wù)在一次任務(wù)執(zhí)行中途宕機(jī),允許將該次未完成的任務(wù)在另一任務(wù)節(jié)點(diǎn)上補(bǔ)償執(zhí)行
boolean misfire 不發(fā)火。哈哈,其實(shí)是是否開啟錯(cuò)過任務(wù)重新執(zhí)行
int maxTimeDiffSeconds 最大時(shí)差
int reconcileIntervalMinutes 間隔時(shí)長
String jobShardingStrategyType 任務(wù)分片策略類型,總共三種
String jobExecutorServiceHandlerType 任務(wù)執(zhí)行程序服務(wù)處理程序類型
String jobErrorHandlerType 任務(wù)錯(cuò)誤處理類型
Collection jobListenerTypes 任務(wù)監(jiān)聽類型
Collection extraConfigurations 附加配置信息
String description 任務(wù)描述
Properties props 擴(kuò)展用屬性值
boolean disabled 是否禁用
boolean overwrite 是否覆蓋
String label 標(biāo)簽
boolean staticSharding 是否支持靜態(tài)分片

ScheduleJobBootstrap執(zhí)行

同樣的,例子中的MarsSimpleJob的execute函數(shù),最終會(huì)被ElasticJob框架調(diào)用,我們按照被執(zhí)行的反向順序往上找。MarsSimpleJob是繼承SimpleJob的, 而SimpleJob的execute函數(shù)是被SimpleJobExecutor所調(diào)用:

/**
 * Simple job executor.
 */
public final class SimpleJobExecutor implements ClassedJobItemExecutor<SimpleJob> {
    
    @Override
    public void process(final SimpleJob elasticJob, final JobConfiguration jobConfig, final JobFacade jobFacade, final ShardingContext shardingContext) {
        // 這里調(diào)用execute函數(shù)
        elasticJob.execute(shardingContext);
    }
    
    @Override
    public Class<SimpleJob> getElasticJobClass() {
        return SimpleJob.class;
    }
}

再繼續(xù)往上找,process的核心流程就是在ElasticJobExecutor里面了,調(diào)用process的部分在ElasticJobExcutor中幾個(gè)重載的process方法調(diào)用的,兩個(gè)process函數(shù)完成不同的功能,調(diào)用SimpleExecutor的process部分是這樣:

@SuppressWarnings("unchecked")
private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final int item, final JobExecutionEvent startEvent) {
        jobFacade.postJobExecutionEvent(startEvent);
        log.trace("Job '{}' executing, item is: '{}'.", jobConfig.getJobName(), item);
        JobExecutionEvent completeEvent;
        try {
            // 這里調(diào)用SimpleJobExecutor的process
            jobItemExecutor.process(elasticJob, jobConfig, jobFacade, shardingContexts.createShardingContext(item));
            completeEvent = startEvent.executionSuccess();
            log.trace("Job '{}' executed, item is: '{}'.", jobConfig.getJobName(), item);
            jobFacade.postJobExecutionEvent(completeEvent);
            // CHECKSTYLE:OFF
        } catch (final Throwable cause) {
            // CHECKSTYLE:ON
            completeEvent = startEvent.executionFailure(ExceptionUtils.transform(cause));
            jobFacade.postJobExecutionEvent(completeEvent);
            itemErrorMessages.put(item, ExceptionUtils.transform(cause));
            JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
            jobErrorHandler.handleException(jobConfig.getJobName(), cause);
        }
}

上面這個(gè)process負(fù)責(zé)最終任務(wù)的執(zhí)行部分,由JobItemExecutor對象調(diào)用,SimpleJobExecutor被JobItemExecutor接口定義。整個(gè)這個(gè)proces由guava包的EventBus處理消息事件,執(zhí)行之前有startEvent,執(zhí)行完成有completeEvent,異常也有對應(yīng)的失敗event,方面架構(gòu)圖中存盤事件日志、ELK日志收集動(dòng)作。

調(diào)用這個(gè)process的部分,由另一個(gè)process完成,長這樣的:

private void process(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
        Collection<Integer> items = shardingContexts.getShardingItemParameters().keySet();
        if (1 == items.size()) {
            int item = shardingContexts.getShardingItemParameters().keySet().iterator().next();
            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, item);
            process(jobConfig, shardingContexts, item, jobExecutionEvent);
            return;
        }
        CountDownLatch latch = new CountDownLatch(items.size());
        for (int each : items) {
            JobExecutionEvent jobExecutionEvent = new JobExecutionEvent(IpUtils.getHostName(), IpUtils.getIp(), shardingContexts.getTaskId(), jobConfig.getJobName(), executionSource, each);
            ExecutorService executorService = executorContext.get(ExecutorService.class);
            if (executorService.isShutdown()) {
                return;
            }
            // 提交給線程池執(zhí)行
            executorService.submit(() -> {
                try {
                    process(jobConfig, shardingContexts, each, jobExecutionEvent);
                } finally {
                    latch.countDown();
                }
            });
        }
        try {
            latch.await();
        } catch (final InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
}

上面這個(gè)process負(fù)責(zé)把分片參數(shù)依次組裝好,設(shè)置好JobExecutionEvent中的ip、主機(jī)名等參數(shù),然后放入線程池中去執(zhí)行。再往上,看現(xiàn)在這個(gè)process被調(diào)用的部分:

private void execute(final JobConfiguration jobConfig, final ShardingContexts shardingContexts, final ExecutionSource executionSource) {
	if (shardingContexts.getShardingItemParameters().isEmpty()) {
		jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_FINISHED, String.format("Sharding item for job '%s' is empty.", jobConfig.getJobName()));
        return;
    }
    // 往注冊中心注冊ShardingContexts信息
    jobFacade.registerJobBegin(shardingContexts);
    String taskId = shardingContexts.getTaskId();
    // 發(fā)送跟蹤日志,標(biāo)記任務(wù)正在運(yùn)行
    jobFacade.postJobStatusTraceEvent(taskId, State.TASK_RUNNING, "");
    try {
        // 調(diào)用process
        process(jobConfig, shardingContexts, executionSource);
    } finally {
        // TODO Consider increasing the status of job failure, and how to handle the overall loop of job failure
        // 告知注冊中心任務(wù)完成
        jobFacade.registerJobCompleted(shardingContexts);
        if (itemErrorMessages.isEmpty()) {
            // 沒有失敗信息,通知任務(wù)完成
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_FINISHED, "");
        } else {
            // 否則通知失敗
            jobFacade.postJobStatusTraceEvent(taskId, State.TASK_ERROR, itemErrorMessages.toString());
            itemErrorMessages.clear();
        }
    }
}

方法execute從注冊中心注冊ShardingContext信息,并發(fā)送跟蹤日志事件,然后調(diào)用process,最后發(fā)送跟蹤消息標(biāo)記任務(wù)完成。再有一個(gè)重載的execute方法調(diào)用上面這個(gè)execute方法,如下:

public void execute() {
    // job的配置信息
    JobConfiguration jobConfig = jobFacade.loadJobConfiguration(true);
    executorContext.reloadIfNecessary(jobConfig);
    JobErrorHandler jobErrorHandler = executorContext.get(JobErrorHandler.class);
    try {
        jobFacade.checkJobExecutionEnvironment();
    } catch (final JobExecutionEnvironmentException cause) {
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    // 這里有玄機(jī)
    ShardingContexts shardingContexts = jobFacade.getShardingContexts();
    // 發(fā)送時(shí)間消息總線
    jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), State.TASK_STAGING, String.format("Job '%s' execute begin.", jobConfig.getJobName()));
    if (jobFacade.misfireIfRunning(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.postJobStatusTraceEvent(shardingContexts.getTaskId(), 
                                          State.TASK_FINISHED, 
                                          String.format(
                "Previous job '%s' - shardingItems '%s' is still running, misfired job will start after previous job completed.", 
                                              jobConfig.getJobName(),
                                              shardingContexts.getShardingItemParameters().keySet()));
            return;
    }
    try {
        // 任務(wù)執(zhí)行的前置流程
        jobFacade.beforeJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
    // 調(diào)用上面的execute方法
    execute(jobConfig, shardingContexts, ExecutionSource.NORMAL_TRIGGER);
    while (jobFacade.isExecuteMisfired(shardingContexts.getShardingItemParameters().keySet())) {
        jobFacade.clearMisfire(shardingContexts.getShardingItemParameters().keySet());
        execute(jobConfig, shardingContexts, ExecutionSource.MISFIRE);
    }
    // 故障轉(zhuǎn)移
    jobFacade.failoverIfNecessary();
    try {
        // 任務(wù)執(zhí)行的后置流程
        jobFacade.afterJobExecuted(shardingContexts);
        //CHECKSTYLE:OFF
    } catch (final Throwable cause) {
        //CHECKSTYLE:ON
        jobErrorHandler.handleException(jobConfig.getJobName(), cause);
    }
}

這個(gè)execute就由Quartz的JobRunShell調(diào)用了,Quartz的調(diào)用的過程在 Java | 一分鐘掌握定時(shí)任務(wù) | 6 - Quartz定時(shí)任務(wù) - 掘金 (juejin.cn) 里面還好Mars醬分析過了。

執(zhí)行流程總結(jié)

那么,追蹤完源代碼,大致的流程就應(yīng)該是如下:

1.組裝基本參數(shù)(任務(wù)、頻率等) -> 2. ScheduleJobBootstrap初始化 -> 3.配置任務(wù)屬性 -> 4.設(shè)置各種facade -> 5.初始化ElasticJobExecutor -> 6.調(diào)用scheduler執(zhí)行任務(wù) -> 7.獲取任務(wù)執(zhí)行器(SimpleJobExecutor) -> 8.各種校驗(yàn)邏輯 -> 9. 處理分片參數(shù) -> 10. 設(shè)置任務(wù)為運(yùn)行狀態(tài) -> 11. 提交任務(wù)到線程池 -> 12.執(zhí)行任務(wù) -> 13.處理任務(wù)后續(xù)邏輯

任務(wù)的調(diào)度過程由zk完成,取決于zk的任務(wù)調(diào)度策略吧?如果一臺機(jī)器的定時(shí)運(yùn)行時(shí)掛了,zk會(huì)轉(zhuǎn)移到另一臺運(yùn)行中的機(jī)器中去。-- Mars醬

分片的策略

任務(wù)的分片策略,用于將任務(wù)在分布式環(huán)境下分解成為任務(wù)使用。

SPI 名稱 詳細(xì)說明
JobShardingStrategy 作業(yè)分片策略接口
已知實(shí)現(xiàn)類 詳細(xì)說明
AverageAllocationJobShardingStrategy 根據(jù)分片項(xiàng)平均分片
OdevitySortByNameJobShardingStrategy 根據(jù)任務(wù)名稱哈希值的奇偶數(shù)決定按照任務(wù)服務(wù)器 IP 升序或是降序的方式分片
RoundRobinByNameJobShardingStrategy 根據(jù)任務(wù)名稱輪詢分片

那么任務(wù)的分片策略在哪里使用的呢?就在代碼中注釋的“這里有玄機(jī)”那行。在getShardingContexts的方法中會(huì)調(diào)用ShardingService,它會(huì)去獲取JobConfiguration中配置的分片策略方式:

public void shardingIfNecessary() {
    List<JobInstance> availableJobInstances = instanceService.getAvailableJobInstances();
    if (!isNeedSharding() || availableJobInstances.isEmpty()) {
        return;
    }
    if (!leaderService.isLeaderUntilBlock()) {
        blockUntilShardingCompleted();
        return;
    }
    waitingOtherShardingItemCompleted();
    JobConfiguration jobConfig = configService.load(false);
    int shardingTotalCount = jobConfig.getShardingTotalCount();
    log.debug("Job '{}' sharding begin.", jobName);
    jobNodeStorage.fillEphemeralJobNode(ShardingNode.PROCESSING, "");
    resetShardingInfo(shardingTotalCount);
    // 獲取任務(wù)分片策略
    JobShardingStrategy jobShardingStrategy = JobShardingStrategyFactory.getStrategy(jobConfig.getJobShardingStrategyType());
    jobNodeStorage.executeInTransaction(getShardingResultTransactionOperations(jobShardingStrategy.sharding(availableJobInstances, jobName, shardingTotalCount)));
    log.debug("Job '{}' sharding complete.", jobName);
}

如果不設(shè)置,默認(rèn)使用的是平均分片策略。

總結(jié)

這,大抵就是ElasticJob的工作原理了吧。文章來源地址http://www.zghlxwxcb.cn/news/detail-462374.html

到了這里,關(guān)于Java | 一分鐘掌握定時(shí)任務(wù) | 7 - ElasticJob分布式定時(shí)任務(wù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 分布式定時(shí)任務(wù)

    分布式定時(shí)任務(wù)

    本文引用了谷粒商城的課程 定時(shí)任務(wù)是我們系統(tǒng)里面經(jīng)常要用到的一些功能。如每天的支付訂單要與支付寶進(jìn)行對賬操作、每個(gè)月定期進(jìn)行財(cái)務(wù)匯總、在服務(wù)空閑時(shí)定時(shí)統(tǒng)計(jì)當(dāng)天所有信息數(shù)據(jù)等。 定時(shí)任務(wù)有個(gè)非常流行的框架Quartz和Java原生API的Timer類。Spring框架也可以支持

    2023年04月15日
    瀏覽(24)
  • 分布式任務(wù)調(diào)度,定時(shí)任務(wù)的處理方案

    分布式任務(wù)調(diào)度,定時(shí)任務(wù)的處理方案

    適用場景: Spring 定時(shí)任務(wù)是 Spring 框架提供的一種輕量級的任務(wù)調(diào)度方案,它的特點(diǎn)是簡單易用、輕量級。Spring 定時(shí)任務(wù)的執(zhí)行是在 單個(gè)節(jié)點(diǎn) 上進(jìn)行的,如果需要分布式任務(wù)調(diào)度,需要自己實(shí)現(xiàn)相應(yīng)的解決方案。 1.導(dǎo)入依賴版本自己控制 2.啟動(dòng)類加上@EnableScheduling 3.編寫業(yè)

    2023年04月14日
    瀏覽(44)
  • 分布式作業(yè)調(diào)度框架——ElasticJob

    分布式作業(yè)調(diào)度框架——ElasticJob

    ElasticJob 是面向互聯(lián)網(wǎng)生態(tài)和海量任務(wù)的分布式調(diào)度解決方案,由兩個(gè)相互獨(dú)立的子項(xiàng)目 ElasticJob-Lite 和 ElasticJob-Cloud 組成。 它通過彈性調(diào)度、資源管控、以及作業(yè)治理的功能,打造一個(gè)適用于互聯(lián)網(wǎng)場景的分布式調(diào)度解決方案,并通過開放的架構(gòu)設(shè)計(jì),提供多元化的作業(yè)生

    2024年02月13日
    瀏覽(29)
  • 使用shedlock實(shí)現(xiàn)分布式定時(shí)任務(wù)鎖【防止task定時(shí)任務(wù)重復(fù)執(zhí)行】

    使用shedlock實(shí)現(xiàn)分布式定時(shí)任務(wù)鎖【防止task定時(shí)任務(wù)重復(fù)執(zhí)行】

    第一步:引入shedlock相關(guān)依賴 ShedLock還可以使用Mongo,Redis,Hazelcast,ZooKeeper等外部存儲進(jìn)行協(xié)調(diào),例如使用redis則引入下面的包 第二步:創(chuàng)建數(shù)據(jù)庫表結(jié)構(gòu),數(shù)據(jù)庫表的腳本如下: 第三步:添加shedlock配置類 (定時(shí)任務(wù)防重復(fù)執(zhí)行的配置類) 第四步:在啟動(dòng)類上添加啟動(dòng)注

    2024年02月10日
    瀏覽(30)
  • 分布式定時(shí)任務(wù)框架 PowerJob

    分布式定時(shí)任務(wù)框架 PowerJob

    1.1 為什么需要使用定時(shí)任務(wù)調(diào)度 (1)時(shí)間驅(qū)動(dòng)處理場景:整點(diǎn)發(fā)送優(yōu)惠券,每天更新收益,每天刷新標(biāo)簽數(shù)據(jù)和人群數(shù)據(jù)。 (2)批量處理數(shù)據(jù):按月批量統(tǒng)計(jì)報(bào)表數(shù)據(jù),批量更新短信狀態(tài),實(shí)時(shí)性要求不高。 (3)異步執(zhí)行解耦:活動(dòng)狀態(tài)刷新,異步執(zhí)行離線查詢,與內(nèi)部

    2024年02月09日
    瀏覽(24)
  • 分布式定時(shí)任務(wù)調(diào)度框架Quartz

    分布式定時(shí)任務(wù)調(diào)度框架Quartz

    Quartz是一個(gè)定時(shí)任務(wù)調(diào)度框架,比如你遇到這樣的問題: 比如淘寶的待支付功能,后臺會(huì)在你生成訂單后24小時(shí)后,查看訂單是否支付,未支付則取消訂單 比如vip的每月自動(dòng)續(xù)費(fèi)功能 … 想定時(shí)在某個(gè)時(shí)間,去做某件事 Quartz是一套輕量級的任務(wù)調(diào)度框架,只需要定義了 Job(

    2024年02月04日
    瀏覽(34)
  • 分布式定時(shí)任務(wù)調(diào)度xxl-job

    分布式定時(shí)任務(wù)調(diào)度xxl-job

    Quartz中最重要的三個(gè)對象:Job(作業(yè))、Trigger(觸發(fā)器)、Scheduler(調(diào)度器)。 xxl-job的調(diào)度原理:調(diào)度線程在一個(gè)while循環(huán)中不斷地獲取一定數(shù)量的即將觸發(fā)的Trigger,拿到綁定的Job,包裝成工作線程執(zhí)行。 當(dāng)然,不管在任何調(diào)度系統(tǒng)中,底層都是線程模型。如果要自己寫一個(gè)

    2024年03月10日
    瀏覽(20)
  • springcloud:新一代分布式定時(shí)任務(wù)框架——PowerJob

    springcloud:新一代分布式定時(shí)任務(wù)框架——PowerJob

    之前我們講解過主流的分布式定時(shí)任務(wù)框架xxl-job,隨著技術(shù)的迭代更新,更多的定時(shí)任務(wù)框架也開始出現(xiàn),今天我們來看一看新一代的定時(shí)任務(wù)框架 PowerJob PowerJob是基于java開發(fā)的企業(yè)級的分布式任務(wù)調(diào)度平臺,與xxl-job一樣,基于web頁面實(shí)現(xiàn)任務(wù)調(diào)度配置與記錄,使用簡單,

    2024年02月02日
    瀏覽(27)
  • 分布式定時(shí)任務(wù)-XXL-JOB-教程+實(shí)戰(zhàn)

    分布式定時(shí)任務(wù)-XXL-JOB-教程+實(shí)戰(zhàn)

    1.定時(shí)任務(wù)認(rèn)識 1.1.什么是定時(shí)任務(wù) 定時(shí)任務(wù)是按照指定時(shí)間周期運(yùn)行任務(wù)。使用場景為在某個(gè)固定時(shí)間點(diǎn)執(zhí)行,或者周期性的去執(zhí)行某個(gè)任務(wù),比如:每天晚上24點(diǎn)做數(shù)據(jù)匯總,定時(shí)發(fā)送短信等。 1.2.常見定時(shí)任務(wù)方案 While + Sleep : 通過循環(huán)加休眠的方式定時(shí)執(zhí)行 Timer和Time

    2024年02月16日
    瀏覽(30)
  • Springboot 定時(shí)任務(wù),分布式下冪等性如何解決

    Springboot 定時(shí)任務(wù),分布式下冪等性如何解決

    在分布式環(huán)境下,定時(shí)任務(wù)的冪等性問題需要考慮多個(gè)節(jié)點(diǎn)之間的數(shù)據(jù)一致性和事務(wù)處理。 一種解決方法是使用分布式鎖來保證同一時(shí)間只有一個(gè)節(jié)點(diǎn)能夠執(zhí)行該任務(wù)。具體實(shí)現(xiàn)可以使用Redis或Zookeeper等分布式協(xié)調(diào)工具提供的分布式鎖功能。 另一種解決方法是使用消息隊(duì)列來

    2024年02月11日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包