国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

這篇具有很好參考價(jià)值的文章主要介紹了大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

一、概述

Hudi(Hadoop Upserts Deletes and Incrementals),簡(jiǎn)稱Hudi,是一個(gè)流式數(shù)據(jù)湖平臺(tái),關(guān)于Hudi的更多介紹可以參考我以下幾篇文章:

  • 大數(shù)據(jù)Hadoop之——新一代流式數(shù)據(jù)湖平臺(tái) Apache Hudi
  • 大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(Spark,F(xiàn)link與Hudi整合)

這里主要講解Hive、Trino、Starrocks與Hudi的整合操作,其實(shí)主要分為四大塊:

  • 數(shù)據(jù)處理:計(jì)算引擎,例如:flink、spark等。
  • 數(shù)據(jù)存儲(chǔ):HDFS、云存儲(chǔ)、AWS S3、對(duì)象存儲(chǔ)等。
  • 數(shù)據(jù)管理:Apache Hudi。
  • 數(shù)據(jù)查詢:查詢引擎,例如:Spark、Trino(Presto)、Hive、Starrocks(Doris)等。

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

二、Hudi 數(shù)據(jù)管理

Hudi表的數(shù)據(jù)文件,可以使用操作系統(tǒng)的文件系統(tǒng)存儲(chǔ),也可以使用HDFS這種分布式的文件系統(tǒng)存儲(chǔ)。為了后續(xù)分析性能和數(shù)據(jù)的可靠性,一般使用HDFS進(jìn)行存儲(chǔ)。以HDFS存儲(chǔ)來(lái)看,一個(gè)Hudi表的存儲(chǔ)文件分為兩類。
大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

  • .hoodie 文件: 由于CRUD的零散性, 每一次的操作都會(huì)生成一個(gè)文件,這些小文件越來(lái)越多后,會(huì)嚴(yán)重影響HDFS的性能,Hudi設(shè)計(jì)了一套文件合并機(jī)制。.hoodie文件夾中存放了對(duì)應(yīng)的文件合并操作相關(guān)的日志文件。
  • americasasia相關(guān)的路徑是實(shí)際的數(shù)據(jù)文件,按分區(qū)存儲(chǔ),分區(qū)的路徑key是可以指定的。

1).hoodie文件

Hudi把隨著時(shí)間流逝,對(duì)表的一系列CRUD操作叫做Timeline, Timeline中某一次的操作,叫做Instant。

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

  • Instant Action,記錄本次操作是一次數(shù)據(jù)提交(COMMITS),還是文件合并(COMPACTION),或者是文件清理(CLEANS);
  • Instant Time,本次操作發(fā)生的時(shí)間;
  • State,操作的狀態(tài),發(fā)起(REQUESTED),進(jìn)行中(INFLIGHT),還是已完成(COMPLETED);

2)數(shù)據(jù)文件

Hudi真實(shí)的數(shù)據(jù)文件使用Parquet文件格式存儲(chǔ)
大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

  • 其中包含一個(gè)metadata元數(shù)據(jù)文件和數(shù)據(jù)文件parquet列式存儲(chǔ)。
  • Hudi為了實(shí)現(xiàn)數(shù)據(jù)的CRUD,需要能夠唯一標(biāo)識(shí)一條記錄,Hudi將把數(shù)據(jù)集中的唯一字段(record key ) +數(shù)據(jù)所在分區(qū)(partitionPath)聯(lián)合起來(lái)當(dāng)做數(shù)據(jù)的唯一鍵。

三、數(shù)據(jù)存儲(chǔ)

