国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

這篇具有很好參考價(jià)值的文章主要介紹了海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

DolphinDB 是一款高性能時(shí)序數(shù)據(jù)庫。DolphinDB 集成了功能強(qiáng)大的編程語言和高容量高速度的批流一體數(shù)據(jù)分析系統(tǒng),為海量數(shù)據(jù)(特別是時(shí)間序列數(shù)據(jù))的快速存儲(chǔ)、檢索、計(jì)算及分析提供一站式解決方案。在實(shí)際生產(chǎn)環(huán)境中,經(jīng)常存在數(shù)據(jù)導(dǎo)入、轉(zhuǎn)換、查詢計(jì)算,更新等一系列流程任務(wù),各個(gè)部分之間存在依賴,如何將這些 DolphinDB 任務(wù)按照需求準(zhǔn)確、有效率地調(diào)度,可以借用 DolphinScheduler 任務(wù)調(diào)度器。

本文將從生產(chǎn)環(huán)境中的一個(gè) ETL 場(chǎng)景出發(fā),將 DolphinScheduler 引入到 DolphinDB 的高可用集群中,通過使用 DolphinScheduler 提供的功能來調(diào)度 DolphinDB 的數(shù)據(jù) ETL 作業(yè)。

1. Apache DolphinScheduler

Apache DolphinScheduler 是一個(gè)分布式易擴(kuò)展的可視化 DAG 工作流任務(wù)調(diào)度開源系統(tǒng)。該系統(tǒng)適用于企業(yè)級(jí)場(chǎng)景,提供了一個(gè)支持可視化操作任務(wù)、工作流和全生命周期數(shù)據(jù)處理的解決方案,解決了數(shù)據(jù)研發(fā) ETL 依賴錯(cuò)綜復(fù)雜,無法監(jiān)控任務(wù)健康狀態(tài)的問題。 DolphinScheduler 以 DAG(Directed Acyclic Graph,DAG)流式方式組裝任務(wù),可以及時(shí)監(jiān)控任務(wù)的執(zhí)行狀態(tài),支持重試、指定節(jié)點(diǎn)恢復(fù)失敗、暫停、恢復(fù)、終止任務(wù)等操作。

1.1 特性

  • 執(zhí)行定時(shí)任務(wù):在生產(chǎn)環(huán)境中,一個(gè)普遍的需求是周期性地從數(shù)據(jù)源中提取、轉(zhuǎn)換、加載數(shù)據(jù)到 DolphinDB,如每一天、每個(gè)小時(shí)等,DolphinScheduler 可以方便地進(jìn)行定時(shí)管理,滿足需求。
  • 執(zhí)行歷史任務(wù):有時(shí)候我們由于業(yè)務(wù)變動(dòng),需要將歷史數(shù)據(jù)進(jìn)行重新計(jì)算和加載入 DolphinDB。在這種情況下,我們僅需在 DolphinScheduler Web 界面上定義相應(yīng)任務(wù)的工作流,并定義和傳入開始時(shí)間和結(jié)束時(shí)間的參數(shù)。通過這個(gè)步驟,我們可以處理任意時(shí)間段的數(shù)據(jù)。
  • 并行執(zhí)行任務(wù):在業(yè)務(wù)中,我們可能需要同時(shí)處理多項(xiàng)數(shù)據(jù)ETL任務(wù),如同時(shí)導(dǎo)入數(shù)據(jù)到 DolphinDB 的不同表或同一個(gè)表的不同分區(qū)。DolphinScheduler 允許我們并行執(zhí)行多項(xiàng)工作流任務(wù),提高執(zhí)行效率。
  • 高效處理編排:在生產(chǎn)環(huán)境中,大多數(shù)情況下存在任務(wù)之間、工作流之間有條件地執(zhí)行,比如在 DolphinDB 的數(shù)據(jù)ETL中,下游任務(wù)依賴于上游任務(wù)的執(zhí)行狀態(tài)做不同的操作,有的任務(wù)之間關(guān)系錯(cuò)綜復(fù)雜??梢栽?DolphinScheduler 中按照邏輯對(duì)工作流進(jìn)行定義,輕松編排任務(wù)之間的關(guān)系。

1.2 安裝部署

DolphinScheduler 可在單機(jī)、單服務(wù)器集群、多服務(wù)器集群、K8S環(huán)境下部署,本節(jié)內(nèi)容將以單機(jī)部署流程作為演示內(nèi)容,僅供參考。

前置條件

  • JDK:安裝JDK(1.8+)并配置JAVA_HOME環(huán)境變量,DolphinScheduler的啟動(dòng)依賴于該環(huán)境變量,同時(shí)將其下的bin目錄追加到PATH環(huán)境變量中。
  • 二進(jìn)制包:已配置DolphinDB數(shù)據(jù)源的DolphinScheduler版本,下載鏈接在https://cdn.dolphindb.cn/downloads/apache-dolphinscheduler-3.1.7-bin.tar.gz
  • 本教程將MySQL作為 DolphinScheduler 持久化的元數(shù)據(jù)庫,因此要保證服務(wù)器已安裝好MySQL,若沒有,需要下載安裝,所有平臺(tái)的mysql下載地址為:MySQL 下載地址

元數(shù)據(jù)持久化配置

單機(jī)服務(wù)使用H2數(shù)據(jù)庫來存儲(chǔ)元數(shù)據(jù),而H2數(shù)據(jù)庫是一種內(nèi)存級(jí)別的數(shù)據(jù)庫,因此當(dāng)DolphinScheduler程序重啟時(shí),會(huì)導(dǎo)致之前定義的工作流等內(nèi)容全部丟失,需要重新定義,造成效率低下和不必要的麻煩。因此,將元數(shù)據(jù)持久化是非常有必要的,DolphinScheduler支持MySQLPostgreSQL作為元數(shù)據(jù)的存儲(chǔ)數(shù)據(jù)庫,本文以配置MySQL為例,主要有以下流程:

  • 解壓DolphinScheduler程序包
tar -xvzf apache-dolphinscheduler-3.1.7-bin.tar.gz
cd apache-dolphinscheduler-3.1.7-bin
  • 進(jìn)入MySQL,創(chuàng)建數(shù)據(jù)庫和用戶
// 創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE dolphinscheduler DEFAULT CHARACTER SET utf8 DEFAULT COLLATE utf8_general_ci;
// 創(chuàng)建用戶,并設(shè)置密碼
CREATE USER 'dolphinscheduler'@'%' IDENTIFIED BY '密碼';
// 給用戶賦予庫的權(quán)限
GRANT ALL PRIVILEGES ON dolphinscheduler.* TO 'dolphinscheduler'@'%';
flush privileges;
  • 修改apache-dolphinscheduler-3.1.7-bin/bin/env/dolphinscheduler_env.sh文件設(shè)定環(huán)境變量,將{user}{password}改為上一步創(chuàng)建的用戶名和密碼
export DATABASE=mysql
export SPRING_PROFILES_ACTIVE=${DATABASE}
export SPRING_DATASOURCE_URL="jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8&useSSL=false"
export SPRING_DATASOURCE_USERNAME={user}
export SPRING_DATASOURCE_PASSWORD={password}
  • 修改apache-dolphinscheduler-3.1.7-bin/standalone-server/conf/application.yaml文件中的配置(在文件尾部),上半部分由于這里用的不是postgresql,直接注釋掉就好。將{user}{password}改為上面創(chuàng)建的用戶名和密碼
---
#spring:
#  config:
#    activate:
#      on-profile: postgresql
#  quartz:
#    properties:
#      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
#  datasource:
#    driver-class-name: org.postgresql.Driver
#    url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
#    username: root
#    password: root

