国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

一種基于springboot、redis的分布式任務(wù)引擎的實(shí)現(xiàn)(一)

這篇具有很好參考價(jià)值的文章主要介紹了一種基于springboot、redis的分布式任務(wù)引擎的實(shí)現(xiàn)(一)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

?總體思路是,主節(jié)點(diǎn)接收到任務(wù)請求,將根據(jù)任務(wù)情況拆分成多個(gè)任務(wù)塊,將任務(wù)塊標(biāo)識(shí)的主鍵放入redis。發(fā)送redis消息,等待其他節(jié)點(diǎn)運(yùn)行完畢,結(jié)束處理。接收到信息的節(jié)點(diǎn)注冊本節(jié)點(diǎn)信息到redis、開啟多線程、獲取任務(wù)塊、執(zhí)行任務(wù)、結(jié)束處理。

1、主節(jié)點(diǎn)接收任務(wù)請求

    @Override
    public void executeTaskInfo(PrepareDTO prepareDTO) {
        //異常標(biāo)記
        String taskInfo = prepareDTO.getTaskId();
        //任務(wù)分組狀態(tài)
        String taskStatus = "";
        try {
            log.info("數(shù)據(jù)準(zhǔn)備任務(wù)并設(shè)定任務(wù)執(zhí)行狀態(tài),{}", prepareDTO);
            this.dataPrepareBo.doStartGroupJobInfo(prepareDTO);
            //給redis集合中放計(jì)算對象
            log.info("開始放入計(jì)算任務(wù):{}", prepareDTO);
            boolean getTaskFlag = this.dataPrepareBo.pushCalculationObject(prepareDTO);
            if (!getTaskFlag) {
                taskStatus = String.format("沒有獲取數(shù)據(jù)或計(jì)劃已取消,%s", taskInfo);
                log.error(taskStatus);
                throw new Exception(taskStatus);
            }
            //發(fā)消息執(zhí)行緩存中任務(wù)
            log.info("發(fā)消息執(zhí)行任務(wù):{}", prepareDTO);
            sendMessage(prepareDTO);
            //等待任務(wù)執(zhí)行完畢
            log.info("等待任務(wù)執(zhí)行結(jié)果");
            taskStatus = this.getGroupUpLoadTaskFinsh(prepareDTO);
        } catch (Exception e) {//捕獲日志
            e.printStackTrace();
            taskStatus = "獲取任務(wù)狀態(tài)異常" + e;
            log.info(taskStatus);
            dataPrepareBo.putExceptionMsg2Cache(taskInfo, "數(shù)據(jù)準(zhǔn)備分發(fā)計(jì)算任務(wù)線程異常:" + taskStatus);
        } finally {
            //做任務(wù)結(jié)束處理
            this.doGroupTaskFinshpPocess(prepareDTO, taskStatus);
        }
    }

2,發(fā)送消息

    @Override
    public void sendMessage(String topic, String msg) {
        this.redisTemplate.convertAndSend(topic, msg);
    }

3,節(jié)點(diǎn)接收任務(wù),并執(zhí)行

    public void doUpLoadTask(String msg) throws Exception {
        log.info("開始執(zhí)行明細(xì)任務(wù){(diào)}" + msg);
        String taskId = this.getTaskId(msg);
        try {
            Object cancelFlag = this.redisTemplate.opsForValue().get(String.format(EngineConstant.JOB_CANCEL_FLAG, taskId));
            if(cancelFlag != null && "1".equals(cancelFlag.toString())){
                log.info("本次任務(wù)已取消");
                return;
            }
            //上傳本機(jī)執(zhí)行信息到redis
            this.cacheBo.initGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());
            //從緩存獲取任務(wù),獲取任務(wù)后啟線程執(zhí)行任務(wù)。如果沒獲取到任務(wù),則本節(jié)點(diǎn)任務(wù)執(zhí)行完畢
            //循環(huán)獲取任務(wù)
            this.groupTaskProcessBO.doGroupTaskProcess(taskId, null);
            //處理結(jié)束
            this.cacheBo.finishGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());
        } catch (Exception e) {
			//記錄日志
            taskUpldExeLogCDTO.setRunStas("-1");
            String exceptionInfo = this.taskLogUtils.getExceptionInfo(e) ;
            taskUpldExeLogCDTO.setAbnInfo(exceptionInfo);
            throw e;
        } finally {
			//記錄日志
            taskUpldExeLogCDTO.setEndtime(DateUtil.getCurrentDate());
            if("-1".equals(taskUpldExeLogCDTO.getRunStas())){//異常結(jié)束
                this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"執(zhí)行上傳任務(wù)異常");
            } else {//正常結(jié)束
                taskUpldExeLogCDTO.setRunStas("1");
                this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"執(zhí)行上傳任務(wù)正常");
            }
        }
    }