hudi數(shù)據(jù)集的組織目錄結(jié)構(gòu)與hive非常相似,一份數(shù)據(jù)集對(duì)應(yīng)一個(gè)根目錄。數(shù)據(jù)集被打散為多個(gè)分區(qū),分區(qū)字段以文件夾形式存在,該文件夾包含該分區(qū)的所有文件。

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)
在根目錄下,每個(gè)分區(qū)都有唯一的分區(qū)路徑,每個(gè)分區(qū)數(shù)據(jù)存儲(chǔ)在多個(gè)文件中
大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)
每個(gè)文件都有唯一的fileId和生成文件的commit所標(biāo)識(shí)。如果發(fā)生更新操作時(shí),多個(gè)文件共享相同的fileId,但會(huì)有不同的commit
大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)
Metadata 元數(shù)據(jù)

以時(shí)間軸(timeline)的形式將數(shù)據(jù)集上的各項(xiàng)操作元數(shù)據(jù)維護(hù)起來(lái),以支持?jǐn)?shù)據(jù)集的瞬態(tài)視圖,這部分元數(shù)據(jù)存儲(chǔ)于根目錄下的元數(shù)據(jù)目錄。一共有三種類型的元數(shù)據(jù):

  • Commits:一個(gè)單獨(dú)的commit包含對(duì)數(shù)據(jù)集上一批數(shù)據(jù)的一次原子寫(xiě)入操作的相關(guān)信息。我們用單調(diào)遞增的時(shí)間戳來(lái)標(biāo)識(shí)commits,標(biāo)的是一次寫(xiě)入操作的開(kāi)始。
  • Cleans:用于清除數(shù)據(jù)集中不再被查詢所用到的舊版本文件的后臺(tái)活動(dòng)。
  • Compactions:用于協(xié)調(diào)hudi內(nèi)部的數(shù)據(jù)結(jié)構(gòu)差異的后臺(tái)活動(dòng)。例如,將更新操作由基于行存的日志文件歸集到列式數(shù)據(jù)上。

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

四、Hive 與 Hudi 集成使用

關(guān)于Hive的介紹與部署,可以參考我這篇文章:大數(shù)據(jù)Hadoop之——數(shù)據(jù)倉(cāng)庫(kù)Hive

1)安裝mysql數(shù)據(jù)庫(kù)

這里選擇使用mysql on k8s,有不清楚的小伙伴,可以參考我這篇文章:【云原生】MySQL on k8s 環(huán)境部署
創(chuàng)建hive用戶

MYSQL_ROOT_PASSWORD=$(kubectl get secret --namespace mysql mysql -o jsonpath="{.data.mysql-root-password}" | base64 -d)

#登錄pod
kubectl exec -it mysql-primary-0 -n mysql -- bash
# 連接myslq
mysql -u root -p$MYSQL_ROOT_PASSWORD

CREATE USER 'hive'@'%' IDENTIFIED BY '123456';
GRANT ALL ON *.* to 'hive'@'%' WITH GRANT OPTION;
flush privileges;

2)安裝 Hive

1、下載
wget http://archive.apache.org/dist/hive/hive-3.1.3/apache-hive-3.1.3-bin.tar.gz
tar -xf apache-hive-3.1.3-bin.tar.gz
2、配置

hive-site.xml內(nèi)容如下:

<?xml version="1.0"?>  
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>  
<configuration>

<!-- 配置hdfs存儲(chǔ)目錄 -->
<property>  
  <name>hive.metastore.warehouse.dir</name>
  <value>/user/hudi_hive/warehouse</value>  
</property>  

<property>
  <name>hive.metastore.local</name>
  <value>false</value>
</property>

<property>  
  <name>hive.metastore.schema.verification</name>  
   <value>false</value>  
</property>
  
<property>  
  <name>hive.metastore.uris</name>  
  <value>thrift://hadoop-hadoop-hdfs-nn-0:9083</value>
</property>

<!-- 所連接的 MySQL 數(shù)據(jù)庫(kù)的地址,hudi_hive是數(shù)據(jù)庫(kù),程序會(huì)自動(dòng)創(chuàng)建,自定義就行 -->
<property>  
  <name>javax.jdo.option.ConnectionURL</name>
  <value>jdbc:mysql://192.168.182.110:30306/hive_metastore?createDatabaseIfNotExist=true&amp;useSSL=false&amp;serverTimezone=Asia/Shanghai</value>