---
spring:
  config:
    activate:
      on-profile: mysql
  sql:
     init:
       schema-locations: classpath:sql/dolphinscheduler_mysql.sql
  datasource:
    driver-class-name: com.mysql.cj.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
    username: {user}
    password: {password}
 
  • 初始化數(shù)據(jù)庫,如果上述步驟沒有問題,這里就不會(huì)報(bào)錯(cuò):
bash apache-dolphinscheduler-3.1.7-bin/tools/bin/upgrade-schema.sh
  • 執(zhí)行完成之后進(jìn)入 MySQL 查詢會(huì)發(fā)現(xiàn)名稱為dolphinscheduler的數(shù)據(jù)庫已經(jīng)生成了很多表格

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

啟動(dòng)DolphinScheduler單機(jī)服務(wù)器

注意:啟動(dòng)DolphinScheduler需要依賴多個(gè)端口號(hào),分別是:12345、50052、25333、25334、1234、5678
使用lsof -i:<port>檢查以上端口號(hào)是否被占用,如果有端口被別的進(jìn)程占用,修改apache-dolphinscheduler-3.1.7-bin/standalone-server/conf/application.yaml中對(duì)應(yīng)的端口

  1. 當(dāng)配置好以上內(nèi)容之后,進(jìn)入執(zhí)行apache-dolphinscheduler-3.1.7-bin目錄并執(zhí)行以下命令啟動(dòng):
bash ./bin/dolphinscheduler-daemon.sh start standalone-server

2. 輸入以下命令查看是否執(zhí)行成功

?    a. 運(yùn)行jps查看相應(yīng)實(shí)例是否已在進(jìn)程中

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

?    b. 運(yùn)行bash ./bin/dolphinscheduler-daemon.sh status standalone-server查看 standalone-server的運(yùn)行狀態(tài)

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

3. 停止運(yùn)行

bash ./bin/dolphinscheduler-daemon.sh stop standalone-server

4. 啟動(dòng)成功后,在瀏覽器中輸入服務(wù)器IP:12345/dolphinscheduler/ui/login進(jìn)行登錄

    a. 默認(rèn)用戶名:admin

?    b. 默認(rèn)密碼:dolphinscheduler123

登陸成功后將會(huì)看到如下頁面:

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

  • 重啟dolphinscheduler服務(wù),測(cè)試已經(jīng)連接好數(shù)據(jù)庫

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

1.3 DolphinDB 與 DolphinScheduler 結(jié)合

DolphinDB 作為強(qiáng)大的高性能時(shí)序數(shù)據(jù)庫,能夠高效存儲(chǔ)和處理 PB 級(jí)的海量數(shù)據(jù)集,可以通過編寫腳本實(shí)現(xiàn)數(shù)據(jù)的處理、存儲(chǔ)、因子計(jì)算、建模、回測(cè)等任務(wù)。在實(shí)際生產(chǎn)中,源數(shù)據(jù)下載、數(shù)據(jù)處理、數(shù)據(jù)入庫、數(shù)據(jù)校驗(yàn)、指標(biāo)計(jì)算等任務(wù)之間存在先后關(guān)系和條件關(guān)系,如果在 DolphinDB 腳本中編寫相關(guān)的邏輯關(guān)系代碼,一來會(huì)造成與實(shí)際任務(wù)不相關(guān)的腳本冗余,二來如果實(shí)際業(yè)務(wù)變動(dòng),需要增加或刪減部分任務(wù),相互依賴的任務(wù)之間的關(guān)系代碼也需要變動(dòng),造成更新迭代效率低下。

考慮到這種情況,如果能夠?qū)?DolphinDB 與 DolphinScheduler 結(jié)合起來,在 DolphinDB 中編寫相關(guān)任務(wù)代碼模塊,在 DolphinScheduler 上將這些任務(wù)按照邏輯編排調(diào)度,這樣,就能夠?qū)⑷蝿?wù)代碼和任務(wù)之間邏輯關(guān)系分開,每個(gè)部分專注于發(fā)揮自己的作用,實(shí)現(xiàn)更高效地運(yùn)行維護(hù)。

1.3.1 如何創(chuàng)建 DolphinDB 數(shù)據(jù)源

  1. 在安裝部署好DolphinScheduler之后,登錄其Web界面,點(diǎn)擊數(shù)據(jù)源中心并點(diǎn)擊創(chuàng)建數(shù)據(jù)源

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

2. 輸入相關(guān)參數(shù)定義,創(chuàng)建DolphinDB數(shù)據(jù)源

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

注意:數(shù)據(jù)庫名和jdbc連接參數(shù)不用填,不然會(huì)報(bào)錯(cuò) JDBC connect failed。

1.3.2 如何調(diào)度 DolphinDB 任務(wù)

創(chuàng)建DolphinDB數(shù)據(jù)源后,需要?jiǎng)?chuàng)建租戶、項(xiàng)目、工作流,具體流程可參考:DolphinScheduler 部署流程

定義好工作流之后,點(diǎn)擊該工作流進(jìn)入操作界面,接著在左端拖拽SQL 節(jié)點(diǎn)進(jìn)行DolphinDB任務(wù)定義

注意:SQL類型分為查詢類型和非查詢類型,這兩種類型分別適用于不同的使用場(chǎng)景,在本文 2.5.2 小節(jié)中會(huì)詳細(xì)介紹。

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

由于在SQL任務(wù)節(jié)點(diǎn)中,每次只能執(zhí)行一行DolphinDB代碼。因此,調(diào)度DolphinDB任務(wù)主要有以下兩種途徑:

  1. run 函數(shù)

假設(shè)DolphinDB腳本在服務(wù)器上的路徑是:/data/script.dos,那么,在SQL語句上可以輸入:

run("/data/script.dos");

2. 函數(shù)視圖

?    a. 在DolphinDB編寫函數(shù):

