DolphinDB 是一款高性能時(shí)序數(shù)據(jù)庫。DolphinDB 集成了功能強(qiáng)大的編程語言和高容量高速度的批流一體數(shù)據(jù)分析系統(tǒng),為海量數(shù)據(jù)(特別是時(shí)間序列數(shù)據(jù))的快速存儲(chǔ)、檢索、計(jì)算及分析提供一站式解決方案。在實(shí)際生產(chǎn)環(huán)境中,經(jīng)常存在數(shù)據(jù)導(dǎo)入、轉(zhuǎn)換、查詢計(jì)算,更新等一系列流程任務(wù),各個(gè)部分之間存在依賴,如何將這些 DolphinDB 任務(wù)按照需求準(zhǔn)確、有效率地調(diào)度,可以借用 DolphinScheduler 任務(wù)調(diào)度器。
本文將從生產(chǎn)環(huán)境中的一個(gè) ETL 場(chǎng)景出發(fā),將 DolphinScheduler 引入到 DolphinDB 的高可用集群中,通過使用 DolphinScheduler 提供的功能來調(diào)度 DolphinDB 的數(shù)據(jù) ETL 作業(yè)。
1. Apache DolphinScheduler
Apache DolphinScheduler 是一個(gè)分布式易擴(kuò)展的可視化 DAG 工作流任務(wù)調(diào)度開源系統(tǒng)。該系統(tǒng)適用于企業(yè)級(jí)場(chǎng)景,提供了一個(gè)支持可視化操作任務(wù)、工作流和全生命周期數(shù)據(jù)處理的解決方案,解決了數(shù)據(jù)研發(fā) ETL 依賴錯(cuò)綜復(fù)雜,無法監(jiān)控任務(wù)健康狀態(tài)的問題。 DolphinScheduler 以 DAG(Directed Acyclic Graph,DAG)流式方式組裝任務(wù),可以及時(shí)監(jiān)控任務(wù)的執(zhí)行狀態(tài),支持重試、指定節(jié)點(diǎn)恢復(fù)失敗、暫停、恢復(fù)、終止任務(wù)等操作。
1.1 特性
- 執(zhí)行定時(shí)任務(wù):在生產(chǎn)環(huán)境中,一個(gè)普遍的需求是周期性地從數(shù)據(jù)源中提取、轉(zhuǎn)換、加載數(shù)據(jù)到 DolphinDB,如每一天、每個(gè)小時(shí)等,DolphinScheduler 可以方便地進(jìn)行定時(shí)管理,滿足需求。
- 執(zhí)行歷史任務(wù):有時(shí)候我們由于業(yè)務(wù)變動(dòng),需要將歷史數(shù)據(jù)進(jìn)行重新計(jì)算和加載入 DolphinDB。在這種情況下,我們僅需在 DolphinScheduler Web 界面上定義相應(yīng)任務(wù)的工作流,并定義和傳入開始時(shí)間和結(jié)束時(shí)間的參數(shù)。通過這個(gè)步驟,我們可以處理任意時(shí)間段的數(shù)據(jù)。
- 并行執(zhí)行任務(wù):在業(yè)務(wù)中,我們可能需要同時(shí)處理多項(xiàng)數(shù)據(jù)ETL任務(wù),如同時(shí)導(dǎo)入數(shù)據(jù)到 DolphinDB 的不同表或同一個(gè)表的不同分區(qū)。DolphinScheduler 允許我們并行執(zhí)行多項(xiàng)工作流任務(wù),提高執(zhí)行效率。
- 高效處理編排:在生產(chǎn)環(huán)境中,大多數(shù)情況下存在任務(wù)之間、工作流之間有條件地執(zhí)行,比如在 DolphinDB 的數(shù)據(jù)ETL中,下游任務(wù)依賴于上游任務(wù)的執(zhí)行狀態(tài)做不同的操作,有的任務(wù)之間關(guān)系錯(cuò)綜復(fù)雜??梢栽?DolphinScheduler 中按照邏輯對(duì)工作流進(jìn)行定義,輕松編排任務(wù)之間的關(guān)系。
1.2 安裝部署
DolphinScheduler 可在單機(jī)、單服務(wù)器集群、多服務(wù)器集群、K8S環(huán)境下部署,本節(jié)內(nèi)容將以單機(jī)部署流程作為演示內(nèi)容,僅供參考。
前置條件
- JDK:安裝JDK(1.8+)并配置JAVA_HOME環(huán)境變量,DolphinScheduler的啟動(dòng)依賴于該環(huán)境變量,同時(shí)將其下的bin目錄追加到PATH環(huán)境變量中。
- 二進(jìn)制包:已配置DolphinDB數(shù)據(jù)源的DolphinScheduler版本,下載鏈接在https://cdn.dolphindb.cn/downloads/apache-dolphinscheduler-3.1.7-bin.tar.gz
- 本教程將MySQL作為 DolphinScheduler 持久化的元數(shù)據(jù)庫,因此要保證服務(wù)器已安裝好MySQL,若沒有,需要下載安裝,所有平臺(tái)的mysql下載地址為:MySQL 下載地址
元數(shù)據(jù)持久化配置
單機(jī)服務(wù)使用H2數(shù)據(jù)庫來存儲(chǔ)元數(shù)據(jù),而H2數(shù)據(jù)庫是一種內(nèi)存級(jí)別的數(shù)據(jù)庫,因此當(dāng)DolphinScheduler程序重啟時(shí),會(huì)導(dǎo)致之前定義的工作流等內(nèi)容全部丟失,需要重新定義,造成效率低下和不必要的麻煩。因此,將元數(shù)據(jù)持久化是非常有必要的,DolphinScheduler支持MySQL和PostgreSQL作為元數(shù)據(jù)的存儲(chǔ)數(shù)據(jù)庫,本文以配置MySQL為例,主要有以下流程:
- 解壓DolphinScheduler程序包
tar -xvzf apache-dolphinscheduler-3.1.7-bin.tar.gz
cd apache-dolphinscheduler-3.1.7-bin
- 進(jìn)入MySQL,創(chuàng)建數(shù)據(jù)庫和用戶
// 創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
// 創(chuàng)建用戶,并設(shè)置密碼
CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY '密碼';
// 給用戶賦予庫的權(quán)限
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%';
flush privileges;
- 修改
apache-dolphinscheduler-3.1.7-bin/bin/env/dolphinscheduler_env.sh
文件設(shè)定環(huán)境變量,將{user}
和{password}
改為上一步創(chuàng)建的用戶名和密碼
export DATABASE=mysql
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL="jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
export SPRING_DATASOURCE_USERNAME={user}
export SPRING_DATASOURCE_PASSWORD={password}
- 修改
apache-dolphinscheduler-3.1.7-bin/standalone-server/conf/application.yaml
文件中的配置(在文件尾部),上半部分由于這里用的不是postgresql,直接注釋掉就好。將{user}
和{password}
改為上面創(chuàng)建的用戶名和密碼
---
#spring:
# config:
# activate:
# on-profile: postgresql
# quartz:
# properties:
# org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
# datasource:
# driver-class-name: org.postgresql.Driver
# url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
# username: root
# password: root
---
spring:
config:
activate:
on-profile: mysql
sql:
init:
schema-locations: classpath:sql/dolphinscheduler_mysql.sql
datasource:
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
username: {user}
password: {password}
- 初始化數(shù)據(jù)庫,如果上述步驟沒有問題,這里就不會(huì)報(bào)錯(cuò):
bash apache-dolphinscheduler-3.1.7-bin/tools/bin/upgrade-schema.sh
- 執(zhí)行完成之后進(jìn)入 MySQL 查詢會(huì)發(fā)現(xiàn)名稱為
dolphinscheduler
的數(shù)據(jù)庫已經(jīng)生成了很多表格
啟動(dòng)DolphinScheduler單機(jī)服務(wù)器
注意:啟動(dòng)DolphinScheduler需要依賴多個(gè)端口號(hào),分別是:
12345、50052、25333、25334、1234、5678
使用lsof -i:<port>
檢查以上端口號(hào)是否被占用,如果有端口被別的進(jìn)程占用,修改apache-dolphinscheduler-3.1.7-bin/standalone-server/conf/application.yaml
中對(duì)應(yīng)的端口
- 當(dāng)配置好以上內(nèi)容之后,進(jìn)入執(zhí)行
apache-dolphinscheduler-3.1.7-bin
目錄并執(zhí)行以下命令啟動(dòng):
bash ./bin/dolphinscheduler-daemon.sh start standalone-server
2. 輸入以下命令查看是否執(zhí)行成功
? a. 運(yùn)行jps
查看相應(yīng)實(shí)例是否已在進(jìn)程中
? b. 運(yùn)行bash ./bin/dolphinscheduler-daemon.sh status standalone-server
查看 standalone-server的運(yùn)行狀態(tài)
3. 停止運(yùn)行
bash ./bin/dolphinscheduler-daemon.sh stop standalone-server
4. 啟動(dòng)成功后,在瀏覽器中輸入服務(wù)器IP:12345/dolphinscheduler/ui/login
進(jìn)行登錄
a. 默認(rèn)用戶名:admin
? b. 默認(rèn)密碼:dolphinscheduler123
登陸成功后將會(huì)看到如下頁面:
- 重啟dolphinscheduler服務(wù),測(cè)試已經(jīng)連接好數(shù)據(jù)庫
1.3 DolphinDB 與 DolphinScheduler 結(jié)合
DolphinDB 作為強(qiáng)大的高性能時(shí)序數(shù)據(jù)庫,能夠高效存儲(chǔ)和處理 PB 級(jí)的海量數(shù)據(jù)集,可以通過編寫腳本實(shí)現(xiàn)數(shù)據(jù)的處理、存儲(chǔ)、因子計(jì)算、建模、回測(cè)等任務(wù)。在實(shí)際生產(chǎn)中,源數(shù)據(jù)下載、數(shù)據(jù)處理、數(shù)據(jù)入庫、數(shù)據(jù)校驗(yàn)、指標(biāo)計(jì)算等任務(wù)之間存在先后關(guān)系和條件關(guān)系,如果在 DolphinDB 腳本中編寫相關(guān)的邏輯關(guān)系代碼,一來會(huì)造成與實(shí)際任務(wù)不相關(guān)的腳本冗余,二來如果實(shí)際業(yè)務(wù)變動(dòng),需要增加或刪減部分任務(wù),相互依賴的任務(wù)之間的關(guān)系代碼也需要變動(dòng),造成更新迭代效率低下。
考慮到這種情況,如果能夠?qū)?DolphinDB 與 DolphinScheduler 結(jié)合起來,在 DolphinDB 中編寫相關(guān)任務(wù)代碼模塊,在 DolphinScheduler 上將這些任務(wù)按照邏輯編排調(diào)度,這樣,就能夠?qū)⑷蝿?wù)代碼和任務(wù)之間邏輯關(guān)系分開,每個(gè)部分專注于發(fā)揮自己的作用,實(shí)現(xiàn)更高效地運(yùn)行維護(hù)。
1.3.1 如何創(chuàng)建 DolphinDB 數(shù)據(jù)源
- 在安裝部署好DolphinScheduler之后,登錄其Web界面,點(diǎn)擊數(shù)據(jù)源中心并點(diǎn)擊創(chuàng)建數(shù)據(jù)源
2. 輸入相關(guān)參數(shù)定義,創(chuàng)建DolphinDB數(shù)據(jù)源
注意:數(shù)據(jù)庫名和jdbc連接參數(shù)不用填,不然會(huì)報(bào)錯(cuò) JDBC connect failed。
1.3.2 如何調(diào)度 DolphinDB 任務(wù)
創(chuàng)建DolphinDB數(shù)據(jù)源后,需要?jiǎng)?chuàng)建租戶、項(xiàng)目、工作流,具體流程可參考:DolphinScheduler 部署流程
定義好工作流之后,點(diǎn)擊該工作流進(jìn)入操作界面,接著在左端拖拽SQL 節(jié)點(diǎn)進(jìn)行DolphinDB任務(wù)定義
注意:SQL類型分為查詢類型和非查詢類型,這兩種類型分別適用于不同的使用場(chǎng)景,在本文 2.5.2 小節(jié)中會(huì)詳細(xì)介紹。
由于在SQL任務(wù)節(jié)點(diǎn)中,每次只能執(zhí)行一行DolphinDB代碼。因此,調(diào)度DolphinDB任務(wù)主要有以下兩種途徑:
- run 函數(shù)
假設(shè)DolphinDB腳本在服務(wù)器上的路徑是:/data/script.dos
,那么,在SQL語句上可以輸入:
run("/data/script.dos");
2. 函數(shù)視圖
? a. 在DolphinDB編寫函數(shù):
// 在DolphinDB中定義一個(gè)函數(shù),用于創(chuàng)建數(shù)據(jù)庫表
def createTable(dbName, tbName){
login("admin", "123456")
if(!existsDatabase(dbName)){
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 10])
db = database(dbName, COMPO, [db1, db2], , "TSDB")
}else{
db = database(dbName)
}
if(!existsTable(dbName,tbName)){
name =`SecurityID`ChannelNo`ApplSeqNum`MDStreamID`SecurityIDSource`Price
`OrderQty`Side`TradeTIme`OrderType`OrderIndex`LocalTime`SeqNo
`Market`DataStatus`BizIndex
type = [SYMBOL, INT, LONG, INT, INT, DOUBLE, INT,
SYMBOL, TIMESTAMP, SYMBOL, INT, TIME, LONG, SYMBOL,INT,LONG]
schemaTable = table(1:0, name, type)
db.createPartitionedTable(table=schemaTable, tableName=tbName,
partitionColumns=`TradeTime`SecurityID,
compressMethods={TradeTime:"delta"},
sortColumns=`Market`SecurityID`TradeTime, keepDuplicates=ALL)
}
else{
writeRunLog("數(shù)據(jù)庫:" + dbName + " 數(shù)據(jù)表:" + tbName + " 已存在...")
}
}
? b. 將函數(shù)添加為函數(shù)視圖,作用是使得該函數(shù)視圖全局可用
// 添加為函數(shù)視圖
addFunctionView(createTable)
? c. 在dolphinScheduler上調(diào)用該函數(shù)視圖
// 在SQL語句內(nèi)輸入以下內(nèi)容:
createTable("dfs://testDb", "testTb");
上述兩種方法的區(qū)別是:
- 使用
run
執(zhí)行腳本不能傳遞參數(shù),靈活性較差 - 使用函數(shù)視圖可以在DolphinScheduler上面傳入?yún)?shù),以上面為例,可以在SQL語句中輸入:
createTable(${dbName}, ${tbName});
傳入?yún)?shù)的方法有局部參數(shù)和全局參數(shù)兩種方法,它們的區(qū)別是全局參數(shù)可以在創(chuàng)建工作流實(shí)例時(shí)直接進(jìn)行修改,而局部參數(shù)需要進(jìn)入到工作流中的具體任務(wù)節(jié)點(diǎn)進(jìn)行修改:
- 局部參數(shù)在定義任務(wù)節(jié)點(diǎn)時(shí)定義
2. 全局參數(shù)在保存工作流時(shí)定義
2. 調(diào)度 DolphinDB 數(shù)據(jù) ETL 任務(wù)
2.1 任務(wù)流程結(jié)構(gòu)
- 文件結(jié)構(gòu)
2.1.1 DolphinDB 功能模塊部分
-
stockData: 數(shù)據(jù)導(dǎo)入模塊
-
createStockTable.dos
: 定義創(chuàng)建相應(yīng)的股票數(shù)據(jù)庫表的函數(shù)。 -
stockDataProcess.dos
: 定義將源數(shù)據(jù)進(jìn)行清洗、處理與格式轉(zhuǎn)換的函數(shù)。 -
stockDataLoad.dos
: 定義將數(shù)據(jù)導(dǎo)入相應(yīng)的DolphinDB庫表的函數(shù)。
-
-
minFactor: 分鐘線因子指標(biāo)相關(guān)模塊
-
createMinFactorTable.dos
: 定義創(chuàng)建分鐘線因子指標(biāo)庫表的函數(shù)。 -
computeMinFactor.dos
: 定義計(jì)算分鐘線因子指標(biāo)的函數(shù)。
-
-
dataCheck: 數(shù)據(jù)校驗(yàn)?zāi)K
-
stockCheck.dos
: 定義校驗(yàn)導(dǎo)入的股票逐筆委托、快照行情、逐筆成交數(shù)據(jù)的函數(shù)。 -
minFactorCheck.dos
: 定義校驗(yàn)分鐘線因子指標(biāo)數(shù)據(jù)的函數(shù)。
-
2.1.2 DolphinDB 腳本部分
-
initTable.dos
: 執(zhí)行該腳本可以創(chuàng)建相應(yīng)的股票數(shù)據(jù)庫表和分鐘因子指標(biāo)庫表。 -
createFuncView.dos
: 執(zhí)行該腳本可以定義需要在dolphinScheduler上用到的函數(shù)視圖。
2.2 數(shù)據(jù)介紹
本文選取了 20230201 上交所某股票 level 2 委托數(shù)據(jù)、快照數(shù)據(jù)、成交數(shù)據(jù)作為演示。以下是逐筆委托表在DolphinDB的結(jié)構(gòu)。快照數(shù)據(jù)和成交數(shù)據(jù)結(jié)構(gòu)可在附件中查看:
字段名 | 字段含義 | 數(shù)據(jù)類型(DolphinDB) |
---|---|---|
ChannelNo | 通道代碼 | INT |
ApplSeqNum | 消息記錄號(hào) | LONG |
MDStreamID | 行情類別 | INT |
SecurityID | 證券代碼 | SYMBOL |
SecurityIDSource | 證券代碼源 | INT |
Price | 委托價(jià)格 | DOUBLE |
OrderQty | 委托數(shù)量 | INT |
Side | 委托買賣方向 | SYMBOL |
TradeTime | 委托時(shí)間 | TIMESTAMP |
OrderType | 委托類型 | SYMBOL |
OrderIndex | 委托序號(hào) | INT |
LocalTime | 本地接受時(shí)間戳 | TIME |
SeqNo | 消息序列號(hào) | LONG |
Market | 交易市場(chǎng) | SYMBOL |
DataStatus | 數(shù)據(jù)狀態(tài) | INT |
BizIndex | 業(yè)務(wù)序列號(hào) | LONG |
2.3 數(shù)據(jù)導(dǎo)入、指標(biāo)計(jì)算與校驗(yàn)任務(wù)
注意:以下各部分內(nèi)容均將相關(guān)函數(shù)定義在模塊中,以方便進(jìn)行工程化管理,關(guān)于DolphinDB模塊的創(chuàng)建、加載、調(diào)用方法,請(qǐng)參照:DolphinDB 模塊復(fù)用教程
2.3.1 數(shù)據(jù)清洗、處理、入表
由于源數(shù)據(jù)的字段結(jié)構(gòu)有時(shí)候不符合我們的業(yè)務(wù)需求,因此需要增刪、處理一些字段后再導(dǎo)入數(shù)據(jù)庫,下面以逐筆委托數(shù)據(jù)為例,介紹源數(shù)據(jù)經(jīng)清洗、處理后再導(dǎo)入庫表的過程,快照和成交數(shù)據(jù)的處理邏輯與委托數(shù)據(jù)相同,詳細(xì)內(nèi)容可以在附件中查看。
- 創(chuàng)建數(shù)據(jù)庫表
逐筆委托、快照、逐筆成交數(shù)據(jù)都保存在同一個(gè)庫中,本文采用了組合分區(qū)作為分區(qū)方案,第一層按天分區(qū),第二層對(duì)股票代碼分25個(gè)哈希分區(qū)。如何確定數(shù)據(jù)分區(qū)請(qǐng)參照:DolphinDB數(shù)據(jù)庫分區(qū)教程
module createStockTable
// 創(chuàng)建逐筆委托數(shù)據(jù)存儲(chǔ)庫表
def createEntrust(dbName, tbName,userName = "admin",password = "123456")
{
login(userName, password)
if(!existsDatabase(dbName))
{
db1 = database(, VALUE, 2020.01.01..2021.01.01)
db2 = database(, HASH, [SYMBOL, 25])
// 按天和股票組合分區(qū)
db = database(dbName, COMPO, [db1, db2], , "TSDB")
}
else
{
db = database(dbName)
}
name=`ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TradeTIme`OrderType`OrderIndex`LocalTime`SeqNo`Market`DataStatus`BizIndex
type = [INT, LONG, INT, SYMBOL, INT, DOUBLE, INT, SYMBOL, TIMESTAMP, SYMBOL, INT, TIME, LONG, SYMBOL,INT,LONG]
schemaTable = table(1:0, name, type)
// 創(chuàng)建分區(qū)表
db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns=`Market`SecurityID`TradeTime, keepDuplicates=ALL)
}
- csv數(shù)據(jù)清洗與處理
module stockData::stockDataProcess
// 定義逐筆委托csv數(shù)據(jù)文件中各個(gè)字段的名稱和字段類型
def schemaEntrust()
{
name = `DataStatus`OrderIndex`ChannelNo`SecurityID`TradeTime`OrderType`ApplSeqNum`Price`OrderQty`Side`BizIndex`LocalTime`SeqNo
typeString = `INT`LONG`INT`SYMBOL`TIME`SYMBOL`INT`DOUBLE`INT`SYMBOL`INT`TIME`INT
return table(name, typeString)
}
// 數(shù)據(jù)處理函數(shù),包括字段增加,數(shù)據(jù)去重等操作
def processEntrust(loadDate, mutable t)
{
// 字段名替換
t.replaceColumn!(`TradeTime, concatDateTime(day, t.TradeTime))
n1 = t.size()
// 數(shù)據(jù)去重
t = select * from t where isDuplicated([DataStatus, OrderIndex, ChannelNo, SecurityID, TradeTime, OrderType, ApplSeqNum, Price, OrderQty, Side, BizIndex],FIRST)=false
n2 = t.size()
// 增加字段
update t set Market = `sh
update t set MDStreamID = int(NULL)
update t set SecurityIDSource = int(NULL)
reorderColumns!(t, `ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TradeTime`OrderType`OrderIndex`LocalTime`SeqNo`Market`DataStatus`BizIndex)
return t,n1,n2
}
- 將處理后的數(shù)據(jù)導(dǎo)入數(shù)據(jù)庫表
module stockData::stockDataLoad
use stockData::stockDataProcess
def loadEntrust(userName, userPassword, startDate, endDate, dbName, tbName, filePath, loadType,mutable infoTb)
{
for(loadDate in startDate..endDate)
{
// 刪除已有數(shù)據(jù)
dateString = temporalFormat(loadDate,"yyyyMMdd")
dataCount = exec count(*) from loadTable(dbName, tbName) where date(tradeTime)=loadDate
// 如果表里面已經(jīng)存在當(dāng)天要處理的數(shù)據(jù),刪除庫里面已有數(shù)據(jù)
if(dataCount != 0){
msg = "Start to delete the entrust data, the delete date is: " + dateString
print(msg)
infoTb.tableInsert(msg)
dropPartition(database(dbName), loadDate, tbName)
msg = "Successfully deleted the entrust data, the delete date is: " + dateString
print(msg)
infoTb.tableInsert(msg)
}
// 數(shù)據(jù)導(dǎo)入
// 判斷數(shù)據(jù)csv文件是否存在
fileName = filePath + "/" + dateString + "/" + "entrust.csv"
if(!exists(fileName))
{
throw fileName + "不存在!請(qǐng)檢查數(shù)據(jù)源!"
}
// 如果是全市場(chǎng)數(shù)據(jù),數(shù)據(jù)量較大,因此分批導(dǎo)入
schemaTB = schemaEntrust()
tmpData1 = loadText(filename=fileName, schema=schemaTB)
tmpData1,n1,n2 = processEntrust(loadDate,tmpData1)
pt = loadTable(dbName,tbName)
msg = "the data size in csv file is :" + n2 + ", the duplicated count is " + (n1 - n2)
print(msg)
infoTb.tableInsert(msg)
for(i in 0..23)
{
startTime = 08:00:00.000 + 1200 * 1000 * i
tmpData2 = select * from tmpData1 where time(TradeTime)>=startTime and time(TradeTime)<(startTime+ 1200 * 1000)
if(size(tmpData2) < 1)
{
continue
}
//數(shù)據(jù)入庫
pt.append!(tmpData2)
}
msg = "successfully loaded!"
print(msg)
infoTb.tableInsert(msg)
}
}
2.3.2 K 分鐘線因子指標(biāo)計(jì)算
當(dāng)導(dǎo)入數(shù)據(jù)之后,我們希望根據(jù)業(yè)務(wù)、策略對(duì)數(shù)據(jù)進(jìn)行進(jìn)一步加工,形成分鐘線級(jí)別的因子指標(biāo),讓數(shù)據(jù)產(chǎn)生價(jià)值,驅(qū)動(dòng)業(yè)務(wù)發(fā)展。下面,我們以計(jì)算成交數(shù)據(jù)K分鐘線因子指標(biāo)為例,介紹該任務(wù)流程:
- 創(chuàng)建K分鐘線結(jié)果存儲(chǔ)表
module minFactor::createMinFactorTable
def createMinuteFactor(dbName, tbName)
{
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
//按天分區(qū)
db = database(dbName, VALUE, 2021.01.01..2021.01.03,engine = `TSDB)
colName = `TradeDate`TradeTime`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
colType =[DATE, MINUTE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
tbSchema = table(1:0, colName, colType)
db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeTime,keepDuplicates=ALL)
}
- 計(jì)算K分鐘線因子指標(biāo)并入庫
module minFactor::computeMinFactor
def calFactorOneMinute(dbName, startDate, endDate, mutable factorTb,mutable infoTb)
{
pt = loadTable(dbName, "trade")
dayList = startDate..endDate
if(dayList.size()>12) dayList = dayList.cut(12)
for(days in dayList){
//計(jì)算分鐘 K 線
res = select first(TradePrice) as open, max(TradePrice) as high, min(TradePrice) as low, last(TradePrice) as close, sum(tradeQty) as volume,sum(TradePrice*TradeQty) as amount,sum(TradePrice*TradeQty)\sum(TradeQty) as vwap from pt where date(tradeTime) in days group by date(tradeTime) as TradeDate,minute(tradeTime) as TradeTime, SecurityID
msg = "Start to append minute factor result , the days is: [" + concat(days, ",")+"]"
print(msg)
infoTb.tableInsert(msg)
//分鐘 K 線入庫
factorTb.append!(res)
msg = "Successfully append the minute factor result to databse, the days is: [" + concat(days, ",")+"]"
print(msg)
infoTb.tableInsert(msg)
}
}
2.3.3 數(shù)據(jù)校驗(yàn)與 K 分鐘線指標(biāo)校驗(yàn)
由于K分鐘線因子指標(biāo)計(jì)算依賴于上游導(dǎo)入數(shù)據(jù)的正確性,而業(yè)務(wù)中又依賴于K分鐘線指標(biāo)數(shù)據(jù)的正確性,因此,對(duì)這兩部分?jǐn)?shù)據(jù)進(jìn)行校驗(yàn)是有必要的,下面介紹部分校驗(yàn)步驟,詳細(xì)校驗(yàn)內(nèi)容請(qǐng)參照附件:
- 股票數(shù)據(jù)校驗(yàn)
module dataCheck::stockCheck
def checkStockCounts(idate,dbName)
{
// 校驗(yàn)逐筆委托、快照行情、逐筆成交表的股票個(gè)數(shù)是否一致
getCodes = def (dbName,tbName,idate) {
tb = loadTable(dbName,tbName)
return exec distinct(SecurityID) from tb where date(tradetime)=idate and ((Market=`sh and SecurityID like "6%")or(Market=`sz and (SecurityID like "0%" or SecurityID like "3%" ) ))
}
entrustCodes = getCodes(dbName,"entrust",idate)
tradeCodes = getCodes(dbName,"trade",idate)
snapshotCodes = exec distinct(SecurityID) from loadTable(dbName,"snapshot") where date(tradetime)=idate and ((Market=`sh and SecurityID like "6%")or(Market=`sz and (SecurityID like "0%" or SecurityID like "3%" ))) and HighPrice != 0
if(entrustCodes.size() != snapshotCodes.size() or entrustCodes.size() != tradeCodes.size() or snapshotCodes.size() != tradeCodes.size())
{
throw "逐筆委托股票數(shù)量:" + size(entrustCodes) + " 快照行情股票數(shù)量:" + size(snapshotCodes) + " 逐筆成交股票數(shù)量:" + size(tradeCodes) + ", 它們數(shù)量不一致!"
}
}
- K分鐘線指標(biāo)校驗(yàn)
module dataCheck::minFactorCheck
def checkHighLowPrice(idate,dbName,tbName)
{
// 分鐘線最高價(jià)指標(biāo)與最低價(jià)指標(biāo)校驗(yàn)
tb= loadTable(dbName,tbName)
temp=select * from tb where tradedate=idate and High < Low
if(size(temp)>0)
{
throw "分鐘線計(jì)算錯(cuò)誤!分鐘線最高價(jià)小于最低價(jià)!"
}
}
def checkVolumeAmount(idate,dbName,tbName)
{
// 分鐘線交易量與交易額指標(biāo)校驗(yàn)
tb = loadTable(dbName,tbName)
temp = select * from loadTable(dbName,tbName) where tradedate=idate and ((Volume == 0 and Amount != 0) or (Volume != 0 and Amount == 0))
if(size(temp)>0)
{
throw "分鐘線計(jì)算錯(cuò)誤!交易量和交易額不同時(shí)為0!"
}
}
2.4 實(shí)現(xiàn) DolphinDB 任務(wù)調(diào)度
在定義好數(shù)據(jù)ETL模塊之后,就可以通過以下步驟實(shí)現(xiàn)DolphinDB ETL任務(wù)的調(diào)度:
- 創(chuàng)建相關(guān)庫表
第一次執(zhí)行時(shí),需要?jiǎng)?chuàng)建相關(guān)數(shù)據(jù)庫表
use stockData::createStockTable
use minFactor::createMinFactorTable
// 創(chuàng)建數(shù)據(jù)庫表,庫表名可以根據(jù)實(shí)際需要修改
createEntrust("dfs://stockData", "entrust")
createSnapshot("dfs://stockData", "snapshot")
createTrade("dfs://stockData", "trade")
createMinuteFactor("dfs://factorData", "stockMinFactor")
- 定義函數(shù)視圖并執(zhí)行
由于每日定時(shí)ETL任務(wù)與歷史批量ETL任務(wù)的整體處理邏輯相同,因此,在定義函數(shù)視圖時(shí)通過傳入?yún)?shù)來區(qū)分不同類型的任務(wù),以定義逐筆委托數(shù)據(jù)導(dǎo)入任務(wù)函數(shù)視圖為例,具體如下(全部函數(shù)視圖定義請(qǐng)參照附件):
use stockData::stockDataLoad
// 定義函數(shù)
def loadEntrustFV(userName="admin" , userPassword="123456", startDate = 2023.02.01, endDate = 2023.02.01, dbName = "dfs://stockData", tbName = "entrust", filePath = "/hdd/hdd8/ymchen", loadType = "daily")
{
infoTb = table(1:0,["info"] ,[STRING])
if(loadType == "daily")
{
sDate = today()
eDate = today()
loadEntrust(userName, userPassword, sDate, eDate, dbName, tbName, filePath, loadType,infoTb)
}
else if(loadType == "batch")
{
loadEntrust(userName, userPassword, date(startDate), date(endDate), dbName, tbName, filePath, loadType,infoTb)
}
return infoTb
}
注意:在定義函數(shù)視圖時(shí),一些默認(rèn)參數(shù)如數(shù)據(jù)存放路徑:filePath 需要根據(jù)實(shí)際情況進(jìn)行更改
- 創(chuàng)建DolphinDB任務(wù)節(jié)點(diǎn)
創(chuàng)建好每個(gè)任務(wù)的函數(shù)視圖之后,每個(gè)函數(shù)視圖對(duì)應(yīng)于DolphinScheduler上的一個(gè)任務(wù)節(jié)點(diǎn),以逐筆委托數(shù)據(jù)導(dǎo)入任務(wù)為例,我們分以下兩種情況:
- 對(duì)于每日定時(shí)任務(wù),由于我們?cè)诙x函數(shù)視圖時(shí)已經(jīng)針對(duì)每日定時(shí)任務(wù)做了默認(rèn)參數(shù)處理,因此僅需在DolphinScheduler任務(wù)節(jié)點(diǎn)的SQL語句中輸入:
loadEntrustFV();
2. 對(duì)于歷史批量任務(wù),我們需要傳入三個(gè)參數(shù):開始時(shí)間_startDate_,結(jié)束時(shí)間_endDate_以及任務(wù)類型_loadType_,在DolphinScheduler任務(wù)節(jié)點(diǎn)的SQL語句中需要輸入:
loadEntrustFV(startDate=${startDate},endDate=${endDate},loadType="batch");
注意:需要在DolphinScheduler上面定義局部參數(shù)或全局參數(shù)_startDate_和_endDate_,如何定義請(qǐng)參照本文 1.3.2 小節(jié)。
- 創(chuàng)建DolphinDB任務(wù)工作流
我們需要在DolphinScheduler上創(chuàng)建兩個(gè)工作流,一個(gè)是定時(shí)任務(wù)工作流,一個(gè)是歷史批量任務(wù)工作流。在每個(gè)工作流中需要根據(jù)ETL流程編排具有邏輯關(guān)系的任務(wù)節(jié)點(diǎn)。以歷史批量任務(wù)為例,創(chuàng)建如下工作流:
創(chuàng)建任務(wù)工作流之后,點(diǎn)擊運(yùn)行按鈕就可以開始執(zhí)行,點(diǎn)擊定時(shí)按鈕就可以進(jìn)行定時(shí)管理
運(yùn)行任務(wù)后,工作流實(shí)例為綠色代表整個(gè)工作流運(yùn)行成功;黑色則表示存在失敗任務(wù),可以通過雙擊失敗的工作流實(shí)例查看具體是哪個(gè)任務(wù)執(zhí)行失敗。
在DolphinScheduler中,可以導(dǎo)入工作流和導(dǎo)出工作流,以上介紹的DolphinDB每日任務(wù)與批量任務(wù),可以直接通過附件中對(duì)應(yīng)的json文件直接導(dǎo)入。
- 調(diào)度DolphinDB ETL 工作流
導(dǎo)入工作流之后,點(diǎn)擊執(zhí)行按鈕就可以開始執(zhí)行
2.5 獲取 DolphinDB 任務(wù)調(diào)度結(jié)果
2.5.1 查看 DolphinDB 任務(wù)執(zhí)行情況
在DolphinScheduler上運(yùn)行DolphinDB工作流任務(wù)之后,可以通過以下步驟查看工作流任務(wù)運(yùn)行情況:
- 進(jìn)入工作流界面,可以看到所有工作流實(shí)例的狀態(tài),在狀態(tài)欄,齒輪形狀代表正在運(yùn)行,綠色打勾代表工作流任務(wù)成功運(yùn)行,黑色打叉代表工作流任務(wù)運(yùn)行失敗。
- 在工作流實(shí)例名稱下,點(diǎn)擊想要查看的工作流實(shí)例,進(jìn)入該工作流詳情界面:
- 如上圖所示,我們可以看到股票委托、快照、成交數(shù)據(jù)導(dǎo)入任務(wù)成功了,但是股票數(shù)據(jù)校驗(yàn)任務(wù)失敗了,導(dǎo)致整個(gè)工作流任務(wù)執(zhí)行失敗。在該任務(wù)節(jié)點(diǎn)上點(diǎn)擊鼠標(biāo)右鍵,然后點(diǎn)擊查看日志,就可以查看該任務(wù)節(jié)點(diǎn)具體的報(bào)錯(cuò)信息:
2.5.2 獲取 DolphinDB 任務(wù)運(yùn)行過程中的信息
在任務(wù)執(zhí)行過程中,在 DolphinScheduler 日志中并不能顯示 DolphinDB 腳本中通過print
函數(shù)輸出的信息,但是在實(shí)踐中大多數(shù)情況下存在保存任務(wù)運(yùn)行信息以查看任務(wù)具體執(zhí)行情況的需求。以下內(nèi)容首先介紹DolphinDB SQL任務(wù)節(jié)點(diǎn)查詢類型和非查詢類型的特點(diǎn),然后講述如何在DolphinScheduler 任務(wù)節(jié)點(diǎn)的日志信息中顯示DolphinDB任務(wù)的運(yùn)行信息。
SQL 任務(wù)節(jié)點(diǎn)非查詢類型
非查詢類型主要用于無結(jié)果返回的函數(shù),它的特點(diǎn)主要有:
- 可以以分段執(zhí)行符號(hào)為界,執(zhí)行多段代碼。
- 在任務(wù)節(jié)點(diǎn)的日志只能在報(bào)錯(cuò)時(shí)查看報(bào)錯(cuò)信息,在任務(wù)執(zhí)行成功時(shí)并沒有詳細(xì)的運(yùn)行信息。
SQL 任務(wù)節(jié)點(diǎn)查詢類型
查詢類型主要用于有結(jié)果集返回的函數(shù),它的特點(diǎn)主要有:
- 在任務(wù)節(jié)點(diǎn)的日志不僅能在報(bào)錯(cuò)時(shí)查看報(bào)錯(cuò)信息,而且當(dāng)任務(wù)執(zhí)行成功時(shí),能夠在節(jié)點(diǎn)日志中查看在DolphinDB腳本中設(shè)定的運(yùn)行信息。
- 只能執(zhí)行一行語句,不能執(zhí)行多段代碼。
如何獲取 DolphinDB 任務(wù)運(yùn)行過程中的信息
下面,以股票委托數(shù)據(jù)導(dǎo)入任務(wù)為例,介紹如何在DolphinDB腳本中在不同運(yùn)行階段設(shè)定運(yùn)行日志信息,以及在DolphinScheduler上當(dāng)任務(wù)執(zhí)行完畢之后在日志中顯示完整的運(yùn)行信息。
- 在股票委托數(shù)據(jù)導(dǎo)入模塊的函數(shù)中引入一個(gè)參數(shù),該參數(shù)為內(nèi)存表_infoTb_,當(dāng)任務(wù)執(zhí)行過程中,將需要記錄的運(yùn)行信息寫入該表。
// 模塊 stockData::stockDataLoad 的 loadEntrust 函數(shù)定義如下:
module stockData::stockDataLoad
use stockData::stockDataProcess
def loadEntrust(userName, userPassword, startDate, endDate, dbName, tbName, filePath, loadType,mutable infoTb)
{
for(loadDate in startDate..endDate)
{
// 刪除已有數(shù)據(jù)
dateString = temporalFormat(loadDate,"yyyyMMdd")
dataCount = exec count(*) from loadTable(dbName, tbName) where date(tradeTime)=loadDate
// 如果表里面已經(jīng)存在當(dāng)天要處理的數(shù)據(jù),刪除庫里面已有數(shù)據(jù)
if(dataCount != 0){
msg = "Start to delete the entrust data, the delete date is: " + dateString
print(msg)
// 將運(yùn)行信息添加到表中
infoTb.tableInsert(msg)
dropPartition(database(dbName), loadDate, tbName)
msg = "Successfully deleted the entrust data, the delete date is: " + dateString
print(msg)
infoTb.tableInsert(msg)
}
// 數(shù)據(jù)導(dǎo)入
// 判斷數(shù)據(jù)csv文件是否存在
fileName = filePath + "/" + dateString + "/" + "entrust.csv"
if(!exists(fileName))
{
throw fileName + "不存在!請(qǐng)檢查數(shù)據(jù)源!"
}
// 如果是全市場(chǎng)數(shù)據(jù),數(shù)據(jù)量較大,因此分批導(dǎo)入
schemaTB = schemaEntrust()
tmpData1 = loadText(filename=fileName, schema=schemaTB)
tmpData1,n1,n2 = processEntrust(loadDate,tmpData1)
pt = loadTable(dbName,tbName)
msg = "the data size in csv file is :" + n2 + ", the duplicated count is " + (n1 - n2)
print(msg)
infoTb.tableInsert(msg)
for(i in 0..23)
{
startTime = 08:00:00.000 + 1200 * 1000 * i
tmpData2 = select * from tmpData1 where time(TradeTime)>=startTime and time(TradeTime)<(startTime+ 1200 * 1000)
if(size(tmpData2) < 1)
{
continue
}
//數(shù)據(jù)入庫
pt.append!(tmpData2)
}
msg = "successfully loaded!"
print(msg)
infoTb.tableInsert(msg)
}
}
2. 在 DolphinDB 腳本中每個(gè)函數(shù)視圖中定義一個(gè)內(nèi)存表,在任務(wù)執(zhí)行過程中,產(chǎn)生一條運(yùn)行信息時(shí)便寫入該表,函數(shù)執(zhí)行完畢后返回該表 :
use stockData::stockDataLoad
def loadEntrustFV(userName="admin" , userPassword="123456", startDate = 2023.02.01, endDate = 2023.02.01, dbName = "dfs://stockData", tbName = "entrust", filePath = "/hdd/hdd8/ymchen", loadType = "daily")
{
// 定義運(yùn)行信息表
infoTb = table(1:0,["info"] ,[STRING])
if(loadType == "daily")
{
sDate = today()
eDate = today()
// 將運(yùn)行信息表作為參數(shù)傳入數(shù)據(jù)導(dǎo)入函數(shù)中,將每次需要輸出的信息寫入該表
loadEntrust(userName, userPassword, sDate, eDate, dbName, tbName, filePath, loadType,infoTb)
}
else if(loadType == "batch")
{
loadEntrust(userName, userPassword, date(startDate), date(endDate), dbName, tbName, filePath, loadType,infoTb)
}
// 返回運(yùn)行信息表
return infoTb
}
3. 在 DolphinScheduler 工作流中添加SQL查詢節(jié)點(diǎn),用于獲取運(yùn)行信息表的內(nèi)容。主要實(shí)現(xiàn)邏輯如下:a. 通過執(zhí)行股票委托數(shù)據(jù)導(dǎo)入函數(shù)視圖,獲取運(yùn)行信息表對(duì)象;b. 通過SQL查詢節(jié)點(diǎn),將這個(gè)運(yùn)行信息表對(duì)象轉(zhuǎn)化格式,以更直觀的形式顯示在任務(wù)節(jié)點(diǎn)的日志界面中。為實(shí)現(xiàn)以上步驟,可通過兩種方法,以下將詳細(xì)介紹:
? a. 直接在SQL語句中輸入以下代碼,將以上介紹的兩個(gè)步驟整合在一行代碼中:
"\n[DOLPHINDB INFO] " + concat(exec * from loadEntrustFV(startDate=${startDate},endDate=${endDate},loadType="batch"),"\n[DOLPHINDB INFO] ");
? b. 在SQL查詢類型的前置任務(wù)中用于獲取運(yùn)行信息表,在SQL語句中將該表轉(zhuǎn)化成目標(biāo)格式。
4. 整個(gè)工作流結(jié)構(gòu)圖如下所示:
5. 通過鼠標(biāo)右鍵點(diǎn)擊相應(yīng)任務(wù)節(jié)點(diǎn),選擇查看日志選項(xiàng),可以查看對(duì)應(yīng)DolphinDB任務(wù)節(jié)點(diǎn)的運(yùn)行信息。
2.6 DolphinDB 腳本開發(fā)注意事項(xiàng)
- 當(dāng)在模塊中調(diào)用插件的函數(shù)時(shí),需要提前在多個(gè)數(shù)據(jù)節(jié)點(diǎn)和控制節(jié)點(diǎn)導(dǎo)入該插件。
- 在 DolphinScheduler 上傳入的參數(shù),是以字符串類型存在的。因此,在 DolphinDB 上需要進(jìn)一步將其改成預(yù)期的數(shù)據(jù)類型,比如:
// 在DolphinScheduler上執(zhí)行的語句為:
loadSnapshotFV(startDate=${startDate},endDate=${endDate},loadType="batch");
// 由于傳入的 startDate 是字符串類型,因此在DolphinDB上定義該函數(shù)時(shí)需要先轉(zhuǎn)成 Date 類型
use stockData::stockDataLoad
def loadEntrustFV(userName="admin" , userPassword="123456", startDate = 2023.02.01, endDate = 2023.02.01, dbName = "dfs://stockData", tbName = "entrust", filePath = "/hdd/hdd/ymchen", loadType = "daily")
{
if(loadType == "batch")
{
// 使用 date(startDate) 轉(zhuǎn)成 Date 類型
loadEntrust(userName, userPassword, date(startDate), date(endDate), dbName, tbName, filePath, loadType)
}
}
// 創(chuàng)建函數(shù)視圖
addFunctionView(loadEntrustFV)
3. 當(dāng)模塊文件內(nèi)容更改時(shí),需要按照以下步驟更新函數(shù)視圖
- 使用
clearCachedModules
函數(shù)或者重新連接 session 清除之前緩存的模塊。 - 使用
dropFunctionView
函數(shù)刪除指定的函數(shù)視圖。 - 使用
use
語句重新導(dǎo)入更改后的模塊。 - 使用
addFunctionView
函數(shù)添加新的視圖。
3. DolphinScheduler 與 Airflow 對(duì)比
Airflow也是一款具有不錯(cuò)性能的調(diào)度軟件,關(guān)于它與DolphinDB相結(jié)合的教程可參照:DolphinDB與Airflow最佳實(shí)踐。以下是DolphinScheduler與Airflow在一些方面的對(duì)比:
功能 | Airflow | DolphinScheduler |
---|---|---|
調(diào)度模塊 | 自實(shí)現(xiàn) | Quartz任務(wù)調(diào)度庫 |
Job類型 | Python、Bash、HTTP、Mysql等,支持Operator的自定義擴(kuò)展 | 支持傳統(tǒng)的Shell任務(wù),同時(shí)支持大數(shù)據(jù)平臺(tái)任務(wù)調(diào)度:MR、Spark、SQL、Python等 |
Executor觸發(fā) | Restful | Restful |
工作流 | dag → tasks | project → flows → tasks |
部署運(yùn)維 | 較復(fù)雜,包括WebServer、Scheduler、Worker | 簡單 |
單點(diǎn)故障 | Scheduler存在單點(diǎn)故障風(fēng)險(xiǎn) | 去中心化的多Master和多Worker |
高可用額外要求 | Celery、Dask、Mesos + Load Balancer + DB | 不需要(本來就支持) |
過載處理 | 任務(wù)太多時(shí)會(huì)卡死服務(wù)器 | 任務(wù)隊(duì)列機(jī)制,單個(gè)機(jī)器上可調(diào)度的任務(wù)數(shù)量可以靈活配置,當(dāng)任務(wù)過多時(shí)會(huì)緩存在任務(wù)隊(duì)列中,不會(huì)造成機(jī)器卡死 |
DAG監(jiān)控界面 | 不能直觀區(qū)分任務(wù)類型 | 任務(wù)狀態(tài)、任務(wù)類型、重試次數(shù)、任務(wù)運(yùn)行機(jī)器、可視化變量等關(guān)鍵信息一目了然 |
可視化流程定義 | 否。通過python代碼來繪制DAG,使用不便,對(duì)于缺乏編碼基礎(chǔ)的業(yè)務(wù)人員存在較高使用門檻 | 是。所有流程定義都是可視化的,通過拖拽任務(wù)來繪制DAG,配置數(shù)據(jù)源及資源,同時(shí)對(duì)于第三方系統(tǒng),提供api方式的操作。 |
快速部署 | 集群化部署復(fù)雜 | 一鍵部署 |
是否能暫停和恢復(fù) | 否。只能先將工作流殺死再重新運(yùn)行 | 支持暫停、恢復(fù)操作,支持停止操作 |
是否支持集群擴(kuò)展 | 是,但是僅支持復(fù)雜Executor的水平擴(kuò)展 | 是,調(diào)度器使用分布式調(diào)度,整體的調(diào)度能力會(huì)隨集群的規(guī)模線性增長,Master和Worker支持動(dòng)態(tài)上下線 |
開發(fā)語言 | Python | Java |
4.常見問題
1. DolphinScheduler status 顯示 running,web 的登陸頁面卻無法登錄
查看standalone-server/logs/
目錄下對(duì)應(yīng)日期的日志中具體情況
2. DolphinScheduler 設(shè)置開機(jī)自啟動(dòng)后,服務(wù)器重啟無法正常啟動(dòng)
正常情況下,由于dolphinScheduler的standalone模式源碼原因,需要在服務(wù)器重啟前到scheduler部署目錄執(zhí)行停止腳本bash ./bin/dolphinscheduler-daemon.sh stop standalone-server
; 如果無法正常啟動(dòng),請(qǐng)通過ps aux | grep dolphinscheduler
找到dolphinScheduler的所有進(jìn)程,并通過kill -15 進(jìn)程ID
結(jié)束這些進(jìn)程后,再到scheduler部署目錄執(zhí)行啟動(dòng)腳本bash ./bin/dolphinscheduler-daemon.sh start standalone-server
文章來源:http://www.zghlxwxcb.cn/news/detail-776890.html
5. 附錄
- ETL腳本模塊:ETLCase.zip
- 示例數(shù)據(jù):20230201.zip
- MySQL插件:mysql-connector-j-8.0.31.jar
本文由 白鯨開源 提供發(fā)布支持!文章來源地址http://www.zghlxwxcb.cn/news/detail-776890.html
到了這里,關(guān)于海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!