</property>

<!-- MySQL 驅(qū)動(dòng) -->
<property>
  <name>javax.jdo.option.ConnectionDriverName</name>
  <value>com.mysql.cj.jdbc.Driver</value>
</property>

<!-- mysql連接用戶 -->
<property>
  <name>javax.jdo.option.ConnectionUserName</name>
  <value>hive</value>
</property>  

<!-- mysql連接密碼 -->
<property>
  <name>javax.jdo.option.ConnectionPassword</name>
  <value>123456</value>
</property>

<property>
    <name>datanucleus.schema.autoCreateAll</name>
    <value>true</value>
 </property>

<!--元數(shù)據(jù)是否校驗(yàn)-->
<property>
  <name>hive.metastore.schema.verification</name>
  <value>false</value>
</property>

<property>
  <name>system:user.name</name>
  <value>admin</value>
  <description>user name</description>
</property>

<!-- host -->
<property>
  <name>hive.server2.thrift.bind.host</name>
  <value>hadoop-hadoop-hdfs-nn-0</value>
  <description>Bind host on which to run the HiveServer2 Thrift service.</description>
</property>

<!-- hs2端口 默認(rèn)是1000,為了區(qū)別,我這里不使用默認(rèn)端口-->
<property>
  <name>hive.server2.thrift.port</name>
  <value>10000</value>
</property>

</configuration>

hive-env.sh #底部追加兩行

export HADOOP_HOME=/opt/apache/hadoop
export HIVE_CONF_DIR=/opt/apache/apache-hive-3.1.3-bin/conf
export HIV_AUX_JARS_PATH=/opt/apache/apache-hive-3.1.3-bin/lib
3、解決Hive與Hadoop之間guava版本的差異
$ cd /opt/bigdata/hadoop/server
$ ls -l apache-hive-3.1.2-bin/lib/guava-*.jar
$ ls -l hadoop-3.3.1/share/hadoop/common/lib/guava-*.jar
# 刪除hive中g(shù)uava低版本
$ rm -f apache-hive-3.1.2-bin/lib/guava-*.jar
# copy hadoop中的guava到hive
$ cp hadoop-3.3.1/share/hadoop/common/lib/guava-*.jar apache-hive-3.1.2-bin/lib/
$ ls -l apache-hive-3.1.2-bin/lib/guava-*.jar
4、下載對(duì)應(yīng)版本的mysql驅(qū)動(dòng)包

下載地址:https://downloads.mariadb.com/Connectors/java

cd $HIVE_HOME/lib
# 根據(jù)java8版本下載這個(gè)版本,這個(gè)版本已驗(yàn)證可行
wget https://downloads.mariadb.com/Connectors/java/connector-java-1.2.2/mariadb-java-client-1.2.2.jar

# /etc/profile追加以下內(nèi)容,source加載生效
export HIVE_HOME="/opt/apache/hive-3.1.2"
export PATH=$HIVE_HOME/bin:$PATH
5、初始化元數(shù)據(jù)
schematool -initSchema -dbType mysql --verbose
6、修改hadoop配置文件core-site.xml,表示設(shè)置可訪問(wèn)的用戶及用戶組

配置hadoop core-site.xml,再core-site.xml文件中追加如下內(nèi)容

<property>
  <name>hadoop.proxyuser.admin.hosts</name>
  <value>*</value>
</property>
<property>
  <name>hadoop.proxyuser.admin.groups</name>
  <value>*</value>
</property>
7、將hudi-hive的jar包放到hive lib目錄下
cp hudi-0.12.0/packaging/hudi-hadoop-mr-bundle/target/hudi-hadoop-mr-bundle-0.12.0.jar $HIVE_HOME/lib/
cp hudi-0.12.0/packaging/hudi-hive-sync-bundle/target/hudi-hive-sync-bundle-0.12.0.jar $HIVE_HOME/lib/
8、啟動(dòng)服務(wù)
# 啟動(dòng)元數(shù)據(jù)服務(wù),默認(rèn)端口9083
nohup hive --service metastore &