// 在DolphinDB中定義一個(gè)函數(shù),用于創(chuàng)建數(shù)據(jù)庫表
def createTable(dbName, tbName){
	login("admin", "123456")

	if(!existsDatabase(dbName)){
		db1 = database(, VALUE, 2020.01.01..2021.01.01)
		db2 = database(, HASH, [SYMBOL, 10])
		db = database(dbName, COMPO, [db1, db2], , "TSDB")
	}else{
		db = database(dbName)
	}
    if(!existsTable(dbName,tbName)){
        name =`SecurityID`ChannelNo`ApplSeqNum`MDStreamID`SecurityIDSource`Price
              `OrderQty`Side`TradeTIme`OrderType`OrderIndex`LocalTime`SeqNo
              `Market`DataStatus`BizIndex
        type = [SYMBOL, INT, LONG, INT, INT, DOUBLE, INT, 
                SYMBOL, TIMESTAMP, SYMBOL, INT, TIME, LONG, SYMBOL,INT,LONG]
        schemaTable = table(1:0, name, type)
        db.createPartitionedTable(table=schemaTable, tableName=tbName, 
                partitionColumns=`TradeTime`SecurityID, 
                compressMethods={TradeTime:"delta"}, 
                sortColumns=`Market`SecurityID`TradeTime, keepDuplicates=ALL)
    }
    else{
        writeRunLog("數(shù)據(jù)庫:" + dbName + " 數(shù)據(jù)表:" + tbName + " 已存在...")
    }
}

?    b. 將函數(shù)添加為函數(shù)視圖,作用是使得該函數(shù)視圖全局可用

// 添加為函數(shù)視圖
addFunctionView(createTable)

?    c. 在dolphinScheduler上調(diào)用該函數(shù)視圖

// 在SQL語句內(nèi)輸入以下內(nèi)容:
createTable("dfs://testDb", "testTb");

上述兩種方法的區(qū)別是:

  • 使用run執(zhí)行腳本不能傳遞參數(shù),靈活性較差
  • 使用函數(shù)視圖可以在DolphinScheduler上面傳入?yún)?shù),以上面為例,可以在SQL語句中輸入:
createTable(${dbName}, ${tbName});

傳入?yún)?shù)的方法有局部參數(shù)和全局參數(shù)兩種方法,它們的區(qū)別是全局參數(shù)可以在創(chuàng)建工作流實(shí)例時(shí)直接進(jìn)行修改,而局部參數(shù)需要進(jìn)入到工作流中的具體任務(wù)節(jié)點(diǎn)進(jìn)行修改:

  1. 局部參數(shù)在定義任務(wù)節(jié)點(diǎn)時(shí)定義

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

2. 全局參數(shù)在保存工作流時(shí)定義

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

2. 調(diào)度 DolphinDB 數(shù)據(jù) ETL 任務(wù)

2.1 任務(wù)流程結(jié)構(gòu)

  • 文件結(jié)構(gòu)

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

2.1.1 DolphinDB 功能模塊部分

  • stockData: 數(shù)據(jù)導(dǎo)入模塊

    • createStockTable.dos: 定義創(chuàng)建相應(yīng)的股票數(shù)據(jù)庫表的函數(shù)。
    • stockDataProcess.dos: 定義將源數(shù)據(jù)進(jìn)行清洗、處理與格式轉(zhuǎn)換的函數(shù)。
    • stockDataLoad.dos: 定義將數(shù)據(jù)導(dǎo)入相應(yīng)的DolphinDB庫表的函數(shù)。
  • minFactor: 分鐘線因子指標(biāo)相關(guān)模塊

    • createMinFactorTable.dos: 定義創(chuàng)建分鐘線因子指標(biāo)庫表的函數(shù)。
    • computeMinFactor.dos: 定義計(jì)算分鐘線因子指標(biāo)的函數(shù)。
  • dataCheck: 數(shù)據(jù)校驗(yàn)?zāi)K

    • stockCheck.dos: 定義校驗(yàn)導(dǎo)入的股票逐筆委托、快照行情、逐筆成交數(shù)據(jù)的函數(shù)。
    • minFactorCheck.dos: 定義校驗(yàn)分鐘線因子指標(biāo)數(shù)據(jù)的函數(shù)。

2.1.2 DolphinDB 腳本部分

  • initTable.dos: 執(zhí)行該腳本可以創(chuàng)建相應(yīng)的股票數(shù)據(jù)庫表和分鐘因子指標(biāo)庫表。
  • createFuncView.dos: 執(zhí)行該腳本可以定義需要在dolphinScheduler上用到的函數(shù)視圖。

2.2 數(shù)據(jù)介紹

本文選取了 20230201 上交所某股票 level 2 委托數(shù)據(jù)、快照數(shù)據(jù)、成交數(shù)據(jù)作為演示。以下是逐筆委托表在DolphinDB的結(jié)構(gòu)。快照數(shù)據(jù)和成交數(shù)據(jù)結(jié)構(gòu)可在附件中查看:

字段名 字段含義 數(shù)據(jù)類型(DolphinDB)
ChannelNo 通道代碼 INT
ApplSeqNum 消息記錄號(hào) LONG
MDStreamID 行情類別 INT
SecurityID 證券代碼 SYMBOL
SecurityIDSource 證券代碼源 INT
Price 委托價(jià)格 DOUBLE
OrderQty 委托數(shù)量 INT
Side 委托買賣方向 SYMBOL
TradeTime 委托時(shí)間 TIMESTAMP
OrderType 委托類型 SYMBOL
OrderIndex 委托序號(hào) INT
LocalTime 本地接受時(shí)間戳 TIME
SeqNo 消息序列號(hào) LONG
Market 交易市場(chǎng) SYMBOL
DataStatus 數(shù)據(jù)狀態(tài) INT
BizIndex 業(yè)務(wù)序列號(hào) LONG

2.3 數(shù)據(jù)導(dǎo)入、指標(biāo)計(jì)算與校驗(yàn)任務(wù)

注意:以下各部分內(nèi)容均將相關(guān)函數(shù)定義在模塊中,以方便進(jìn)行工程化管理,關(guān)于DolphinDB模塊的創(chuàng)建、加載、調(diào)用方法,請(qǐng)參照:DolphinDB 模塊復(fù)用教程

2.3.1 數(shù)據(jù)清洗、處理、入表

由于源數(shù)據(jù)的字段結(jié)構(gòu)有時(shí)候不符合我們的業(yè)務(wù)需求,因此需要增刪、處理一些字段后再導(dǎo)入數(shù)據(jù)庫,下面以逐筆委托數(shù)據(jù)為例,介紹源數(shù)據(jù)經(jīng)清洗、處理后再導(dǎo)入庫表的過程,快照和成交數(shù)據(jù)的處理邏輯與委托數(shù)據(jù)相同,詳細(xì)內(nèi)容可以在附件中查看。

  • 創(chuàng)建數(shù)據(jù)庫表

逐筆委托、快照、逐筆成交數(shù)據(jù)都保存在同一個(gè)庫中,本文采用了組合分區(qū)作為分區(qū)方案,第一層按天分區(qū),第二層對(duì)股票代碼分25個(gè)哈希分區(qū)。如何確定數(shù)據(jù)分區(qū)請(qǐng)參照:DolphinDB數(shù)據(jù)庫分區(qū)教程

module createStockTable

// 創(chuàng)建逐筆委托數(shù)據(jù)存儲(chǔ)庫表
def createEntrust(dbName, tbName,userName = "admin",password = "123456")
{
    login(userName, password)
    if(!existsDatabase(dbName))
    {
        db1 = database(, VALUE, 2020.01.01..2021.01.01)
        db2 = database(, HASH, [SYMBOL, 25])
        // 按天和股票組合分區(qū)
        db = database(dbName, COMPO, [db1, db2], , "TSDB")
    }
    else
    {
        db = database(dbName)
    }
    name=`ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TradeTIme`OrderType`OrderIndex`LocalTime`SeqNo`Market`DataStatus`BizIndex
    type = [INT, LONG, INT, SYMBOL, INT, DOUBLE, INT, SYMBOL, TIMESTAMP, SYMBOL, INT, TIME, LONG, SYMBOL,INT,LONG]
    schemaTable = table(1:0, name, type)
    // 創(chuàng)建分區(qū)表
    db.createPartitionedTable(table=schemaTable, tableName=tbName, partitionColumns=`TradeTime`SecurityID, compressMethods={TradeTime:"delta"}, sortColumns=`Market`SecurityID`TradeTime, keepDuplicates=ALL)
}
  • csv數(shù)據(jù)清洗與處理
module stockData::stockDataProcess

// 定義逐筆委托csv數(shù)據(jù)文件中各個(gè)字段的名稱和字段類型
def schemaEntrust()
{
    name = `DataStatus`OrderIndex`ChannelNo`SecurityID`TradeTime`OrderType`ApplSeqNum`Price`OrderQty`Side`BizIndex`LocalTime`SeqNo
    typeString = `INT`LONG`INT`SYMBOL`TIME`SYMBOL`INT`DOUBLE`INT`SYMBOL`INT`TIME`INT
    return table(name, typeString)
}

// 數(shù)據(jù)處理函數(shù),包括字段增加,數(shù)據(jù)去重等操作
def processEntrust(loadDate, mutable t)
{
    // 字段名替換
    t.replaceColumn!(`TradeTime, concatDateTime(day, t.TradeTime))
    n1 = t.size()
    // 數(shù)據(jù)去重
    t = select * from t where isDuplicated([DataStatus, OrderIndex, ChannelNo, SecurityID, TradeTime, OrderType, ApplSeqNum, Price, OrderQty, Side, BizIndex],FIRST)=false
    n2 = t.size()
    // 增加字段
    update t set Market = `sh
    update t set MDStreamID = int(NULL)
    update t set SecurityIDSource = int(NULL)
    reorderColumns!(t, `ChannelNo`ApplSeqNum`MDStreamID`SecurityID`SecurityIDSource`Price`OrderQty`Side`TradeTime`OrderType`OrderIndex`LocalTime`SeqNo`Market`DataStatus`BizIndex)
    return t,n1,n2
}
  • 將處理后的數(shù)據(jù)導(dǎo)入數(shù)據(jù)庫表
module stockData::stockDataLoad
use stockData::stockDataProcess

def loadEntrust(userName, userPassword, startDate, endDate, dbName, tbName, filePath, loadType,mutable infoTb)
{
	for(loadDate in startDate..endDate)
	{
		// 刪除已有數(shù)據(jù)
		dateString = temporalFormat(loadDate,"yyyyMMdd")
		dataCount = exec count(*) from loadTable(dbName, tbName) where date(tradeTime)=loadDate
		// 如果表里面已經(jīng)存在當(dāng)天要處理的數(shù)據(jù),刪除庫里面已有數(shù)據(jù)
		if(dataCount != 0){
			msg = "Start to delete the entrust data, the delete date is: " + dateString
			print(msg)
			infoTb.tableInsert(msg)

			dropPartition(database(dbName), loadDate, tbName)
			msg = "Successfully deleted the entrust data, the delete date is: " + dateString
			print(msg)
			infoTb.tableInsert(msg)
		}
		// 數(shù)據(jù)導(dǎo)入
		// 判斷數(shù)據(jù)csv文件是否存在
		fileName = filePath + "/" + dateString + "/" + "entrust.csv"
		if(!exists(fileName))
		{
			throw fileName + "不存在!請(qǐng)檢查數(shù)據(jù)源!"
		}
		// 如果是全市場(chǎng)數(shù)據(jù),數(shù)據(jù)量較大,因此分批導(dǎo)入
		schemaTB = schemaEntrust()
		tmpData1 = loadText(filename=fileName, schema=schemaTB)
		tmpData1,n1,n2 = processEntrust(loadDate,tmpData1)
		pt = loadTable(dbName,tbName)
		msg = "the data size in csv file is :" + n2 + ", the duplicated count is " + (n1 - n2)
		print(msg)
		infoTb.tableInsert(msg)
		for(i in 0..23)
		{
			startTime = 08:00:00.000 + 1200 * 1000 * i
			tmpData2 = select * from tmpData1 where time(TradeTime)>=startTime and time(TradeTime)<(startTime+ 1200 * 1000)
			if(size(tmpData2) < 1)
			{
				continue
			}
			//數(shù)據(jù)入庫
			pt.append!(tmpData2)
		}
		msg = "successfully loaded!"
		print(msg)
		infoTb.tableInsert(msg)
	}
}

2.3.2 K 分鐘線因子指標(biāo)計(jì)算

當(dāng)導(dǎo)入數(shù)據(jù)之后,我們希望根據(jù)業(yè)務(wù)、策略對(duì)數(shù)據(jù)進(jìn)行進(jìn)一步加工,形成分鐘線級(jí)別的因子指標(biāo),讓數(shù)據(jù)產(chǎn)生價(jià)值,驅(qū)動(dòng)業(yè)務(wù)發(fā)展。下面,我們以計(jì)算成交數(shù)據(jù)K分鐘線因子指標(biāo)為例,介紹該任務(wù)流程:

  • 創(chuàng)建K分鐘線結(jié)果存儲(chǔ)表
module minFactor::createMinFactorTable

def createMinuteFactor(dbName, tbName)
{
	if(existsDatabase(dbName)){
		dropDatabase(dbName)
	}
	//按天分區(qū)
	db = database(dbName, VALUE, 2021.01.01..2021.01.03,engine = `TSDB)
	colName = `TradeDate`TradeTime`SecurityID`Open`High`Low`Close`Volume`Amount`Vwap
	colType =[DATE, MINUTE, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, DOUBLE]
	tbSchema = table(1:0, colName, colType)
  	db.createPartitionedTable(table=tbSchema,tableName=tbName,partitionColumns=`TradeDate,sortColumns=`SecurityID`TradeTime,keepDuplicates=ALL)
}
  • 計(jì)算K分鐘線因子指標(biāo)并入庫
module minFactor::computeMinFactor

def calFactorOneMinute(dbName, startDate, endDate, mutable factorTb,mutable infoTb)
{
	pt = loadTable(dbName, "trade")
	dayList = startDate..endDate
	if(dayList.size()>12) dayList = dayList.cut(12)
	for(days in dayList){
		//計(jì)算分鐘 K 線
		res = select first(TradePrice) as open, max(TradePrice) as high, min(TradePrice) as low, last(TradePrice) as close, sum(tradeQty) as volume,sum(TradePrice*TradeQty) as amount,sum(TradePrice*TradeQty)\sum(TradeQty) as vwap from pt where date(tradeTime) in days group by date(tradeTime) as TradeDate,minute(tradeTime) as TradeTime, SecurityID
		msg = "Start to append minute factor result , the days is: [" + concat(days, ",")+"]"
		print(msg)
		infoTb.tableInsert(msg)
		//分鐘 K 線入庫
		factorTb.append!(res)
		msg = "Successfully append the minute factor result to databse, the days is: [" + concat(days, ",")+"]"
		print(msg)
		infoTb.tableInsert(msg)
	}
}

2.3.3 數(shù)據(jù)校驗(yàn)與 K 分鐘線指標(biāo)校驗(yàn)

由于K分鐘線因子指標(biāo)計(jì)算依賴于上游導(dǎo)入數(shù)據(jù)的正確性,而業(yè)務(wù)中又依賴于K分鐘線指標(biāo)數(shù)據(jù)的正確性,因此,對(duì)這兩部分?jǐn)?shù)據(jù)進(jìn)行校驗(yàn)是有必要的,下面介紹部分校驗(yàn)步驟,詳細(xì)校驗(yàn)內(nèi)容請(qǐng)參照附件

  • 股票數(shù)據(jù)校驗(yàn)
