国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Seatunnel實(shí)戰(zhàn):hive_to_starrocks

這篇具有很好參考價(jià)值的文章主要介紹了Seatunnel實(shí)戰(zhàn):hive_to_starrocks。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

一、前言

  • SeaTunnel是一個(gè)分布式、高性能、可擴(kuò)展的數(shù)據(jù)同步工具,它支持多種數(shù)據(jù)源之間的數(shù)據(jù)同步,包括Hive和StarRocks??梢允褂肧eaTunnel的Hive源連接器從Hive讀取外部數(shù)據(jù)源數(shù)據(jù),然后使用StarRocks接收器連接器將數(shù)據(jù)發(fā)送到StarRocks。

  • 通過StarRocks讀取外部數(shù)據(jù)源數(shù)據(jù)。StarRocks源連接器的內(nèi)部實(shí)現(xiàn)是從前端(FE)獲得查詢計(jì)劃,將查詢計(jì)劃作為參數(shù)傳遞給BE節(jié)點(diǎn),然后從BE節(jié)點(diǎn)獲得數(shù)據(jù)結(jié)果。

名稱 版本
StarRocks 2.4.2
SeaTunnel 2.3.1
Spark 3.2.1
Flink 1.16.1

二、安裝SeaTunnel

  1. 安裝并設(shè)置Java(Java 8 或 11,其他高于 Java 8 的版本理論上也可以使用)JAVA_HOME。
  2. 進(jìn)入seatunnel下載頁(yè)面,下載最新版本的distribute packageseatunnel--bin.tar.gz,或者可以通過終端下載(下載很慢,這邊已上傳至云盤,可以直接自?。?/li>
export version="2.3.1"

wget "https://archive.apache.org/dist/incubator/seatunnel/${version}/apache-seatunnel-incubating-${version}-bin.tar.gz"
tar -xzvf "apache-seatunnel-incubating-${version}-bin.tar.gz"

鏈接:https://pan.baidu.com/s/1nT0BgUutW66cyiu2C_jqIg
提取碼:acdy

  1. 安裝連接器
  • 從2.2.0-beta開始,二進(jìn)制包默認(rèn)不提供connector依賴,所以第一次使用時(shí),我們需要執(zhí)行如下命令安裝connector:(當(dāng)然也可以手動(dòng)下載connector從(Apache Maven Repository下載,然后手動(dòng)移動(dòng)到connectors目錄下的seatunnel子目錄)。
  • 指定connector的版本,執(zhí)行
/bin/bash /app/apache-seatunnel-incubating-2.3.1/bin/install-plugin.sh 2.3.1

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

  1. 安裝完檢查connectors目錄文件

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

  1. 配置 SeaTunnel,更改設(shè)置config/seatunnel-env.sh
  • 自行選擇合適的spark(要求版本>=2.4.0)、flink 版本(要求版本>=1.12.0)
vim /app/apache-seatunnel-incubating-2.3.1/config/seatunnel-env.sh 

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

  1. 運(yùn)行SeaTunnel,查看是否部署安裝成功
-- 本地模式
/app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m local[*] \
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/seatunnel.streaming.conf.template 

-- 集群模式
/app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m yarn \
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/seatunnel.streaming.conf.template 

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

  1. SeaTunnel 控制臺(tái)會(huì)打印一些日志如下

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

三、配置Seatunnel config

1、Hive source

  • 配置 Hive源,您需要在SeaTunnel作業(yè)配置文件中指定Hive的連接信息,包括metastore_uri、table_name。更多seatunnel source hive 例如:
 source {
 
  Hive {
    #parallelism = 6 
    table_name = "mid.ads_test_hive_starrocks_ds"
    metastore_uri = "thrift://192.168.10.200:9083"
    result_table_name = "hive_starrocks_ds"
  }
   
}

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

2、Seatunnel Transform

  • Transform 是指在數(shù)據(jù)遷移過程中對(duì)數(shù)據(jù)進(jìn)行轉(zhuǎn)換的過程。它支持多種轉(zhuǎn)換插件,包括Json、NullRate、Nulltf、Replace、Split、Sql、udf和UUID等,Transform插件具有一些通用參數(shù),可以在SeaTunnel作業(yè)配置文件中指定這些參數(shù)來控制數(shù)據(jù)轉(zhuǎn)換的行為。更多seatunnel Transform 例如:
transform {
    Filter {
      source_table_name = "fake"
      fields = [name]
      result_table_name = "fake_name"
    }
    Filter {
      source_table_name = "fake"
      fields = [age]
      result_table_name = "fake_age"
    }
}

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

3、StarRocks sink

  • 配置 StarRocks接收器,您需要在SeaTunnel作業(yè)配置文件中指定StarRocks的連接信息,包括JDBC URL、用戶名和密碼。更多seatunnel sink starrocks 例如:
sink {
        starrocks {
                nodeUrls = ["192.168.10.10:8030","192.168.10.11:8030","192.168.10.12:8030"]
                base-url = "jdbc:mysql://192.168.10.10:9030/"
                username = root
                password = "xxxxxxxxx"
                database = "example_db"
                table = "ads_test_hive_starrocks_ds"
                batch_max_rows = 500000
                batch_max_bytes = 104857600
                batch_interval_ms = 30000
                starrocks.config = {
                format = "CSV"
                column_separator = "\\x01"
                row_delimiter = "\\x02"
                }
        }
}

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

四、Run SeaTunnel hive_to_starrocks

cat /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf

env {
        spark.app.name = "apache-seatunnel-2.3.1_hive_to_sr"
        spark.yarn.queue = "root.default"
        spark.executor.instances = 2
        spark.executor.cores = 4
        spark.driver.memory = "3g"
        spark.executor.memory = "4g"
        spark.ui.port = 1300
        spark.sql.catalogImplementation = "hive"
        spark.hadoop.hive.exec.dynamic.partition = "true"
        spark.hadoop.hive.exec.dynamic.partition.mode = "nonstrict"
        spark.network.timeout = "1200s"
        spark.sql.sources.partitionOverwriteMode = "dynamic"
        spark.yarn.executor.memoryOverhead = 800m
        spark.kryoserializer.buffer.max = 512m
        spark.task.maxFailures=0
        spark.executor.extraJavaOptions = "-Dfile.encoding=UTF-8"
        spark.driver.extraJavaOptions = "-Dfile.encoding=UTF-8"
        job.name = "apache-seatunnel-2.3.1_hive_to_sr"
}

source {

  Hive {
    #parallelism = 6 
    table_name = "mid.ads_test_hive_starrocks_ds"
    metastore_uri = "thrift://192.168.10.200:9083"
    result_table_name = "hive_starrocks_ds_t1"
  }
  
}

transform {
 sql {
      query  ="select xxx,xxx,xxx,xxx from hive_starrocks_ds_t1  where period_sdate >= '2022-10-31'"
      source_table_name = "hive_starrocks_ds_t1"
      result_table_name = "hive_starrocks_ds_t2"
 }
}

sink {
        starrocks {
                nodeUrls = ["192.168.10.10:8030","192.168.10.11:8030","192.168.10.12:8030"]
                base-url = "jdbc:mysql://192.168.10.10:9030/"
                username = root
                password = "xxxxxxxxx"
                database = "example_db"
                table = "ads_test_hive_starrocks_ds"
                batch_max_rows = 500000
                batch_max_bytes = 104857600
                batch_interval_ms = 30000
                starrocks.config = {
                format = "CSV"
                column_separator = "\\x01"
                row_delimiter = "\\x02"
                }
        }
}
  • spark3.x.x
sudo -u hive /app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-3-connector-v2.sh \
-m yarn \
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf 
  • spark2.x.x
sudo -u hive /app/apache-seatunnel-incubating-2.3.1/bin/start-seatunnel-spark-2-connector-v2.sh \
-m yarn \
-e client \
-c /app/apache-seatunnel-incubating-2.3.1/config/hive_to_sr2.conf 

五、總結(jié)

1、問題總結(jié)

  • A. 中文亂碼問題

設(shè)置環(huán)境變量來解決中文亂碼問題??梢栽趀nv中添加以下參數(shù),這將設(shè)置Java虛擬機(jī)的編碼格式為UTF-8,以便正確處理中文字符。

spark.executor.extraJavaOptions = "-Dfile.encoding=UTF-8"
spark.driver.extraJavaOptions = "-Dfile.encoding=UTF-8"

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

  • B. 內(nèi)存限制而被YARN殺死

Spark 程序在 YARN 集群上運(yùn)行時(shí),由于超出了內(nèi)存限制而丟失了一個(gè)執(zhí)行器。考慮增加 spark.yarn.executor.memoryOverhead 用于指定每個(gè)執(zhí)行器保留的用于內(nèi)部元數(shù)據(jù)、用戶數(shù)據(jù)結(jié)構(gòu)和其他堆外內(nèi)存需求的堆外內(nèi)存量。該參數(shù)的值將添加到執(zhí)行器內(nèi)存中,以確定每個(gè)執(zhí)行器對(duì) YARN 的完整內(nèi)存請(qǐng)求。建議不要將此值設(shè)置得過高,因?yàn)檫@可能會(huì)導(dǎo)致過多的垃圾收集開銷和性能降低。

spark.yarn.executor.memoryOverhead = 800M

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

  • C. db 2153532 is 100 larger than limit 100?

錯(cuò)誤信息表明您在嘗試將數(shù)據(jù)刷新到 StarRocks 時(shí)遇到了問題。在 db 2153532 上運(yùn)行的事務(wù)數(shù)為 100,超過了限制 100??梢試L試減少并發(fā)事務(wù)的數(shù)量,以減輕集群的壓力。另外可以調(diào)整相關(guān)參數(shù)。

-- 修改事務(wù)數(shù)
ADMIN SHOW FRONTEND CONFIG ('max_running_txn_num_per_db' = '300')
-- 查看參數(shù)是否調(diào)整
ADMIN SHOW FRONTEND CONFIG LIKE '%max_running_txn_nu%';

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)
Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

  • D. sink導(dǎo)入starrocks數(shù)據(jù)量不對(duì)