# 啟動(dòng)hiveserver2服務(wù),默認(rèn)端口10000
nohup hive --service hiveserver2 > /dev/null 2>&1 &

# 查看日志
tail -f /tmp/admin/hive.log

# 連接
beeline -u jdbc:hive2://localhost:10000  -n admin
9、測(cè)試驗(yàn)證
# 這里使用新命令beeline,跟hive命令差不多
$ hive
show databases;
create table users(id int,name string);
show tables;
insert into users values(1,'zhangsan');

beelive連接

beeline -u jdbc:hive2://hadoop-hadoop-hdfs-nn-0:10000  -n admin

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

3)通過(guò)Hive sync tool 同步數(shù)據(jù)到Hive

應(yīng)用hudi不可避免地要?jiǎng)?chuàng)建對(duì)應(yīng)的hive表以方便查詢hudi數(shù)據(jù)。一般我們使用flink、spark寫(xiě)入數(shù)據(jù)時(shí),可以配置自動(dòng)建表、同步元數(shù)據(jù)。有時(shí)也會(huì)選擇使用hive sync tool工具離線進(jìn)行操作。

Hive sync tool的介紹

Hudi提供Hive sync tool用于同步hudi最新的元數(shù)據(jù)(包含自動(dòng)建表、增加字段、同步分區(qū)信息)到hive metastore。Hive sync tool提供三種同步模式,Jdbc,Hms,hivesql。推薦使用jdbc、hms。

官網(wǎng)文檔:https://hudi.apache.org/docs/syncing_metastore/

1、JDBC模式同步
cd /opt/apache/hudi-0.12.0/hudi-sync/hudi-hive-sync/
./run_sync_tool.sh \
--base-path hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow \
--database hudi_hive \
--table hudi_trips_cow \
--partitioned-by dt \
--jdbc-url 'jdbc:hive2://hadoop-hadoop-hdfs-nn-0:10000' \
--partition-value-extractor org.apache.hudi.hive.MultiPartKeysValueExtractor \
--user admin \
--pass admin \
--partitioned-by dt
2、HMS 模式同步

hive meta store同步,提供hive metastore的地址,如thrift://hms:9083,通過(guò)hive metastore的接口完成同步。使用時(shí)需要設(shè)置 --sync-mode=hms --use-jdbc=false。

./run_sync_tool.sh  \
--base-path hdfs://hadoop-hadoop-hdfs-nn:9000/tmp/hudi_trips_cow \
--database hudi_hive \
--table hudi_trips_cow \
--jdbc-url thrift://hadoop-hadoop-hdfs-nn:9083  \
--user admin --pass admin \
--partitioned-by dt \
--sync-mode hms

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

Hive Sync時(shí)會(huì)判斷表不存在時(shí)建外表并添加分區(qū),表存在時(shí)對(duì)比表的schema是否存在差異,存在則替換,對(duì)比分區(qū)是否有新增,有則添加分區(qū)。

因此使用hive sync時(shí)有以下約束:

  • 寫(xiě)入數(shù)據(jù)Schema只允許增加字段,不允許修改、刪除字段。
  • 分區(qū)目錄只能新增,不會(huì)刪除。
  • Overwrite覆寫(xiě)Hudi表不支持同步覆蓋Hive表。
  • Hudi同步Hive表時(shí),不支持使用timestamp類型作為分區(qū)列。

五、基于 Flink CDC 同步 MySQL 分庫(kù)分表構(gòu)建實(shí)時(shí)數(shù)據(jù)湖

1)Flink CDC 是什么?