module dataCheck::stockCheck 

def checkStockCounts(idate,dbName)
{
	// 校驗(yàn)逐筆委托、快照行情、逐筆成交表的股票個(gè)數(shù)是否一致

	getCodes = def (dbName,tbName,idate) {
		tb = loadTable(dbName,tbName)
		return exec distinct(SecurityID) from tb where date(tradetime)=idate and ((Market=`sh and SecurityID like "6%")or(Market=`sz and (SecurityID like "0%" or SecurityID like "3%" ) )) 
	}
	entrustCodes = getCodes(dbName,"entrust",idate)
	tradeCodes = getCodes(dbName,"trade",idate)
    snapshotCodes = exec distinct(SecurityID) from loadTable(dbName,"snapshot") where date(tradetime)=idate and ((Market=`sh and SecurityID like "6%")or(Market=`sz and (SecurityID like "0%" or SecurityID like "3%" ))) and  HighPrice != 0
	if(entrustCodes.size() != snapshotCodes.size() or entrustCodes.size() != tradeCodes.size() or snapshotCodes.size() != tradeCodes.size())
	{
		throw "逐筆委托股票數(shù)量:" + size(entrustCodes) + " 快照行情股票數(shù)量:" + size(snapshotCodes) + " 逐筆成交股票數(shù)量:" + size(tradeCodes) + ", 它們數(shù)量不一致!"
	}
}
  • K分鐘線指標(biāo)校驗(yàn)