剛開始遇到這個(gè)問題挺納悶,懷疑是Seatunnel設(shè)置的任務(wù)重試導(dǎo)致starrocks這邊數(shù)據(jù)量變多,看到官方文檔有設(shè)置重試參數(shù)為max_retries,將其設(shè)置為0,重跑還是有問題,明明在yarn上面提交的任務(wù)application是成功的。
Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)
Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)

后來查看spark ui看到有3個(gè)Failed Tasks之后才明白,原來是spark某幾個(gè)tasks內(nèi)存不足導(dǎo)致tasks失敗重試,導(dǎo)致sink導(dǎo)入的數(shù)據(jù)量變多。

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)
Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)
Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)
解決方案:
1、使用starrocks主鍵模型進(jìn)行數(shù)據(jù)去重特性,保證數(shù)據(jù)唯一性的(目前官方最新版本2.3.1還不支持sink_starrocks exectly-once保障數(shù)據(jù)冪等性)

2、將spark參數(shù)重試機(jī)制設(shè)置為0,這樣設(shè)置后,當(dāng)任務(wù)執(zhí)行失敗時(shí),Spark不再嘗試重新執(zhí)行該任務(wù)。

spark.task.maxFailures=0

Seatunnel實(shí)戰(zhàn):hive_to_starrocks,starrocks,hive,大數(shù)據(jù)
3、要確定spark.task.maxFailures=0是否在Spark任務(wù)中生效,可以查看Spark任務(wù)的日志文件。當(dāng)Spark任務(wù)失敗時(shí),應(yīng)該可以在日志中找到類似于以下內(nèi)容的行:

23/05/12 15:17:55 ERROR scheduler.TaskSetManager: Task 165 in stage 1.0 failed 0 times; aborting job

可以看出任務(wù)失敗了一次,但是并沒有重試,因?yàn)槿罩局械腻e(cuò)誤信息是Task 165 in stage 1.0 failed 0 times。這是因?yàn)閷park.task.maxFailures設(shè)置為0,表示任務(wù)失敗后不進(jìn)行重試。文章來源地址http://www.zghlxwxcb.cn/news/detail-625783.html

2、使用總結(jié)

  • 本篇文章帶大家了解使用Seatunnel將Hive中的數(shù)據(jù)導(dǎo)入到StarRocks中,除此之外,Seatunnel還有很多種數(shù)據(jù)源可以支持,也有很多種導(dǎo)入方式,例如DataX 、CloudCanal 等
  • 將數(shù)倉(cāng)中跑完的Hive的相關(guān)表每天導(dǎo)入到StarRocks中,可以使用以下場(chǎng)景:
    1、不更新歷史數(shù)據(jù): 如果是分區(qū)表,我們?cè)隽繉?dǎo)入到 StarRocks 中即可。非分區(qū)表全量導(dǎo)入。
    2、更新歷史數(shù)據(jù): 這種情況主要存在分區(qū)表中,往往會(huì)更改前幾個(gè)月數(shù)據(jù)或者時(shí)間更久的數(shù)據(jù),這種情況下,就不得不將該表重新同步一邊,使StarRocks中的數(shù)據(jù)和hive中的數(shù)據(jù)保持一致。hive中表的元數(shù)據(jù)發(fā)生變化,和StarRocks中的表結(jié)構(gòu)不一致: 這種情況下,就需要我們刪除重新建表或者truncate歷史分區(qū),重新同步數(shù)據(jù)。