2020年 Flink cdc 首次在 Flink forward 大會(huì)上官宣, 由 Alibaba的 Jark Wu & Qingsheng Ren 兩位大佬介紹的,官方網(wǎng)址。

Flink CDC 文檔:https://ververica.github.io/flink-cdc-connectors/master/content/about.html
GitHub地址:https://github.com/ververica/flink-cdc-connectors

Flink CDC(Change Data Capture:變更數(shù)據(jù)捕獲) connector 可以捕獲在一個(gè)或多個(gè)表中發(fā)生的所有變更。該模式通常有一個(gè)前記錄和一個(gè)后記錄。Flink CDC connector 可以直接在Flink中以非約束模式(流)使用,而不需要使用類似 kafka 之類的中間件中轉(zhuǎn)數(shù)據(jù)。

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

2)基于 Flink CDC 同步 MySQL 分庫(kù)分表構(gòu)建實(shí)時(shí)數(shù)據(jù)湖

官方文檔:基于 Flink CDC 同步 MySQL 分庫(kù)分表構(gòu)建實(shí)時(shí)數(shù)據(jù)湖

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

你也可以使用不同的 source 比如 Oracle/Postgres 和 sink 比如 Hudi 來(lái)構(gòu)建自己的 ETL 流程。

對(duì)上圖進(jìn)行簡(jiǎn)化:
大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

1、添加flink mysql jar包

flink-sql-connector-mysql-cdc jar包下載地址:https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/
大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

cd $FLINK_HOME/lib
wget https://repo1.maven.org/maven2/com/ververica/flink-sql-connector-mysql-cdc/2.2.1/flink-sql-connector-mysql-cdc-2.2.1.jar
2、創(chuàng)建數(shù)據(jù)庫(kù)表,并且配置binlog 文件
[mysqld]
#開(kāi)啟 Binlog,一般放在/var/lib/mysql;比如上面的設(shè)置重啟數(shù)據(jù)庫(kù)會(huì)生成mysql-bin.000001文件,文件名跟log_bin 值對(duì)應(yīng),當(dāng)然也可以指定存儲(chǔ)路徑。
log_bin = mysql-bin
#刪除超出這個(gè)變量保留期之前的全部日志被刪除
expire_logs_days = 7
# 定一個(gè)集群內(nèi)的 MySQL 服務(wù)器 ID,如果做數(shù)據(jù)庫(kù)集群那么必須全局唯一。
server_id = 1024
# mysql復(fù)制主要有三種方式:基于SQL語(yǔ)句的復(fù)制(statement-based replication, SBR),基于行的復(fù)制(row-based replication, RBR),混合模式復(fù)制(mixed-based replication, MBR)。對(duì)應(yīng)的,binlog的格式也有三種:STATEMENT,ROW,MIXED。
binlog_format = ROW

重啟mysql

# 重啟數(shù)據(jù)庫(kù)
systemctl restart mariadb
3、 創(chuàng)建mysql 庫(kù)表
mysql -uhive -h192.168.182.110 -P30306 -p
密碼:123456

CREATE DATABASE hudi_hive;

USE hudi_hive;

CREATE TABLE `Flink_cdc` (
  `id` BIGINT(64) AUTO_INCREMENT PRIMARY KEY,
  `name` VARCHAR(64)  NULL,
  `age` INT(20) NULL,
  `birthday` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL,
   `ts` TIMESTAMP DEFAULT CURRENT_TIMESTAMP NOT NULL
) ;
INSERT INTO `hudi_hive`.`Flink_cdc`(NAME,age) VALUES("flink",18) ;

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

4、在 Flink SQL CLI 中使用 Flink DDL 創(chuàng)建表
# 添加環(huán)境變量
export HADOOP_HOME=/opt/apache/hadoop
export HADOOP_DIR_CONF=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=`hadoop classpath`

# 啟動(dòng)單點(diǎn)flink
cd $FLINK_HOME
./bin/start-cluster.sh