module dataCheck::minFactorCheck

def checkHighLowPrice(idate,dbName,tbName)
{
	// 分鐘線最高價(jià)指標(biāo)與最低價(jià)指標(biāo)校驗(yàn)
	tb= loadTable(dbName,tbName)
	temp=select * from tb where tradedate=idate and High < Low 
	if(size(temp)>0)
	{
		throw "分鐘線計(jì)算錯(cuò)誤!分鐘線最高價(jià)小于最低價(jià)!"
	}
}

def checkVolumeAmount(idate,dbName,tbName)
{
	// 分鐘線交易量與交易額指標(biāo)校驗(yàn)
	tb = loadTable(dbName,tbName)
	temp = select * from loadTable(dbName,tbName) where tradedate=idate and ((Volume == 0 and Amount != 0) or (Volume != 0 and Amount == 0))
	if(size(temp)>0)
	{
		throw "分鐘線計(jì)算錯(cuò)誤!交易量和交易額不同時(shí)為0!"
	}
}

2.4 實(shí)現(xiàn) DolphinDB 任務(wù)調(diào)度

在定義好數(shù)據(jù)ETL模塊之后,就可以通過以下步驟實(shí)現(xiàn)DolphinDB ETL任務(wù)的調(diào)度:

  • 創(chuàng)建相關(guān)庫表

第一次執(zhí)行時(shí),需要?jiǎng)?chuàng)建相關(guān)數(shù)據(jù)庫表

use stockData::createStockTable
use minFactor::createMinFactorTable

// 創(chuàng)建數(shù)據(jù)庫表,庫表名可以根據(jù)實(shí)際需要修改
createEntrust("dfs://stockData", "entrust")
createSnapshot("dfs://stockData", "snapshot")
createTrade("dfs://stockData", "trade")
createMinuteFactor("dfs://factorData", "stockMinFactor")
  • 定義函數(shù)視圖并執(zhí)行

由于每日定時(shí)ETL任務(wù)與歷史批量ETL任務(wù)的整體處理邏輯相同,因此,在定義函數(shù)視圖時(shí)通過傳入?yún)?shù)來區(qū)分不同類型的任務(wù),以定義逐筆委托數(shù)據(jù)導(dǎo)入任務(wù)函數(shù)視圖為例,具體如下(全部函數(shù)視圖定義請(qǐng)參照附件):

use stockData::stockDataLoad
// 定義函數(shù)
def loadEntrustFV(userName="admin" , userPassword="123456", startDate = 2023.02.01, endDate = 2023.02.01, dbName = "dfs://stockData", tbName = "entrust", filePath = "/hdd/hdd8/ymchen", loadType = "daily")
{
	infoTb = table(1:0,["info"] ,[STRING])
	if(loadType == "daily")
	{
		sDate = today()
		eDate = today()
		loadEntrust(userName, userPassword, sDate, eDate, dbName, tbName, filePath, loadType,infoTb)
	}
	else if(loadType == "batch")
	{
		loadEntrust(userName, userPassword, date(startDate), date(endDate), dbName, tbName, filePath, loadType,infoTb)
	}
	return infoTb
}

注意:在定義函數(shù)視圖時(shí),一些默認(rèn)參數(shù)如數(shù)據(jù)存放路徑:filePath 需要根據(jù)實(shí)際情況進(jìn)行更改

  • 創(chuàng)建DolphinDB任務(wù)節(jié)點(diǎn)

創(chuàng)建好每個(gè)任務(wù)的函數(shù)視圖之后,每個(gè)函數(shù)視圖對(duì)應(yīng)于DolphinScheduler上的一個(gè)任務(wù)節(jié)點(diǎn),以逐筆委托數(shù)據(jù)導(dǎo)入任務(wù)為例,我們分以下兩種情況:

  1. 對(duì)于每日定時(shí)任務(wù),由于我們?cè)诙x函數(shù)視圖時(shí)已經(jīng)針對(duì)每日定時(shí)任務(wù)做了默認(rèn)參數(shù)處理,因此僅需在DolphinScheduler任務(wù)節(jié)點(diǎn)的SQL語句中輸入:
loadEntrustFV();

2. 對(duì)于歷史批量任務(wù),我們需要傳入三個(gè)參數(shù):開始時(shí)間_startDate_,結(jié)束時(shí)間_endDate_以及任務(wù)類型_loadType_,在DolphinScheduler任務(wù)節(jié)點(diǎn)的SQL語句中需要輸入:

loadEntrustFV(startDate=${startDate},endDate=${endDate},loadType="batch");

注意:需要在DolphinScheduler上面定義局部參數(shù)或全局參數(shù)_startDate_和_endDate_,如何定義請(qǐng)參照本文 1.3.2 小節(jié)

  • 創(chuàng)建DolphinDB任務(wù)工作流

我們需要在DolphinScheduler上創(chuàng)建兩個(gè)工作流,一個(gè)是定時(shí)任務(wù)工作流,一個(gè)是歷史批量任務(wù)工作流。在每個(gè)工作流中需要根據(jù)ETL流程編排具有邏輯關(guān)系的任務(wù)節(jié)點(diǎn)。以歷史批量任務(wù)為例,創(chuàng)建如下工作流:

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

創(chuàng)建任務(wù)工作流之后,點(diǎn)擊運(yùn)行按鈕就可以開始執(zhí)行,點(diǎn)擊定時(shí)按鈕就可以進(jìn)行定時(shí)管理

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

運(yùn)行任務(wù)后,工作流實(shí)例為綠色代表整個(gè)工作流運(yùn)行成功;黑色則表示存在失敗任務(wù),可以通過雙擊失敗的工作流實(shí)例查看具體是哪個(gè)任務(wù)執(zhí)行失敗。

在DolphinScheduler中,可以導(dǎo)入工作流導(dǎo)出工作流,以上介紹的DolphinDB每日任務(wù)與批量任務(wù),可以直接通過附件中對(duì)應(yīng)的json文件直接導(dǎo)入。

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

  • 調(diào)度DolphinDB ETL 工作流

導(dǎo)入工作流之后,點(diǎn)擊執(zhí)行按鈕就可以開始執(zhí)行

2.5 獲取 DolphinDB 任務(wù)調(diào)度結(jié)果

2.5.1 查看 DolphinDB 任務(wù)執(zhí)行情況

在DolphinScheduler上運(yùn)行DolphinDB工作流任務(wù)之后,可以通過以下步驟查看工作流任務(wù)運(yùn)行情況:

  • 進(jìn)入工作流界面,可以看到所有工作流實(shí)例的狀態(tài),在狀態(tài)欄,齒輪形狀代表正在運(yùn)行,綠色打勾代表工作流任務(wù)成功運(yùn)行,黑色打叉代表工作流任務(wù)運(yùn)行失敗。

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

  • 在工作流實(shí)例名稱下,點(diǎn)擊想要查看的工作流實(shí)例,進(jìn)入該工作流詳情界面:

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

  • 如上圖所示,我們可以看到股票委托、快照、成交數(shù)據(jù)導(dǎo)入任務(wù)成功了,但是股票數(shù)據(jù)校驗(yàn)任務(wù)失敗了,導(dǎo)致整個(gè)工作流任務(wù)執(zhí)行失敗。在該任務(wù)節(jié)點(diǎn)上點(diǎn)擊鼠標(biāo)右鍵,然后點(diǎn)擊查看日志,就可以查看該任務(wù)節(jié)點(diǎn)具體的報(bào)錯(cuò)信息:

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