4,開啟線程執(zhí)行任務(wù)

    @Override
    public CalculationDTO doGroupTaskProcess(String taskId, TaskUpldExeLogCDTO taskUpldExeLogCDTO) throws Exception {
        List<Future> futureList = new ArrayList<>();
        //開始執(zhí)行明細(xì)任務(wù)處理
        ThreadPoolTaskExecutor taskTransferExecutor = ToolUtil.getExecutor("engine-file-tasks-pool-", Math.min(parallelProcessNum,10), 8);
        ExecutorListHolder.putThreadPool(String.format(GroupConstant.PREPARE_ENGINE_POOL,taskId), taskTransferExecutor.getThreadPoolExecutor());
        for(int i = 0 ; i < parallelProcessNum ; i++) {
            DoGroupUpLoadTaskThread doGroupUpLoadTaskThread = new DoGroupUpLoadTaskThread(taskId
                    , redisTemplate, calculationBo, null, null);
            Future<?> future = taskTransferExecutor.submit(doGroupUpLoadTaskThread);
            futureList.add(future);
        }

        if (!CollectionUtil.isEmpty(futureList)) {
            futureList.forEach(f -> {
                try {
                    f.get(GroupTaskProcessBOImpl.maxTime, TimeUnit.SECONDS);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        }
        log.info("本節(jié)點(diǎn)執(zhí)行分組任務(wù)執(zhí)行完畢{}", taskId + ":" + GroupConstant.IDENTITY);
        return null;
    }

5,線程執(zhí)行明細(xì)

    @Override
    public ResponseDTO call() throws Exception {
        //執(zhí)行任務(wù)
        while(true) {
            FilterTableUniqueDTO filterTableUniqueDTO = (FilterTableUniqueDTO)this.redisTemplate.opsForList().leftPop(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));
            log.debug("取出任務(wù):" + filterTableUniqueDTO);
            if(null == filterTableUniqueDTO) {
                break ;
            }
            long lastNum = this.redisTemplate.opsForList().size(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));
            log.info("生成文件剩余任務(wù)數(shù)量:" + lastNum);
//           處理任務(wù)
            calculationBo.GenerateFile(filterTableUniqueDTO, taskUpldDetlLogCDTO);
        }
        return null;
    }

以上是主要入口總體思路涉及代碼,詳細(xì)實(shí)現(xiàn)整理起來涉及內(nèi)容比較繁多,將在第二部分分享。文章來源地址http://www.zghlxwxcb.cn/news/detail-667404.html

到了這里,關(guān)于一種基于springboot、redis的分布式任務(wù)引擎的實(shí)現(xiàn)(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Springboot 定時(shí)任務(wù),分布式下冪等性如何解決

    Springboot 定時(shí)任務(wù),分布式下冪等性如何解決

    在分布式環(huán)境下,定時(shí)任務(wù)的冪等性問題需要考慮多個(gè)節(jié)點(diǎn)之間的數(shù)據(jù)一致性和事務(wù)處理。 一種解決方法是使用分布式鎖來保證同一時(shí)間只有一個(gè)節(jié)點(diǎn)能夠執(zhí)行該任務(wù)。具體實(shí)現(xiàn)可以使用Redis或Zookeeper等分布式協(xié)調(diào)工具提供的分布式鎖功能。 另一種解決方法是使用消息隊(duì)列來

    2024年02月11日
    瀏覽(22)
  • SpringBoot使用Redis實(shí)現(xiàn)分布式緩存

    SpringBoot使用Redis實(shí)現(xiàn)分布式緩存

    ?作者簡介:2022年 博客新星 第八 。熱愛國學(xué)的Java后端開發(fā)者,修心和技術(shù)同步精進(jìn)。 ??個(gè)人主頁:Java Fans的博客 ??個(gè)人信條:不遷怒,不貳過。小知識(shí),大智慧。 ??當(dāng)前專欄:SpringBoot 框架從入門到精通 ?特色專欄:國學(xué)周更-心性養(yǎng)成之路 ??本文內(nèi)容:SpringBoot使用

    2023年04月09日
    瀏覽(20)
  • SpringBoot 定時(shí)任務(wù) @Scheduled 集群環(huán)境優(yōu)化 (使用分布式鎖, 注解形式)

    SpringBoot提供了 Schedule模塊完美支持定時(shí)任務(wù)的執(zhí)行 在實(shí)際開發(fā)中由于項(xiàng)目部署在分布式或集群服務(wù)器上 會(huì)導(dǎo)致定時(shí)任務(wù)多次觸發(fā) 因此,使用redis分布鎖機(jī)制可以有效避免多次執(zhí)行定時(shí)任務(wù) ? 核心方法是org.springframework.data.redis.core包下的 ?setIfAbsent() 方法 返回值為布爾類型

    2024年02月15日
    瀏覽(29)
  • springboot3 redis 實(shí)現(xiàn)分布式鎖

    分布式鎖介紹 分布式鎖是一種在分布式系統(tǒng)中用于控制不同節(jié)點(diǎn)上的進(jìn)程或線程對共享資源進(jìn)行互斥訪問的技術(shù)機(jī)制。 在分布式環(huán)境中,多個(gè)服務(wù)可能同時(shí)訪問和操作共享資源,如數(shù)據(jù)庫、文件系統(tǒng)等。為了保持?jǐn)?shù)據(jù)的一致性和完整性,需要確保在同一時(shí)刻只有一個(gè)服務(wù)能

    2024年04月16日
    瀏覽(23)
  • XXL-JOB分布式任務(wù)調(diào)度平臺(tái)搭建以及和SpringBoot整合應(yīng)用

    XXL-JOB分布式任務(wù)調(diào)度平臺(tái)搭建以及和SpringBoot整合應(yīng)用

    XXL-JOB?是一個(gè)輕量級(jí)分布式任務(wù)調(diào)度平臺(tái),其核心設(shè)計(jì)目標(biāo)是開發(fā)迅速、學(xué)習(xí)簡單、輕量級(jí)、易擴(kuò)展。現(xiàn)已開放源代碼并接入多家公司線上產(chǎn)品線,開箱即用。 可以前往 Gitee 地址進(jìn)行下載使用: ? 代碼結(jié)構(gòu)如下: 運(yùn)行 SQL 文件至本地?cái)?shù)據(jù)庫: 修改 xxl-job-admin 模塊的 yml 文件

    2023年04月21日
    瀏覽(21)
  • SpringBoot整合Redis、以及緩存穿透、緩存雪崩、緩存擊穿的理解分布式情況下如何添加分布式鎖 【續(xù)篇】

    SpringBoot整合Redis、以及緩存穿透、緩存雪崩、緩存擊穿的理解分布式情況下如何添加分布式鎖 【續(xù)篇】

    上一篇實(shí)現(xiàn)了單體應(yīng)用下如何上鎖,這一篇主要說明如何在分布式場景下上鎖 上一篇地址:加鎖 需要注意的點(diǎn)是: 在上鎖和釋放鎖的過程中要保證 原子性操作 核心是上鎖和解鎖的過程 關(guān)于解鎖使用腳本參考:SET key value [EX seconds] [PX milliseconds] [NX|XX] 3.1 一個(gè)服務(wù)按照多個(gè)端口同時(shí)

    2023年04月10日
    瀏覽(29)
  • 基于 Redis 實(shí)現(xiàn)分布式限流

    分布式限流是指通過將限流策略嵌入到分布式系統(tǒng)中,以控制流量或保護(hù)服務(wù),保證系統(tǒng)在高并發(fā)訪問情況下不被過載。 分布式限流可以防止系統(tǒng)因大量請求同時(shí)到達(dá)導(dǎo)致壓力過大而崩潰,從而提高系統(tǒng)的穩(wěn)定性和可靠性。同時(shí),它可以使得業(yè)務(wù)資源能夠更好地分配,提高系

    2024年02月12日
    瀏覽(25)
  • 2、基于redis實(shí)現(xiàn)分布式鎖

    2、基于redis實(shí)現(xiàn)分布式鎖

    借助于redis中的命令setnx(key, value),key不存在就新增,存在就什么都不做。同時(shí)有多個(gè)客戶端發(fā)送setnx命令,只有一個(gè)客戶端可以成功,返回1(true);其他的客戶端返回0(false)。 多個(gè)客戶端同時(shí)獲取鎖(setnx) 獲取成功,執(zhí)行業(yè)務(wù)邏輯,執(zhí)行完成釋放鎖(del) 其他客戶端等

    2024年02月15日
    瀏覽(38)
  • 基于Mongodb分布式鎖簡單實(shí)現(xiàn),解決定時(shí)任務(wù)并發(fā)執(zhí)行問題

    我們?nèi)粘i_發(fā)過程,會(huì)有一些定時(shí)任務(wù)的代碼來統(tǒng)計(jì)一些系統(tǒng)運(yùn)行數(shù)據(jù),但是我們應(yīng)用有需要部署多個(gè)實(shí)例,傳統(tǒng)的通過配置文件來控制定時(shí)任務(wù)是否啟動(dòng)又太過繁瑣,而且還經(jīng)常出錯(cuò),導(dǎo)致一些異常數(shù)據(jù)的產(chǎn)生 網(wǎng)上有很多分布式鎖的實(shí)現(xiàn)方案,基于redis、zk、等有很多,但

    2023年04月18日
    瀏覽(25)
  • 自定義注解,基于redis實(shí)現(xiàn)分布式鎖

    自定義注解,基于redis實(shí)現(xiàn)分布式鎖

    1.1、注解的基礎(chǔ)知識(shí) 實(shí)現(xiàn)自定義注解其實(shí)很簡單,格式基本都差不多。也就參數(shù)可能變一變。 @Retention:取值決定了注解在什么時(shí)候生效,一般都是取運(yùn)行時(shí),也就是RetentionPolicy.RUNTIME。 @Target:決定了這個(gè)注解可以使用在哪些地方,可以取方法,字段,類等。 注解這就定義

    2024年02月08日
    瀏覽(20)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包