# 測(cè)試可用性
# ./bin/flink run  examples/batch/WordCount.jar

# 登錄flink-sql CLI
./bin/sql-client.sh embedded -j ../hudi-0.12.0/packaging/hudi-flink-bundle/target/hudi-flink1.14-bundle-0.12.0.jar shell

# 設(shè)置表輸出格式
SET 'sql-client.execution.result-mode' = 'tableau';

CREATE TABLE source_mysql (
   id BIGINT PRIMARY KEY NOT ENFORCED,
   name STRING,
   age INT,
   birthday TIMESTAMP(3),
   ts TIMESTAMP(3)
 ) WITH (
 'connector' = 'mysql-cdc',
 'hostname' = '192.168.182.110',
 'port' = '30306',
 'username' = 'hive',
 'password' = '123456',
 'server-time-zone' = 'Asia/Shanghai',
 'debezium.snapshot.mode' = 'initial',
 'database-name' = 'hudi_hive',
 'table-name' = 'Flink_cdc'
 );

# 創(chuàng)建flinksql 中的 flinkcdc 視圖
create view view_source_flinkcdc_mysql AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as part FROM source_mysql;

# 查mysql數(shù)據(jù)
SELECT id, name,age,birthday, ts, part FROM view_source_flinkcdc_mysql ;

大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)

5、創(chuàng)建輸出表,關(guān)聯(lián)Hudi表,并且自動(dòng)同步到Hive表

使用下面的 Flink SQL 語(yǔ)句將數(shù)據(jù)從 MySQL 寫(xiě)入 hudi 中,并同步到hive

CREATE TABLE flink_cdc_sink_hudi_hive(
id bigint ,
name string,
age int,
birthday TIMESTAMP(3),
ts TIMESTAMP(3),
part VARCHAR(20),
primary key(id) not enforced
)
PARTITIONED BY (part)
with(
'connector'='hudi',
'path'= 'hdfs://hadoop-hadoop-hdfs-nn-0:9000/flink_cdc_sink_hudi_hive', 
'table.type'= 'MERGE_ON_READ',
'hoodie.datasource.write.recordkey.field'= 'id', 
'write.precombine.field'= 'ts',
'write.tasks'= '1',
'write.rate.limit'= '2000', 
'compaction.tasks'= '1', 
'compaction.async.enabled'= 'true',
'compaction.trigger.strategy'= 'num_commits',
'compaction.delta_commits'= '1',
'changelog.enabled'= 'true',
'read.streaming.enabled'= 'true',
'read.streaming.check-interval'= '3',
'hive_sync.enable'= 'true',
'hive_sync.mode'= 'hms',
'hive_sync.metastore.uris'= 'thrift://hadoop-hadoop-hdfs-nn-0:9083',
'hive_sync.jdbc_url'= 'jdbc:hive2://hadoop-hadoop-hdfs-nn-0:10000',
'hive_sync.table'= 'flink_cdc_sink_hudi_hive',
'hive_sync.db'= 'db_hive',
'hive_sync.username'= 'admin',
'hive_sync.password'= '123456',
'hive_sync.support_timestamp'= 'true'
);
6、查詢視圖數(shù)據(jù),添加數(shù)據(jù)到輸出表
# 將mysql數(shù)據(jù)同步到hudi和hive
INSERT INTO flink_cdc_sink_hudi_hive SELECT id, name,age,birthday, ts, part FROM view_source_flinkcdc_mysql ;
7、查看hive數(shù)據(jù)
beeline -u jdbc:hive2://localhost:10000  -n admin
show tables from db_hive;

hive 會(huì)有兩張表:flink_cdc_sink_hudi_hive_ro類型是讀優(yōu)化查詢 , flink_cdc_sink_hudi_hive_rt 類型快照查詢。

關(guān)于FlinkCDC,hive,mysql與hudi的整合就先到這里了,有任何疑問(wèn)的小伙伴歡迎給我留言,后續(xù)會(huì)持續(xù)更新【大數(shù)據(jù)+云原生】相關(guān)的文檔,請(qǐng)小伙伴耐心等待~文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-419222.html