2.5.2 獲取 DolphinDB 任務(wù)運(yùn)行過程中的信息

在任務(wù)執(zhí)行過程中,在 DolphinScheduler 日志中并不能顯示 DolphinDB 腳本中通過print函數(shù)輸出的信息,但是在實(shí)踐中大多數(shù)情況下存在保存任務(wù)運(yùn)行信息以查看任務(wù)具體執(zhí)行情況的需求。以下內(nèi)容首先介紹DolphinDB SQL任務(wù)節(jié)點(diǎn)查詢類型非查詢類型的特點(diǎn),然后講述如何在DolphinScheduler 任務(wù)節(jié)點(diǎn)的日志信息中顯示DolphinDB任務(wù)的運(yùn)行信息。

SQL 任務(wù)節(jié)點(diǎn)非查詢類型

非查詢類型主要用于無結(jié)果返回的函數(shù),它的特點(diǎn)主要有:

  1. 可以以分段執(zhí)行符號(hào)為界,執(zhí)行多段代碼。
  2. 在任務(wù)節(jié)點(diǎn)的日志只能在報(bào)錯(cuò)時(shí)查看報(bào)錯(cuò)信息,在任務(wù)執(zhí)行成功時(shí)并沒有詳細(xì)的運(yùn)行信息。

SQL 任務(wù)節(jié)點(diǎn)查詢類型

查詢類型主要用于有結(jié)果集返回的函數(shù),它的特點(diǎn)主要有:

  1. 在任務(wù)節(jié)點(diǎn)的日志不僅能在報(bào)錯(cuò)時(shí)查看報(bào)錯(cuò)信息,而且當(dāng)任務(wù)執(zhí)行成功時(shí),能夠在節(jié)點(diǎn)日志中查看在DolphinDB腳本中設(shè)定的運(yùn)行信息。
  2. 只能執(zhí)行一行語句,不能執(zhí)行多段代碼。

如何獲取 DolphinDB 任務(wù)運(yùn)行過程中的信息

下面,以股票委托數(shù)據(jù)導(dǎo)入任務(wù)為例,介紹如何在DolphinDB腳本中在不同運(yùn)行階段設(shè)定運(yùn)行日志信息,以及在DolphinScheduler上當(dāng)任務(wù)執(zhí)行完畢之后在日志中顯示完整的運(yùn)行信息。

  1. 在股票委托數(shù)據(jù)導(dǎo)入模塊的函數(shù)中引入一個(gè)參數(shù),該參數(shù)為內(nèi)存表_infoTb_,當(dāng)任務(wù)執(zhí)行過程中,將需要記錄的運(yùn)行信息寫入該表。
// 模塊 stockData::stockDataLoad 的 loadEntrust 函數(shù)定義如下:

module stockData::stockDataLoad
use stockData::stockDataProcess

def loadEntrust(userName, userPassword, startDate, endDate, dbName, tbName, filePath, loadType,mutable infoTb)
{
    for(loadDate in startDate..endDate)
    {
        // 刪除已有數(shù)據(jù)
        dateString = temporalFormat(loadDate,"yyyyMMdd")
        dataCount = exec count(*) from loadTable(dbName, tbName) where date(tradeTime)=loadDate
        // 如果表里面已經(jīng)存在當(dāng)天要處理的數(shù)據(jù),刪除庫里面已有數(shù)據(jù)
        if(dataCount != 0){
            msg = "Start to delete the entrust data, the delete date is: " + dateString
            print(msg)
            // 將運(yùn)行信息添加到表中
            infoTb.tableInsert(msg)

            dropPartition(database(dbName), loadDate, tbName)
            msg = "Successfully deleted the entrust data, the delete date is: " + dateString
            print(msg)
            infoTb.tableInsert(msg)
        }
        // 數(shù)據(jù)導(dǎo)入
        // 判斷數(shù)據(jù)csv文件是否存在
        fileName = filePath + "/" + dateString + "/" + "entrust.csv"
        if(!exists(fileName))
        {
            throw fileName + "不存在!請(qǐng)檢查數(shù)據(jù)源!"
        }
        // 如果是全市場(chǎng)數(shù)據(jù),數(shù)據(jù)量較大,因此分批導(dǎo)入
        schemaTB = schemaEntrust()
        tmpData1 = loadText(filename=fileName, schema=schemaTB)
        tmpData1,n1,n2 = processEntrust(loadDate,tmpData1)
        pt = loadTable(dbName,tbName)
        msg = "the data size in csv file is :" + n2 + ", the duplicated count is " + (n1 - n2)
        print(msg)
        infoTb.tableInsert(msg)
        for(i in 0..23)
        {
            startTime = 08:00:00.000 + 1200 * 1000 * i
            tmpData2 = select * from tmpData1 where time(TradeTime)>=startTime and time(TradeTime)<(startTime+ 1200 * 1000)
            if(size(tmpData2) < 1)
            {
                continue
            }
            //數(shù)據(jù)入庫
            pt.append!(tmpData2)
        }
        msg = "successfully loaded!"
        print(msg)
        infoTb.tableInsert(msg)
    }
}

2. 在 DolphinDB 腳本中每個(gè)函數(shù)視圖中定義一個(gè)內(nèi)存表,在任務(wù)執(zhí)行過程中,產(chǎn)生一條運(yùn)行信息時(shí)便寫入該表,函數(shù)執(zhí)行完畢后返回該表 :

use stockData::stockDataLoad
def loadEntrustFV(userName="admin" , userPassword="123456", startDate = 2023.02.01, endDate = 2023.02.01, dbName = "dfs://stockData", tbName = "entrust", filePath = "/hdd/hdd8/ymchen", loadType = "daily")
{
    // 定義運(yùn)行信息表
    infoTb = table(1:0,["info"] ,[STRING])
    if(loadType == "daily")
    {
        sDate = today()
        eDate = today()
        // 將運(yùn)行信息表作為參數(shù)傳入數(shù)據(jù)導(dǎo)入函數(shù)中,將每次需要輸出的信息寫入該表
        loadEntrust(userName, userPassword, sDate, eDate, dbName, tbName, filePath, loadType,infoTb)
    }
    else if(loadType == "batch")
    {
        loadEntrust(userName, userPassword, date(startDate), date(endDate), dbName, tbName, filePath, loadType,infoTb)
    }
    // 返回運(yùn)行信息表
    return infoTb
}

3. 在 DolphinScheduler 工作流中添加SQL查詢節(jié)點(diǎn),用于獲取運(yùn)行信息表的內(nèi)容。主要實(shí)現(xiàn)邏輯如下:a. 通過執(zhí)行股票委托數(shù)據(jù)導(dǎo)入函數(shù)視圖,獲取運(yùn)行信息表對(duì)象;b. 通過SQL查詢節(jié)點(diǎn),將這個(gè)運(yùn)行信息表對(duì)象轉(zhuǎn)化格式,以更直觀的形式顯示在任務(wù)節(jié)點(diǎn)的日志界面中。為實(shí)現(xiàn)以上步驟,可通過兩種方法,以下將詳細(xì)介紹:

?    a. 直接在SQL語句中輸入以下代碼,將以上介紹的兩個(gè)步驟整合在一行代碼中:

"\n[DOLPHINDB INFO] " + concat(exec * from loadEntrustFV(startDate=${startDate},endDate=${endDate},loadType="batch"),"\n[DOLPHINDB INFO] ");

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

?    b. 在SQL查詢類型的前置任務(wù)中用于獲取運(yùn)行信息表,在SQL語句中將該表轉(zhuǎn)化成目標(biāo)格式。

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

4. 整個(gè)工作流結(jié)構(gòu)圖如下所示:

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

