?總體思路是,主節(jié)點(diǎn)接收到任務(wù)請求,將根據(jù)任務(wù)情況拆分成多個(gè)任務(wù)塊,將任務(wù)塊標(biāo)識(shí)的主鍵放入redis。發(fā)送redis消息,等待其他節(jié)點(diǎn)運(yùn)行完畢,結(jié)束處理。接收到信息的節(jié)點(diǎn)注冊本節(jié)點(diǎn)信息到redis、開啟多線程、獲取任務(wù)塊、執(zhí)行任務(wù)、結(jié)束處理。
1、主節(jié)點(diǎn)接收任務(wù)請求
@Override
public void executeTaskInfo(PrepareDTO prepareDTO) {
//異常標(biāo)記
String taskInfo = prepareDTO.getTaskId();
//任務(wù)分組狀態(tài)
String taskStatus = "";
try {
log.info("數(shù)據(jù)準(zhǔn)備任務(wù)并設(shè)定任務(wù)執(zhí)行狀態(tài),{}", prepareDTO);
this.dataPrepareBo.doStartGroupJobInfo(prepareDTO);
//給redis集合中放計(jì)算對象
log.info("開始放入計(jì)算任務(wù):{}", prepareDTO);
boolean getTaskFlag = this.dataPrepareBo.pushCalculationObject(prepareDTO);
if (!getTaskFlag) {
taskStatus = String.format("沒有獲取數(shù)據(jù)或計(jì)劃已取消,%s", taskInfo);
log.error(taskStatus);
throw new Exception(taskStatus);
}
//發(fā)消息執(zhí)行緩存中任務(wù)
log.info("發(fā)消息執(zhí)行任務(wù):{}", prepareDTO);
sendMessage(prepareDTO);
//等待任務(wù)執(zhí)行完畢
log.info("等待任務(wù)執(zhí)行結(jié)果");
taskStatus = this.getGroupUpLoadTaskFinsh(prepareDTO);
} catch (Exception e) {//捕獲日志
e.printStackTrace();
taskStatus = "獲取任務(wù)狀態(tài)異常" + e;
log.info(taskStatus);
dataPrepareBo.putExceptionMsg2Cache(taskInfo, "數(shù)據(jù)準(zhǔn)備分發(fā)計(jì)算任務(wù)線程異常:" + taskStatus);
} finally {
//做任務(wù)結(jié)束處理
this.doGroupTaskFinshpPocess(prepareDTO, taskStatus);
}
}
2,發(fā)送消息
@Override
public void sendMessage(String topic, String msg) {
this.redisTemplate.convertAndSend(topic, msg);
}
3,節(jié)點(diǎn)接收任務(wù),并執(zhí)行
public void doUpLoadTask(String msg) throws Exception {
log.info("開始執(zhí)行明細(xì)任務(wù){(diào)}" + msg);
String taskId = this.getTaskId(msg);
try {
Object cancelFlag = this.redisTemplate.opsForValue().get(String.format(EngineConstant.JOB_CANCEL_FLAG, taskId));
if(cancelFlag != null && "1".equals(cancelFlag.toString())){
log.info("本次任務(wù)已取消");
return;
}
//上傳本機(jī)執(zhí)行信息到redis
this.cacheBo.initGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());
//從緩存獲取任務(wù),獲取任務(wù)后啟線程執(zhí)行任務(wù)。如果沒獲取到任務(wù),則本節(jié)點(diǎn)任務(wù)執(zhí)行完畢
//循環(huán)獲取任務(wù)
this.groupTaskProcessBO.doGroupTaskProcess(taskId, null);
//處理結(jié)束
this.cacheBo.finishGroupUpLoadTaskStats(taskId,ENGINE_DISTRIBUTION_RUNNING.getKey());
} catch (Exception e) {
//記錄日志
taskUpldExeLogCDTO.setRunStas("-1");
String exceptionInfo = this.taskLogUtils.getExceptionInfo(e) ;
taskUpldExeLogCDTO.setAbnInfo(exceptionInfo);
throw e;
} finally {
//記錄日志
taskUpldExeLogCDTO.setEndtime(DateUtil.getCurrentDate());
if("-1".equals(taskUpldExeLogCDTO.getRunStas())){//異常結(jié)束
this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"執(zhí)行上傳任務(wù)異常");
} else {//正常結(jié)束
taskUpldExeLogCDTO.setRunStas("1");
this.taskLogUtils.sendLogInfo(taskUpldExeLogCDTO,"執(zhí)行上傳任務(wù)正常");
}
}
}
4,開啟線程執(zhí)行任務(wù)
@Override
public CalculationDTO doGroupTaskProcess(String taskId, TaskUpldExeLogCDTO taskUpldExeLogCDTO) throws Exception {
List<Future> futureList = new ArrayList<>();
//開始執(zhí)行明細(xì)任務(wù)處理
ThreadPoolTaskExecutor taskTransferExecutor = ToolUtil.getExecutor("engine-file-tasks-pool-", Math.min(parallelProcessNum,10), 8);
ExecutorListHolder.putThreadPool(String.format(GroupConstant.PREPARE_ENGINE_POOL,taskId), taskTransferExecutor.getThreadPoolExecutor());
for(int i = 0 ; i < parallelProcessNum ; i++) {
DoGroupUpLoadTaskThread doGroupUpLoadTaskThread = new DoGroupUpLoadTaskThread(taskId
, redisTemplate, calculationBo, null, null);
Future<?> future = taskTransferExecutor.submit(doGroupUpLoadTaskThread);
futureList.add(future);
}
if (!CollectionUtil.isEmpty(futureList)) {
futureList.forEach(f -> {
try {
f.get(GroupTaskProcessBOImpl.maxTime, TimeUnit.SECONDS);
} catch (Exception e) {
e.printStackTrace();
}
});
}
log.info("本節(jié)點(diǎn)執(zhí)行分組任務(wù)執(zhí)行完畢{}", taskId + ":" + GroupConstant.IDENTITY);
return null;
}
5,線程執(zhí)行明細(xì)文章來源:http://www.zghlxwxcb.cn/news/detail-667404.html
@Override
public ResponseDTO call() throws Exception {
//執(zhí)行任務(wù)
while(true) {
FilterTableUniqueDTO filterTableUniqueDTO = (FilterTableUniqueDTO)this.redisTemplate.opsForList().leftPop(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));
log.debug("取出任務(wù):" + filterTableUniqueDTO);
if(null == filterTableUniqueDTO) {
break ;
}
long lastNum = this.redisTemplate.opsForList().size(String.format(ENGINE_FILTERTABLEUNIQUE_QUEUE.getKey(), taskId));
log.info("生成文件剩余任務(wù)數(shù)量:" + lastNum);
// 處理任務(wù)
calculationBo.GenerateFile(filterTableUniqueDTO, taskUpldDetlLogCDTO);
}
return null;
}
以上是主要入口總體思路涉及代碼,詳細(xì)實(shí)現(xiàn)整理起來涉及內(nèi)容比較繁多,將在第二部分分享。文章來源地址http://www.zghlxwxcb.cn/news/detail-667404.html
到了這里,關(guān)于一種基于springboot、redis的分布式任務(wù)引擎的實(shí)現(xiàn)(一)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!