到了這里,關(guān)于大數(shù)據(jù)Hadoop之——Apache Hudi 數(shù)據(jù)湖實(shí)戰(zhàn)操作(FlinkCDC)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Apache Hudi初探(五)(與flink的結(jié)合)--Flink 中hudi clean操作

    本文主要是具體說(shuō)說(shuō)Flink中的clean操作的實(shí)現(xiàn) 在flink中主要是 CleanFunction 函數(shù): open函數(shù) writeClient =FlinkWriteClients.createWriteClient(conf, getRuntimeContext()) 創(chuàng)建FlinkWriteClient,用于寫(xiě)hudi數(shù)據(jù) this.executor = NonThrownExecutor.builder(LOG).waitForTasksFinish(true).build(); 創(chuàng)建一個(gè)只有一個(gè)線程的線程池,改

    2024年02月06日
    瀏覽(31)
  • 性能提升30%!袋鼠云數(shù)?;?Apache Hudi 的性能優(yōu)化實(shí)戰(zhàn)解析

    性能提升30%!袋鼠云數(shù)棧基于 Apache Hudi 的性能優(yōu)化實(shí)戰(zhàn)解析

    Apache Hudi 是一款開(kāi)源的數(shù)據(jù)湖解決方案,它能夠幫助企業(yè)更好地管理和分析海量數(shù)據(jù),支持高效的數(shù)據(jù)更新和查詢。并提供多種數(shù)據(jù)壓縮和存儲(chǔ)格式以及索引功能,從而為企業(yè)數(shù)據(jù)倉(cāng)庫(kù)實(shí)踐提供更加靈活和高效的數(shù)據(jù)處理方式。 在金融領(lǐng)域,企業(yè)可以使用 Hudi 來(lái)處理大量需要

    2024年02月09日
    瀏覽(22)
  • Apache Hudi 在袋鼠云數(shù)據(jù)湖平臺(tái)的設(shè)計(jì)與實(shí)踐

    Apache Hudi 在袋鼠云數(shù)據(jù)湖平臺(tái)的設(shè)計(jì)與實(shí)踐

    在大數(shù)據(jù)處理中,實(shí)時(shí)數(shù)據(jù)分析是一個(gè)重要的需求。隨著數(shù)據(jù)量的不斷增長(zhǎng),對(duì)于實(shí)時(shí)分析的挑戰(zhàn)也在不斷加大,傳統(tǒng)的批處理方式已經(jīng)不能滿足實(shí)時(shí)數(shù)據(jù)處理的需求,需要一種更加高效的技術(shù)來(lái)解決這個(gè)問(wèn)題。Apache Hudi(Hadoop Upserts Deletes and Incremental Processing)就是這樣一種

    2024年02月06日
    瀏覽(20)
  • Apache Hudi DeltaStreamer 接入CDC數(shù)據(jù)時(shí)如何完成 Kafka 的身份認(rèn)證?

    題目有些拗口,簡(jiǎn)短截說(shuō),我們對(duì)于Apache Hudi DeltaStreamer在接入CDC數(shù)據(jù)時(shí),對(duì)于其如何通過(guò) Kafka 的身份認(rèn)證,做了一系列測(cè)試和研究,有如下明確結(jié)論: .?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?.?

    2024年02月16日
    瀏覽(24)
  • 探索在Apache SeaTunnel上使用Hudi連接器,高效管理大數(shù)據(jù)的技術(shù)

    探索在Apache SeaTunnel上使用Hudi連接器,高效管理大數(shù)據(jù)的技術(shù)

    Apache Hudi是一個(gè)數(shù)據(jù)湖處理框架,通過(guò)提供簡(jiǎn)單的方式來(lái)進(jìn)行數(shù)據(jù)的插入、更新和刪除操作,Hudi能夠幫助數(shù)據(jù)工程師和科學(xué)家更高效地處理大數(shù)據(jù),并支持實(shí)時(shí)查詢。 Spark Flink SeaTunnel Zeta 批處理 流處理 精確一次性 列投影 并行處理 支持用戶自定義切分 Hudi Source 連接器專為從

    2024年04月28日
    瀏覽(24)
  • Apache hudi 核心功能點(diǎn)分析

    Apache hudi 核心功能點(diǎn)分析

    文中部分代碼對(duì)應(yīng) 0.14.0 版本 初始的需求是Uber公司會(huì)有很多記錄級(jí)別的更新場(chǎng)景,Hudi 在Uber 內(nèi)部主要的一個(gè)場(chǎng)景,就是乘客打車下單和司機(jī)接單的匹配,乘客和司機(jī)分別是兩條數(shù)據(jù)流,通過(guò) Hudi 的 Upsert 能力和增量讀取功能,可以分鐘級(jí)地將這兩條數(shù)據(jù)流進(jìn)行拼接,得到乘客

    2024年02月02日
    瀏覽(23)
  • Apache Hudi Timeline Server介紹

    Hudi 有一個(gè)中央時(shí)間線服務(wù)器,在驅(qū)動(dòng)程序節(jié)點(diǎn)中運(yùn)行并作為 Rest 服務(wù)。它有多種好處,第一個(gè)用例是提供 FileSystemView api。 Hudi 的核心是維護(hù)一個(gè) TableFileSystemView,它暴露 API 來(lái)獲取給定數(shù)據(jù)集的文件狀態(tài),驅(qū)動(dòng)程序和執(zhí)行程序?qū)⒃趯?xiě)入和表服務(wù)生命周期的不同時(shí)間點(diǎn)查詢?cè)摖?/p>

    2024年02月12日
    瀏覽(25)
  • 提升 Apache Hudi Upsert 性能的三個(gè)建議

    Apache Hudi 社區(qū)一直在快速發(fā)展,各公司正在尋找方法來(lái)利用其強(qiáng)大的功能來(lái)有效地?cái)z取和管理大規(guī)模數(shù)據(jù)集。 每周社區(qū)都會(huì)收到一些常見(jiàn)問(wèn)題,最常見(jiàn)的問(wèn)題與 Hudi 如何執(zhí)行更新插入有關(guān),以確保以低延遲訪問(wèn)最新數(shù)據(jù)。 快速更新插入的主要考慮因素之一是選擇正確的存儲(chǔ)

    2024年02月05日
    瀏覽(33)
  • Apache Hudi初探(一)(與flink的結(jié)合)

    和 Spark 的使用方式不同, flink 結(jié)合 hudi 的方式,是以 SPI 的方式,所以不需要像使用 Spark 的方式一樣, Spark 的方式如下: (這里不包括 org.apache.spark.sql.sources.DataSourceRegister ) Flink 結(jié)合 Hudi 的方式,只需要引入了對(duì)應(yīng)的jar包即可,以 SPI 的方式: 其中 HoodieTableFactory 是讀寫(xiě) H

    2024年02月16日
    瀏覽(21)
  • Apache Hudi 1.x 版本重磅功能展望與討論

    Apache Hudi 社區(qū)正在對(duì)Apache Hudi 1.x版本功能進(jìn)行討論,歡迎感興趣同學(xué)參與討論,PR鏈接:https://github.com/apache/hudi/pull/8679/files 此 RFC 提議對(duì) Hudi 中的事務(wù)數(shù)據(jù)庫(kù)層進(jìn)行令人興奮和強(qiáng)大的重構(gòu),以推動(dòng)未來(lái)幾年整個(gè)社區(qū)的持續(xù)創(chuàng)新。 在過(guò)去的幾年里,社區(qū)成長(zhǎng)(https://git-contributo

    2024年02月07日
    瀏覽(19)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包