5. 通過鼠標(biāo)右鍵點(diǎn)擊相應(yīng)任務(wù)節(jié)點(diǎn),選擇查看日志選項(xiàng),可以查看對(duì)應(yīng)DolphinDB任務(wù)節(jié)點(diǎn)的運(yùn)行信息。

海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松

2.6 DolphinDB 腳本開發(fā)注意事項(xiàng)

  1. 當(dāng)在模塊中調(diào)用插件的函數(shù)時(shí),需要提前在多個(gè)數(shù)據(jù)節(jié)點(diǎn)和控制節(jié)點(diǎn)導(dǎo)入該插件。
  2. 在 DolphinScheduler 上傳入的參數(shù),是以字符串類型存在的。因此,在 DolphinDB 上需要進(jìn)一步將其改成預(yù)期的數(shù)據(jù)類型,比如:
// 在DolphinScheduler上執(zhí)行的語句為:
loadSnapshotFV(startDate=${startDate},endDate=${endDate},loadType="batch");

// 由于傳入的 startDate 是字符串類型,因此在DolphinDB上定義該函數(shù)時(shí)需要先轉(zhuǎn)成 Date 類型
use stockData::stockDataLoad
def loadEntrustFV(userName="admin" , userPassword="123456", startDate = 2023.02.01, endDate = 2023.02.01, dbName = "dfs://stockData", tbName = "entrust", filePath = "/hdd/hdd/ymchen", loadType = "daily")
{
    if(loadType == "batch")
    {
        // 使用 date(startDate) 轉(zhuǎn)成 Date 類型
        loadEntrust(userName, userPassword, date(startDate), date(endDate), dbName, tbName, filePath, loadType)
    }
}

// 創(chuàng)建函數(shù)視圖
addFunctionView(loadEntrustFV)

3. 當(dāng)模塊文件內(nèi)容更改時(shí),需要按照以下步驟更新函數(shù)視圖

  • 使用clearCachedModules函數(shù)或者重新連接 session 清除之前緩存的模塊。
  • 使用dropFunctionView函數(shù)刪除指定的函數(shù)視圖。
  • 使用use語句重新導(dǎo)入更改后的模塊。
  • 使用addFunctionView函數(shù)添加新的視圖。

3. DolphinScheduler 與 Airflow 對(duì)比

Airflow也是一款具有不錯(cuò)性能的調(diào)度軟件,關(guān)于它與DolphinDB相結(jié)合的教程可參照:DolphinDB與Airflow最佳實(shí)踐。以下是DolphinScheduler與Airflow在一些方面的對(duì)比:

功能 Airflow DolphinScheduler
調(diào)度模塊 自實(shí)現(xiàn) Quartz任務(wù)調(diào)度庫
Job類型 Python、Bash、HTTP、Mysql等,支持Operator的自定義擴(kuò)展 支持傳統(tǒng)的Shell任務(wù),同時(shí)支持大數(shù)據(jù)平臺(tái)任務(wù)調(diào)度:MR、Spark、SQL、Python等
Executor觸發(fā) Restful Restful
工作流 dag → tasks project → flows → tasks
部署運(yùn)維 較復(fù)雜,包括WebServer、Scheduler、Worker 簡單
單點(diǎn)故障 Scheduler存在單點(diǎn)故障風(fēng)險(xiǎn) 去中心化的多Master和多Worker
高可用額外要求 Celery、Dask、Mesos + Load Balancer + DB 不需要(本來就支持)
過載處理 任務(wù)太多時(shí)會(huì)卡死服務(wù)器 任務(wù)隊(duì)列機(jī)制,單個(gè)機(jī)器上可調(diào)度的任務(wù)數(shù)量可以靈活配置,當(dāng)任務(wù)過多時(shí)會(huì)緩存在任務(wù)隊(duì)列中,不會(huì)造成機(jī)器卡死
DAG監(jiān)控界面 不能直觀區(qū)分任務(wù)類型 任務(wù)狀態(tài)、任務(wù)類型、重試次數(shù)、任務(wù)運(yùn)行機(jī)器、可視化變量等關(guān)鍵信息一目了然
可視化流程定義 否。通過python代碼來繪制DAG,使用不便,對(duì)于缺乏編碼基礎(chǔ)的業(yè)務(wù)人員存在較高使用門檻 是。所有流程定義都是可視化的,通過拖拽任務(wù)來繪制DAG,配置數(shù)據(jù)源及資源,同時(shí)對(duì)于第三方系統(tǒng),提供api方式的操作。
快速部署 集群化部署復(fù)雜 一鍵部署
是否能暫停和恢復(fù) 否。只能先將工作流殺死再重新運(yùn)行 支持暫停、恢復(fù)操作,支持停止操作
是否支持集群擴(kuò)展 是,但是僅支持復(fù)雜Executor的水平擴(kuò)展 是,調(diào)度器使用分布式調(diào)度,整體的調(diào)度能力會(huì)隨集群的規(guī)模線性增長,Master和Worker支持動(dòng)態(tài)上下線
開發(fā)語言 Python Java

4.常見問題

1. DolphinScheduler status 顯示 running,web 的登陸頁面卻無法登錄

查看standalone-server/logs/目錄下對(duì)應(yīng)日期的日志中具體情況

2. DolphinScheduler 設(shè)置開機(jī)自啟動(dòng)后,服務(wù)器重啟無法正常啟動(dòng)

正常情況下,由于dolphinScheduler的standalone模式源碼原因,需要在服務(wù)器重啟前到scheduler部署目錄執(zhí)行停止腳本bash ./bin/dolphinscheduler-daemon.sh stop standalone-server; 如果無法正常啟動(dòng),請(qǐng)通過ps aux | grep dolphinscheduler找到dolphinScheduler的所有進(jìn)程,并通過kill -15 進(jìn)程ID結(jié)束這些進(jìn)程后,再到scheduler部署目錄執(zhí)行啟動(dòng)腳本bash ./bin/dolphinscheduler-daemon.sh start standalone-server

5. 附錄

  • ETL腳本模塊:ETLCase.zip
  • 示例數(shù)據(jù):20230201.zip
  • MySQL插件:mysql-connector-j-8.0.31.jar

本文由 白鯨開源 提供發(fā)布支持!文章來源地址http://www.zghlxwxcb.cn/news/detail-776890.html