到了這里,關(guān)于Seatunnel實(shí)戰(zhàn):hive_to_starrocks的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 使用Apache SeaTunnel進(jìn)行數(shù)據(jù)庫(kù)同步(MySQL to MySQL)

    Apache SeaTunnel 起到的主要作用是什么? 目前,大數(shù)據(jù)體系里有各種各樣的數(shù)據(jù)引擎,有大數(shù)據(jù)生態(tài)的 Hadoop、Hive、Kudu、Kafka、HDFS,也有泛大數(shù)據(jù)庫(kù)體系的 MongoDB、Redis、ClickHouse、Doris,更有云上的 AWS S3、Redshift、BigQuery、Snowflake,還有各種各樣數(shù)據(jù)生態(tài) MySQL、PostgresSQL、IoTDB、

    2024年02月15日
    瀏覽(23)
  • Kafka To HBase To Hive

    目錄 1.在HBase中創(chuàng)建表 2.寫入API 2.1普通模式寫入hbase(逐條寫入) 2.2普通模式寫入hbase(buffer寫入) 2.3設(shè)計(jì)模式寫入hbase(buffer寫入) 3.HBase表映射至Hive中 hbase(main):003:0 create_namespace \\\'events_db\\\'????????????????????????????????????????????????? hbase(main)

    2024年02月08日
    瀏覽(13)
  • Hive to StarRocks

    將Hive 中的數(shù)據(jù)導(dǎo)入到 StarRocks 中的Broker Load的導(dǎo)入方式。在Broker Load模式下,通過部署的Broker程序,StarRocks可讀取對(duì)應(yīng)數(shù)據(jù)源(如HDFS, S3)上的數(shù)據(jù),利用自身的計(jì)算資源對(duì)數(shù)據(jù)進(jìn)行預(yù)處理和導(dǎo)入。這是一種 異步 的導(dǎo)入方式,用戶需要通過MySQL協(xié)議創(chuàng)建導(dǎo)入,并通過查看導(dǎo)入

    2024年02月11日
    瀏覽(17)
  • Spark 增量抽取 Mysql To Hive

    抽取ds_db01庫(kù)中customer_inf的增量數(shù)據(jù)進(jìn)入Hive的ods庫(kù)中表customer_inf。根據(jù)ods.user_info表中modified_time作為增量字段,只將新增的數(shù)據(jù)抽入,字段名稱、類型不變,同時(shí)添加靜態(tài)分區(qū),分區(qū)字段為etl_date,類型為String,且值為當(dāng)前日期的前一天日期(分區(qū)字段格式為yyyyMMdd)。使用h

    2024年02月05日
    瀏覽(19)
  • 【Flink-Kafka-To-Hive】使用 Flink 實(shí)現(xiàn) Kafka 數(shù)據(jù)寫入 Hive

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 Hive。 2、相關(guān)配置存放于 Mysql 中,通過 Mysql 進(jìn)行動(dòng)態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、Flink 集成 Kafka 寫入 Hive 需要進(jìn)行 checkpoint 才能落盤至 HDFS。 5、先在 Hive 中創(chuàng)建表然后動(dòng)態(tài)獲取 Hive 的表

    2024年02月03日
    瀏覽(23)
  • 【Hive實(shí)戰(zhàn)】Hive 物化視圖

    始于Hive3.0.0 傳統(tǒng)上,用于加速數(shù)據(jù)倉(cāng)庫(kù)查詢處理的最強(qiáng)大的技術(shù)之一是預(yù)先計(jì)算相關(guān)的摘要或物化視圖。 在Apache Hive 3.0.0中,主要是在項(xiàng)目中引入物化視圖和基于這些物化的自動(dòng)查詢重寫。特別是,物化視圖可以原生存儲(chǔ)在Hive中,也可以使用自定義存儲(chǔ)處理程序存儲(chǔ)在其他

    2024年02月12日
    瀏覽(16)
  • 【Hive實(shí)戰(zhàn)】Hive的事務(wù)表

    在升級(jí)到Hive 3之前,需要把在事務(wù)表上Major Compaction。主要是為了合并掉增量文件。更準(zhǔn)確地說,自上次Major Compaction以來在其上執(zhí)行過任何更新/刪除/合并語(yǔ)句的任何分區(qū)都必須進(jìn)行另一次Major Compaction。在 Hive 升級(jí)到 Hive 3 之前,此分區(qū)上不會(huì)再發(fā)生更新/刪除/合并。 ACID 代表

    2024年02月15日
    瀏覽(18)
  • 【Hive實(shí)戰(zhàn)】Hive的壓縮池與鎖

    【Hive實(shí)戰(zhàn)】Hive的壓縮池與鎖

    Compaction pooling 可以將壓縮請(qǐng)求和工作線程分配到池中。 分配給特定池的工作線程將僅處理該池中的壓縮請(qǐng)求。 沒有分配池的工作線程和壓縮請(qǐng)求隱式屬于 默認(rèn)池 。 池概念允許對(duì)處理壓縮請(qǐng)求進(jìn)行微調(diào)。 例如,可以創(chuàng)建一個(gè)名稱為“ 高優(yōu)先級(jí)壓縮 ”的池,為其分配一些經(jīng)

    2024年02月15日
    瀏覽(13)
  • 【Hive實(shí)戰(zhàn)】Hive治理方向探討(請(qǐng)留意見)

    全篇數(shù)據(jù)已20230618分區(qū)的數(shù)據(jù)為準(zhǔn) 治理臨時(shí)性質(zhì)的表 臨時(shí)表未及時(shí)清理,或按時(shí)間維度建表(亦是分區(qū)表類) 臨時(shí)屬性卻以常規(guī)表標(biāo)記。 訴求: 能夠更好的識(shí)別出臨時(shí)表,對(duì)超期臨時(shí)表進(jìn)行刪除程序。 控制分區(qū)表的分區(qū)數(shù)量和分區(qū)層級(jí) 內(nèi)部表的分區(qū)表數(shù)量 99,709 內(nèi)部表的

    2024年02月10日
    瀏覽(20)
  • Hive初始化問題 Failed to get schema version.

    Hive初始化問題 Failed to get schema version.

    Hive初始化問題 Failed to get schema version. 引起Failed to get schema version.的原因有很多,我遇到的如下: 1.Public Key Retrieval is not allowed 原因分析: 如果用戶使用了 sha256_password 認(rèn)證,密碼在傳輸過程中必須使用 TLS 協(xié)議保護(hù),但是如果 RSA 公鑰不可用,可以使用服務(wù)器提供的公鑰;可

    2024年02月11日
    瀏覽(21)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包