一、前言
-
SeaTunnel是一個(gè)分布式、高性能、可擴(kuò)展的數(shù)據(jù)同步工具,它支持多種數(shù)據(jù)源之間的數(shù)據(jù)同步,包括Hive和StarRocks??梢允褂肧eaTunnel的Hive源連接器從Hive讀取外部數(shù)據(jù)源數(shù)據(jù),然后使用StarRocks接收器連接器將數(shù)據(jù)發(fā)送到StarRocks。
-
通過StarRocks讀取外部數(shù)據(jù)源數(shù)據(jù)。StarRocks源連接器的內(nèi)部實(shí)現(xiàn)是從前端(FE)獲得查詢計(jì)劃,將查詢計(jì)劃作為參數(shù)傳遞給BE節(jié)點(diǎn),然后從BE節(jié)點(diǎn)獲得數(shù)據(jù)結(jié)果。
名稱 | 版本 |
---|---|
StarRocks | 2.4.2 |
SeaTunnel | 2.3.1 |
Spark | 3.2.1 |
Flink | 1.16.1 |
二、安裝SeaTunnel
- 安裝并設(shè)置Java(Java 8 或 11,其他高于 Java 8 的版本理論上也可以使用)JAVA_HOME。
- 進(jìn)入seatunnel下載頁(yè)面,下載最新版本的distribute packageseatunnel--bin.tar.gz,或者可以通過終端下載(下載很慢,這邊已上傳至云盤,可以直接自?。?/li>
export version="2.3.1"
wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"
鏈接:https://pan.baidu.com/s/1nT0BgUutW66cyiu2C_jqIg
提取碼:acdy
- 安裝連接器
- 從2.2.0-beta開始,二進(jìn)制包默認(rèn)不提供connector依賴,所以第一次使用時(shí),我們需要執(zhí)行如下命令安裝connector:(當(dāng)然也可以手動(dòng)下載connector從(Apache Maven Repository下載,然后手動(dòng)移動(dòng)到connectors目錄下的seatunnel子目錄)。
- 指定connector的版本,執(zhí)行
/bin/bash /app/apache-seatunnel-incubating-2.3.1/bin/install-plugin.sh 2.3.1
- 安裝完檢查connectors目錄文件
- 配置 SeaTunnel,更改設(shè)置config/seatunnel-env.sh
- 自行選擇合適的spark(要求版本>=2.4.0)、flink 版本(要求版本>=1.12.0)
vim /app/apache-seatunnel-incubating-2.3.1/config/seatunnel-env.sh
- 運(yùn)行SeaTunnel,查看是否部署安裝成功
-- 本地模式
/app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m local[*] \
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/seatunnel.streaming.conf.template
-- 集群模式
/app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m yarn \
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/seatunnel.streaming.conf.template
- SeaTunnel 控制臺(tái)會(huì)打印一些日志如下
三、配置Seatunnel config
1、Hive source
- 配置 Hive源,您需要在SeaTunnel作業(yè)配置文件中指定Hive的連接信息,包括metastore_uri、table_name。更多seatunnel source hive 例如:
source {
Hive {
#parallelism = 6
table_name = "mid.ads_test_hive_starrocks_ds"
metastore_uri = "thrift://192.168.10.200:9083"
result_table_name = "hive_starrocks_ds"
}
}
2、Seatunnel Transform
- Transform 是指在數(shù)據(jù)遷移過程中對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換的過程。它支持多種轉(zhuǎn)換插件,包括Json、NullRate、Nulltf、Replace、Split、Sql、udf和UUID等,Transform插件具有一些通用參數(shù),可以在SeaTunnel作業(yè)配置文件中指定這些參數(shù)來控制數(shù)據(jù)轉(zhuǎn)換的行為。更多seatunnel Transform 例如:
transform {
Filter {
source_table_name = "fake"
fields = [name]
result_table_name = "fake_name"
}
Filter {
source_table_name = "fake"
fields = [age]
result_table_name = "fake_age"
}
}
3、StarRocks sink
- 配置 StarRocks接收器,您需要在SeaTunnel作業(yè)配置文件中指定StarRocks的連接信息,包括JDBC URL、用戶名和密碼。更多seatunnel sink starrocks 例如:
sink {
starrocks {
nodeUrls = ["192.168.10.10:8030","192.168.10.11:8030","192.168.10.12:8030"]
base-url = "jdbc:mysql://192.168.10.10:9030/"
username = root
password = "xxxxxxxxx"
database = "example_db"
table = "ads_test_hive_starrocks_ds"
batch_max_rows = 500000
batch_max_bytes = 104857600
batch_interval_ms = 30000
starrocks.config = {
format = "CSV"
column_separator = "\\x01"
row_delimiter = "\\x02"
}
}
}
四、Run SeaTunnel hive_to_starrocks
cat /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf
env {
spark.app.name = "apache-seatunnel-2.3.1_hive_to_sr"
spark.yarn.queue = "root.default"
spark.executor.instances = 2
spark.executor.cores = 4
spark.driver.memory = "3g"
spark.executor.memory = "4g"
spark.ui.port = 1300
spark.sql.catalogImplementation = "hive"
spark.hadoop.hive.exec.dynamic.partition = "true"
spark.hadoop.hive.exec.dynamic.partition.mode = "nonstrict"
spark.network.timeout = "1200s"
spark.sql.sources.partitionOverwriteMode = "dynamic"
spark.yarn.executor.memoryOverhead = 800m
spark.kryoserializer.buffer.max = 512m
spark.task.maxFailures=0
spark.executor.extraJavaOptions = "-Dfile.encoding=UTF-8"
spark.driver.extraJavaOptions = "-Dfile.encoding=UTF-8"
job.name = "apache-seatunnel-2.3.1_hive_to_sr"
}
source {
Hive {
#parallelism = 6
table_name = "mid.ads_test_hive_starrocks_ds"
metastore_uri = "thrift://192.168.10.200:9083"
result_table_name = "hive_starrocks_ds_t1"
}
}
transform {
sql {
query ="select xxx,xxx,xxx,xxx from hive_starrocks_ds_t1 where period_sdate >= '2022-10-31'"
source_table_name = "hive_starrocks_ds_t1"
result_table_name = "hive_starrocks_ds_t2"
}
}
sink {
starrocks {
nodeUrls = ["192.168.10.10:8030","192.168.10.11:8030","192.168.10.12:8030"]
base-url = "jdbc:mysql://192.168.10.10:9030/"
username = root
password = "xxxxxxxxx"
database = "example_db"
table = "ads_test_hive_starrocks_ds"
batch_max_rows = 500000
batch_max_bytes = 104857600
batch_interval_ms = 30000
starrocks.config = {
format = "CSV"
column_separator = "\\x01"
row_delimiter = "\\x02"
}
}
}
- spark3.x.x
sudo -u hive /app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m yarn \
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf
- spark2.x.x
sudo -u hive /app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-2-connector-v2.sh \
-m yarn \
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf
五、總結(jié)
1、問題總結(jié)
- A. 中文亂碼問題
設(shè)置環(huán)境變量來解決中文亂碼問題??梢栽趀nv中添加以下參數(shù),這將設(shè)置Java虛擬機(jī)的編碼格式為UTF-8,以便正確處理中文字符。
spark.executor.extraJavaOptions = "-Dfile.encoding=UTF-8"
spark.driver.extraJavaOptions = "-Dfile.encoding=UTF-8"
- B. 內(nèi)存限制而被YARN殺死
Spark 程序在 YARN 集群上運(yùn)行時(shí),由于超出了內(nèi)存限制而丟失了一個(gè)執(zhí)行器。考慮增加 spark.yarn.executor.memoryOverhead 用于指定每個(gè)執(zhí)行器保留的用于內(nèi)部元數(shù)據(jù)、用戶數(shù)據(jù)結(jié)構(gòu)和其他堆外內(nèi)存需求的堆外內(nèi)存量。該參數(shù)的值將添加到執(zhí)行器內(nèi)存中,以確定每個(gè)執(zhí)行器對(duì) YARN 的完整內(nèi)存請(qǐng)求。建議不要將此值設(shè)置得過高,因?yàn)檫@可能會(huì)導(dǎo)致過多的垃圾收集開銷和性能降低。
spark.yarn.executor.memoryOverhead = 800M
- C. db 2153532 is 100 larger than limit 100?
錯(cuò)誤信息表明您在嘗試將數(shù)據(jù)刷新到 StarRocks 時(shí)遇到了問題。在 db 2153532 上運(yùn)行的事務(wù)數(shù)為 100,超過了限制 100??梢試L試減少并發(fā)事務(wù)的數(shù)量,以減輕集群的壓力。另外可以調(diào)整相關(guān)參數(shù)。
-- 修改事務(wù)數(shù)
ADMIN SHOW FRONTEND CONFIG ('max_running_txn_num_per_db' = '300')
-- 查看參數(shù)是否調(diào)整
ADMIN SHOW FRONTEND CONFIG LIKE '%max_running_txn_nu%';
- D. sink導(dǎo)入starrocks數(shù)據(jù)量不對(duì)
剛開始遇到這個(gè)問題挺納悶,懷疑是Seatunnel設(shè)置的任務(wù)重試導(dǎo)致starrocks這邊數(shù)據(jù)量變多,看到官方文檔有設(shè)置重試參數(shù)為max_retries,將其設(shè)置為0,重跑還是有問題,明明在yarn上面提交的任務(wù)application是成功的。
后來查看spark ui看到有3個(gè)Failed Tasks之后才明白,原來是spark某幾個(gè)tasks內(nèi)存不足導(dǎo)致tasks失敗重試,導(dǎo)致sink導(dǎo)入的數(shù)據(jù)量變多。
解決方案:
1、使用starrocks主鍵模型進(jìn)行數(shù)據(jù)去重特性,保證數(shù)據(jù)唯一性的(目前官方最新版本2.3.1還不支持sink_starrocks exectly-once保障數(shù)據(jù)冪等性)
2、將spark參數(shù)重試機(jī)制設(shè)置為0,這樣設(shè)置后,當(dāng)任務(wù)執(zhí)行失敗時(shí),Spark不再嘗試重新執(zhí)行該任務(wù)。
spark.task.maxFailures=0
3、要確定spark.task.maxFailures=0是否在Spark任務(wù)中生效,可以查看Spark任務(wù)的日志文件。當(dāng)Spark任務(wù)失敗時(shí),應(yīng)該可以在日志中找到類似于以下內(nèi)容的行:文章來源:http://www.zghlxwxcb.cn/news/detail-625783.html
23/05/12 15:17:55 ERROR scheduler.TaskSetManager: Task 165 in stage 1.0 failed 0 times; aborting job
可以看出任務(wù)失敗了一次,但是并沒有重試,因?yàn)槿罩局械腻e(cuò)誤信息是Task 165 in stage 1.0 failed 0 times。這是因?yàn)閷park.task.maxFailures設(shè)置為0,表示任務(wù)失敗后不進(jìn)行重試。文章來源地址http://www.zghlxwxcb.cn/news/detail-625783.html
2、使用總結(jié)
- 本篇文章帶大家了解使用Seatunnel將Hive中的數(shù)據(jù)導(dǎo)入到StarRocks中,除此之外,Seatunnel還有很多種數(shù)據(jù)源可以支持,也有很多種導(dǎo)入方式,例如DataX 、CloudCanal 等
- 將數(shù)倉(cāng)中跑完的Hive的相關(guān)表每天導(dǎo)入到StarRocks中,可以使用以下場(chǎng)景:
1、不更新歷史數(shù)據(jù): 如果是分區(qū)表,我們?cè)隽繉?dǎo)入到 StarRocks 中即可。非分區(qū)表全量導(dǎo)入。
2、更新歷史數(shù)據(jù): 這種情況主要存在分區(qū)表中,往往會(huì)更改前幾個(gè)月數(shù)據(jù)或者時(shí)間更久的數(shù)據(jù),這種情況下,就不得不將該表重新同步一邊,使StarRocks中的數(shù)據(jù)和hive中的數(shù)據(jù)保持一致。hive中表的元數(shù)據(jù)發(fā)生變化,和StarRocks中的表結(jié)構(gòu)不一致: 這種情況下,就需要我們刪除重新建表或者truncate歷史分區(qū),重新同步數(shù)據(jù)。
到了這里,關(guān)于Seatunnel實(shí)戰(zhàn):hive_to_starrocks的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!