到了這里,關(guān)于海豚2來了丨DolphinDB 集成 DolphinScheduler,任務(wù)調(diào)度更輕松的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 海豚調(diào)度任務(wù)類型Apache SeaTunnel部署指南

    Apache DolphinScheduler已支持Apache SeaTunnel任務(wù)類型,本文介紹了SeaTunnel任務(wù)類型如何創(chuàng)建,任務(wù)參數(shù),以及任務(wù)樣例。 SeaTunnel 任務(wù)類型,用于創(chuàng)建并執(zhí)行 SeaTunnel 類型任務(wù)。worker 執(zhí)行該任務(wù)的時(shí)候,會(huì)通過 start-seatunnel-spark.sh 、 start-seatunnel-flink.sh 和 seatunnel.sh 命令解析 config 文件

    2024年04月08日
    瀏覽(49)
  • 用海豚調(diào)度器定時(shí)調(diào)度從Kafka到HDFS的kettle任務(wù)腳本

    用海豚調(diào)度器定時(shí)調(diào)度從Kafka到HDFS的kettle任務(wù)腳本

    在實(shí)際項(xiàng)目中,從Kafka到HDFS的數(shù)據(jù)是每天自動(dòng)生成一個(gè)文件,按日期區(qū)分。而且Kafka在不斷生產(chǎn)數(shù)據(jù),因此看看kettle是不是需要時(shí)刻運(yùn)行?能不能按照每日自動(dòng)生成數(shù)據(jù)文件? 為了測(cè)試實(shí)際項(xiàng)目中的海豚定時(shí)調(diào)度從Kafka到HDFS的Kettle任務(wù)情況,特地提前跑一下海豚定時(shí)調(diào)度這個(gè)

    2024年04月15日
    瀏覽(45)
  • 數(shù)倉工具——DolphinScheduler任務(wù)調(diào)度工具

    數(shù)倉工具——DolphinScheduler任務(wù)調(diào)度工具

    在數(shù)倉項(xiàng)目中,掌握一種任務(wù)調(diào)度工具是十分重要的,常用的調(diào)度工具有Azkaban和oozie,這里學(xué)習(xí)一種國產(chǎn)的調(diào)度工具,DolphinScheduler,綜合了兩種調(diào)度工具的特點(diǎn)而產(chǎn)生的。 DolphinScheduler是一個(gè)分布式、易擴(kuò)展的可視化DAG工作流調(diào)度平臺(tái),致力于解決數(shù)據(jù)處理流程中錯(cuò)綜復(fù)雜的

    2023年04月15日
    瀏覽(25)
  • 一百六十八、Kettle——用海豚調(diào)度器定時(shí)調(diào)度從Kafka到HDFS的任務(wù)腳本(持續(xù)更新追蹤、持續(xù)完善)

    一百六十八、Kettle——用海豚調(diào)度器定時(shí)調(diào)度從Kafka到HDFS的任務(wù)腳本(持續(xù)更新追蹤、持續(xù)完善)

    在實(shí)際項(xiàng)目中,從Kafka到HDFS的數(shù)據(jù)是每天自動(dòng)生成一個(gè)文件,按日期區(qū)分。而且Kafka在不斷生產(chǎn)數(shù)據(jù),因此看看kettle是不是需要時(shí)刻運(yùn)行?能不能按照每日自動(dòng)生成數(shù)據(jù)文件? 為了測(cè)試實(shí)際項(xiàng)目中的海豚定時(shí)調(diào)度從Kafka到HDFS的kettle任務(wù)情況,特地提前跑一下海豚定時(shí)調(diào)度這個(gè)

    2024年02月10日
    瀏覽(28)
  • 一百六十五、Kettle——用海豚調(diào)度器調(diào)度Linux資源庫中的kettle任務(wù)腳本(親測(cè)、附流程截圖)

    一百六十五、Kettle——用海豚調(diào)度器調(diào)度Linux資源庫中的kettle任務(wù)腳本(親測(cè)、附流程截圖)

    在Linux上腳本運(yùn)行kettle的轉(zhuǎn)換任務(wù)、無論是Linux本地還是Linux資源庫都成功后,接下來就是用海豚調(diào)度Linux上kettle任務(wù) 尤其是團(tuán)隊(duì)開發(fā)中,基本都要使用共享資源庫,所以我直接使用海豚調(diào)度Linux資源庫的kettle任務(wù)腳本 1、先開啟zookeeper服務(wù) 2、再開啟海豚調(diào)度器服務(wù) 3、開啟服

    2024年02月11日
    瀏覽(54)
  • 使用 Apache DolphinScheduler 進(jìn)行 EMR 任務(wù)調(diào)度

    使用 Apache DolphinScheduler 進(jìn)行 EMR 任務(wù)調(diào)度

    By AWS Team 隨著企業(yè)規(guī)模的擴(kuò)大,業(yè)務(wù)數(shù)據(jù)的激增,我們會(huì)使用 Hadoop/Spark 框架來處理大量數(shù)據(jù)的 ETL/聚合分析作業(yè),?這些作業(yè)將需要由統(tǒng)一的作業(yè)調(diào)度平臺(tái)去定時(shí)調(diào)度。 在 Amazon EMR 中,可以使用 AWS 提供 Step Function,托管 AirFlow,以及 Apache Oozie 或 Azkaban 進(jìn)行作業(yè)的調(diào)用。但隨

    2024年02月16日
    瀏覽(21)
  • 一百六十八、Kettle——用海豚調(diào)度器定時(shí)調(diào)度從Kafka到HDFS的kettle任務(wù)腳本(持續(xù)更新追蹤、持續(xù)完善)

    一百六十八、Kettle——用海豚調(diào)度器定時(shí)調(diào)度從Kafka到HDFS的kettle任務(wù)腳本(持續(xù)更新追蹤、持續(xù)完善)

    在實(shí)際項(xiàng)目中,從Kafka到HDFS的數(shù)據(jù)是每天自動(dòng)生成一個(gè)文件,按日期區(qū)分。而且Kafka在不斷生產(chǎn)數(shù)據(jù),因此看看kettle是不是需要時(shí)刻運(yùn)行?能不能按照每日自動(dòng)生成數(shù)據(jù)文件? 為了測(cè)試實(shí)際項(xiàng)目中的海豚定時(shí)調(diào)度從Kafka到HDFS的kettle任務(wù)情況,特地提前跑一下海豚定時(shí)調(diào)度這個(gè)

    2024年02月09日
    瀏覽(21)
  • 開源任務(wù)調(diào)度平臺(tái)dolphinscheduler部署及使用指南(未完)

    開源任務(wù)調(diào)度平臺(tái)dolphinscheduler部署及使用指南(未完)

    目錄 一 dolphinsheduler調(diào)研 支持的任務(wù)類型: 1.1 dolphinsheduler集群部署 1.1.1 需要的環(huán)境 1.1.2 dolphinsheduler安裝 可能的報(bào)錯(cuò):zk正常,master或worker一段時(shí)間后掛掉 問題原因 解決辦法 1.1.3 資源中心配置 1.2 參數(shù) 1.2.1 任務(wù)中可能出現(xiàn)的所有參數(shù) 1.2.2 內(nèi)置參數(shù) 1.2.3 全局參數(shù) 1.2.4 本地參

    2023年04月19日
    瀏覽(40)
  • 第二十章 分布式任務(wù)調(diào)度中心&DolphinScheduler架構(gòu)設(shè)計(jì)

    第二十章 分布式任務(wù)調(diào)度中心&DolphinScheduler架構(gòu)設(shè)計(jì)

    1、調(diào)度系統(tǒng)概述 1.1、調(diào)度系統(tǒng)介紹 含義:在 指定時(shí)間協(xié)調(diào)器 通過分布式執(zhí)行器并行執(zhí)行任務(wù)。 (1)目標(biāo) ? 分布式環(huán)境下處理任務(wù)調(diào)度,在基于給定的時(shí)間點(diǎn),給定的時(shí)間間隔或者給定執(zhí)行次數(shù)自動(dòng)的執(zhí)行任務(wù)。 (2)作用 分布式調(diào)度 作業(yè)高可用 最大限度利用資源 (

    2024年02月08日
    瀏覽(22)
  • 使用Docker部署開源分布式任務(wù)調(diào)度系統(tǒng)DolphinScheduler

    使用Docker部署開源分布式任務(wù)調(diào)度系統(tǒng)DolphinScheduler

    ?? 博客主頁 : 小羊失眠啦. ?? 系列專欄 : 《C語言》 《數(shù)據(jù)結(jié)構(gòu)》 《Linux》 《Cpolar》 ?? 感謝大家點(diǎn)贊??收藏?評(píng)論?? 前些天發(fā)現(xiàn)了一個(gè)巨牛的人工智能學(xué)習(xí)網(wǎng)站,通俗易懂,風(fēng)趣幽默,忍不住分享一下給大家。點(diǎn)擊跳轉(zhuǎn)到網(wǎng)站。 本篇教程和大家分享一下DolphinSc

    2024年02月05日
    瀏覽(49)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包