開源地址:https://gitee.com/LIEN321/mydata-blade
詳細介紹:MyData 基于 Web API 的數(shù)據(jù)集成平臺 v0.7.0
部署文檔:用 Docker 部署 MyData v0.7.1
使用手冊:MyData 使用手冊v0.7.1
交流Q群:430089673
MyData后端結構
MyData的后端由3個子服務組成,分別是管理服務
、任務服務
、業(yè)務數(shù)據(jù)服務
;
- 管理服務:通過項目、數(shù)據(jù)標準、應用API、環(huán)境的管理 配置出同步業(yè)務數(shù)據(jù)的任務;
- 任務服務:根據(jù)配置的任務 定時調(diào)用應用API和數(shù)據(jù)服務 實現(xiàn)業(yè)務數(shù)據(jù)的傳輸和存儲;
- 數(shù)據(jù)服務:封裝業(yè)務數(shù)據(jù)的隔離機制和讀寫操作;
依賴的組件:
- MySQL:存儲管理數(shù)據(jù);
- Redis:緩存管理數(shù)據(jù)和任務;
- MongoDB;存儲業(yè)務數(shù)據(jù);
下圖從數(shù)據(jù)流角度 展示3個子服務的關聯(lián):
注:開源版本采用單體SpringBoot;
任務服務
配置任務
任務主要包括:項目環(huán)境、數(shù)據(jù)標準、應用API、任務類型、字段映射、任務周期;文章來源:http://www.zghlxwxcb.cn/news/detail-839641.html
- 項目環(huán)境:確定應用API的統(tǒng)一前綴地址;
- 數(shù)據(jù)標準:明確集成的業(yè)務數(shù)據(jù)的數(shù)據(jù)結構;
- 應用API: 業(yè)務數(shù)據(jù)的傳輸通道;
- 任務類型:明確數(shù)據(jù)的傳輸方向,
提供數(shù)據(jù)
表示從應用API讀取業(yè)務員數(shù)據(jù)、消費數(shù)據(jù)
表示向應用API發(fā)送業(yè)務數(shù)據(jù); - 字段映射:配置接口響應結構中 與標準數(shù)據(jù)字段的映射關系;
- 任務周期:定期執(zhí)行任務的時間間隔,格式為cron表達式;
任務流程
數(shù)據(jù)集成的任務執(zhí)行流程如下圖:文章來源地址http://www.zghlxwxcb.cn/news/detail-839641.html
- 任務服務啟動時(即MyData系統(tǒng)啟動),查詢所有運行狀態(tài)的任務;
public class JobExecutor implements ApplicationRunner {
...
@Override
public void run(ApplicationArguments args) {
// 移除已有緩存
jobCache.removeAll();
// 查詢已啟動的任務
List<Task> tasks = taskService.listRunningTasks();
log.info("tasks.size() = " + tasks.size());
if (CollUtil.isNotEmpty(tasks)) {
tasks.forEach(this::startTask);
}
}
...
}
- 根據(jù)任務的cron表達式,計算任務的下次執(zhí)行時間;
/**
* 根據(jù) 任務的上次執(zhí)行時間 和 設定間隔規(guī)則,計算任務的 下次執(zhí)行時間
*
* @param taskInfo 定時任務
*/
private void calculateNextRunTime(TaskInfo taskInfo) {
Assert.notNull(taskInfo);
Assert.notEmpty(taskInfo.getTaskPeriod());
Date date = taskInfo.getStartTime();
if (taskInfo.getFailCount() > 0) {
date = taskInfo.getNextRunTime();
}
CronExpression cronExpression = new CronExpression(taskInfo.getTaskPeriod());
Date nextRunTime = cronExpression.getNextValidTimeAfter(date);
taskInfo.setNextRunTime(nextRunTime);
}
- 計算任務的下次執(zhí)行時間 與 當前時間的時間差,以時間差作為緩存失效期 將任務存入redis緩存;
/**
* 緩存任務
*
* @param taskInfo 任務對象
* @throws IllegalArgumentException 緩存時長無效
*/
public void cacheJob(TaskInfo taskInfo) throws IllegalArgumentException {
// 計算任務緩存有效時長
long expire = DateUtil.between(taskInfo.getStartTime(), taskInfo.getNextRunTime(), DateUnit.SECOND);
if (expire <= 0) {
throw new IllegalArgumentException(StrUtil.format("expire <= 0, startTime = {}, nextRunTime = {}"
, DateUtil.format(taskInfo.getStartTime(), DatePattern.NORM_DATETIME_MS_PATTERN)
, DateUtil.format(taskInfo.getNextRunTime(), DatePattern.NORM_DATETIME_MS_PATTERN)));
}
redisUtil.set(CACHE_TASK + taskInfo.getId(), taskInfo);
redisUtil.set(CACHE_JOB + taskInfo.getId(), taskInfo.getId(), expire);
taskInfo.appendLog("任務存入redis,緩存時長 {} 秒", expire);
}
- 通過監(jiān)聽redis的key失效事件,獲得待執(zhí)行的任務;
public class RedisKeyExpiredListener implements MessageListener {
private final JobExecutor jobExecutor;
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
if (StrUtil.startWith(expiredKey, JobCache.CACHE_JOB)) {
String taskId = StrUtil.subSuf(expiredKey, JobCache.CACHE_JOB.length());
jobExecutor.notify(taskId);
}
}
}
- 將任務加入待執(zhí)行的線程池,隨后即可執(zhí)行
/**
* 任務存入執(zhí)行隊列
*
* @param taskInfo 任務
*/
private void executeJob(TaskInfo taskInfo) {
taskInfo.appendLog("任務存入執(zhí)行隊列");
Runnable runnable = new JobThread(taskInfo);
getThreadPoolExecutor().execute(runnable);
}
- 根據(jù)任務類型分別執(zhí)行
提供數(shù)據(jù)
和消費數(shù)據(jù)
流程;- 提供數(shù)據(jù)
- 調(diào)用應用API,獲取json格式數(shù)據(jù);
- 根據(jù)任務中字段映射 解析json為業(yè)務數(shù)據(jù)Map集合;
- 調(diào)用數(shù)據(jù)服務 將業(yè)務數(shù)據(jù)存入MongoDB;
- 提供數(shù)據(jù)
case MdConstant.DATA_PRODUCER:
// 調(diào)用api 獲取json
String json = ApiUtil.read(taskInfo);
// 將json按字段映射 解析為業(yè)務數(shù)據(jù)
jobDataService.parseData(taskInfo, json);
// 根據(jù)條件過濾數(shù)據(jù)
jobDataFilterService.doFilter(taskInfo);
// 保存業(yè)務數(shù)據(jù)
jobDataService.saveTaskData(taskInfo);
// 更新環(huán)境變量
jobVarService.saveVarValue(taskInfo, json);
break;
- 消費數(shù)據(jù)
- 根據(jù)任務所選數(shù)據(jù)標準,查詢業(yè)務數(shù)據(jù);
- 再根據(jù)字段映射,將業(yè)務數(shù)據(jù) 轉為指定的json對象集合;
- 調(diào)用應用API,傳輸json數(shù)據(jù);
case MdConstant.DATA_CONSUMER:
List<BizDataFilter> filters = taskInfo.getDataFilters();
if (CollUtil.isNotEmpty(filters)) {
// 解析過濾條件值中的 自定義字符串
parseFilterValue(filters);
// 排除值為null的條件
filters = filters.stream().filter(filter -> filter.getValue() != null).collect(Collectors.toList());
}
// 根據(jù)過濾條件 查詢數(shù)據(jù)
String dataCode = taskInfo.getDataCode();
if (StrUtil.isNotEmpty(dataCode)) {
List<Map> dataList = bizDataDAO.list(MdUtil.getBizDbCode(taskInfo.getTenantId(), taskInfo.getProjectId(), taskInfo.getEnvId()), dataCode, filters);
taskInfo.setConsumeDataList(dataList);
// 根據(jù)字段映射轉換為api參數(shù)
jobDataService.convertData(taskInfo);
}
// 調(diào)用api傳輸數(shù)據(jù)
ApiUtil.write(taskInfo);
break;
- 保存任務執(zhí)行日志;
到了這里,關于[自研開源] MyData 數(shù)據(jù)集成的任務流程 v0.7.1的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!