數(shù)據(jù)湖Iceberg-簡介(1)
數(shù)據(jù)湖Iceberg-存儲(chǔ)結(jié)構(gòu)(2)
數(shù)據(jù)湖Iceberg-Hive集成Iceberg(3)
數(shù)據(jù)湖Iceberg-SparkSQL集成(4)
數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)
數(shù)據(jù)湖Iceberg-FlinkSQL-kafka類型表數(shù)據(jù)無法成功寫入(6)
數(shù)據(jù)湖Iceberg-Flink DataFrame集成(7)
數(shù)據(jù)湖Iceberg-FlinkSQL集成
環(huán)境準(zhǔn)備
Flink與Iceberg的版本對(duì)應(yīng)關(guān)系如下
Flink 版本 | Iceberg 版本 |
---|---|
1.11 | 0.9.0 – 0.12.1 |
1.12 | 0.12.0 – 0.13.1 |
1.13 | 0.13.0 – 1.0.0 |
1.14 | 0.13.0 – 1.1.0 |
1.15 | 0.14.0 – 1.1.0 |
1.16 | 1.1.0 – 1.1.0 |
jar包下載地址
https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/
在里面選擇自己的版本即可,這里我使用的是flink 1.14.3 iceberg1.1.0版本
具體下載地址:https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/1.1.0/iceberg-flink-runtime-1.14-1.1.0.jar
jar包上傳到Flink lib目錄下
[root@ lib]# pwd
/opt/flink/lib
[root@ lib]# ll
total 252612
-rw-r--r-- 1 flink flink 85584 Jan 11 2022 flink-csv-1.14.3.jar
-rw-r--r-- 1 flink flink 136054094 Jan 11 2022 flink-dist_2.12-1.14.3.jar
-rw-r--r-- 1 flink flink 153145 Jan 11 2022 flink-json-1.14.3.jar
-rw-rw-r-- 1 flink flink 43317025 Jan 13 13:54 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 flink flink 7709731 Aug 22 2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 flink flink 39633410 Jan 11 2022 flink-table_2.12-1.14.3.jar
-rw-r--r-- 1 flink flink 29256108 Apr 21 09:21 iceberg-flink-runtime-1.14-1.1.0.jar
-rw-r--r-- 1 flink flink 112758 May 3 2013 javax.ws.rs-api-2.0.jar
-rw-r--r-- 1 flink flink 208006 Jan 9 2022 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 flink flink 301872 Jan 9 2022 log4j-api-2.17.1.jar
-rw-r--r-- 1 flink flink 1790452 Jan 9 2022 log4j-core-2.17.1.jar
-rw-r--r-- 1 flink flink 24279 Jan 9 2022 log4j-slf4j-impl-2.17.1.jar
修改flink-conf.yaml配置
修改或添加以下配置
# 禁用 ClassLoader 檢查
classloader.check-leaked-classloader: false
# 每個(gè) TaskManager 上任務(wù)槽數(shù)量,這里為 4
taskmanager.numberOfTaskSlots: 4
# 狀態(tài)后端使用 RocksDB
state.backend: rocksdb
# 每隔 30000 毫秒進(jìn)行一次檢查點(diǎn)
execution.checkpointing.interval: 30000
# 指定檢查點(diǎn)保存的目錄,這里為 HDFS 上的目錄
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
# 啟用增量式檢查點(diǎn),這將在一定程度上提高性能
state.backend.incremental: true
啟動(dòng)flink-sql
注意:
剛剛Flink修改完需要重啟Flink
輸入 ./sql-client.sh embedded
或者./sql-client
sql-client.sh embedded
是啟動(dòng) Flink SQL Client 時(shí)指定的一種模式,即嵌入式模式。在嵌入式模式下,F(xiàn)link SQL Client 會(huì)自動(dòng)啟動(dòng)一個(gè) Flink 集群,無需手動(dòng)啟動(dòng),直接在命令行中交互式地輸入 SQL 命令進(jìn)行查詢和操作。同時(shí),F(xiàn)link SQL Client 會(huì)將所有的數(shù)據(jù)和表定義存儲(chǔ)在本地內(nèi)存中,這意味著不支持持久化數(shù)據(jù)和高可用性。
相反,如果您使用的是獨(dú)立模式,F(xiàn)link SQL Client 會(huì)連接到一個(gè)已經(jīng)運(yùn)行的 Flink 集群。在這種模式下,需要首先手動(dòng)啟動(dòng) Flink 集群,并將 Flink SQL Client 配置為連接到該集群。獨(dú)立模式相對(duì)嵌入式模式更加靈活和可擴(kuò)展,但啟動(dòng)和配置過程可能需要更多的時(shí)間和精力。
總之,嵌入式模式適用于快速原型設(shè)計(jì)和小規(guī)模數(shù)據(jù)探索,而獨(dú)立模式適用于生產(chǎn)環(huán)境和大規(guī)模數(shù)據(jù)處理。
創(chuàng)建和使用Catalog
創(chuàng)建語法說明
CREATE CATALOG <catalog_name> WITH (
'type'='iceberg',
`<config_key>`=`<config_value>`
);
- type: 必須是iceberg。(必須)
- catalog-type: 內(nèi)置了hive和hadoop兩種catalog,也可以使用catalog-impl來自定義catalog。(可選)
- catalog-impl: 自定義catalog實(shí)現(xiàn)的全限定類名。如果未設(shè)置catalog-type,則必須設(shè)置。(可選)
- property-version: 描述屬性版本的版本號(hào)。此屬性可用于向后兼容,以防屬性格式更改。當(dāng)前屬性版本為1。(可選)
- cache-enabled: 是否啟用目錄緩存,默認(rèn)值為true。(可選)
- cache.expiration-interval-ms: 本地緩存catalog條目的時(shí)間(以毫秒為單位);負(fù)值,如-1表示沒有時(shí)間限制,不允許設(shè)為0。默認(rèn)值為-1。(可選)
Hive Catalog
- 上傳hive connector到flink的lib中
下載地址:https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.3/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar
- 重啟Flink集群,進(jìn)入sql-client
如果不上傳jar包重啟服務(wù),后續(xù)可能會(huì)遇到這種錯(cuò)誤
[ERROR] Could not execute SQL statement. Reason: org.apache.hadoop.hive.metastore.api.MetaException: java.lang.reflect.UndeclaredThrowableException
- 創(chuàng)建Hive Catalog
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://bigdata-24-195:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hive'
);
注意:這里指定的warehouse目錄hive用戶需要有權(quán)限訪問,否則后續(xù)創(chuàng)建庫或者表會(huì)失敗
Flink SQL> CREATE DATABASE iceberg_db; [ERROR] Could not execute SQL statement. Reason: org.apache.hadoop.hive.metastore.api.MetaException: java.lang.reflect.UndeclaredThrowableException
詳細(xì)異常日志可以在hive日志中查看
-
uri: Hive metastore的thrift uri。(必選)
-
clients:Hive metastore客戶端池大小,默認(rèn)為2。(可選)
-
warehouse: 數(shù)倉目錄。
-
hive-conf-dir:包含hive-site.xml配置文件的目錄路徑,hive-site.xml中hive.metastore.warehouse.dir 的值會(huì)被warehouse覆蓋。
-
hadoop-conf-dir:包含core-site.xml和hdfs-site.xml配置文件的目錄路徑。
-
查看catalogs
Flink SQL> show CATALOGS ;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
| hive_catalog |
+-----------------+
2 rows in set
- 進(jìn)入catalogs
use catalog hive_catalog;
Hadoop Catalog
Iceberg還支持HDFS中基于目錄的catalog,可以使用’catalog-type’='hadoop’配置。
- 創(chuàng)建catalog
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hadoop',
'property-version'='1'
);
use catalog hadoop_catalog;
create database iceberg_db;
在目錄中可以看到我們創(chuàng)建的庫和catalog
配置sql-client初始化文件
配置初始化文件后,后續(xù)啟動(dòng),不用每次重新啟動(dòng)都創(chuàng)建catalog
在$FLINK_HOME/conf
目錄下創(chuàng)建sql-client-init.sql
文件
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hadoop',
'property-version'='1'
);
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://bigdata-24-195:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hive'
);
USE CATALOG hive_catalog;
后續(xù)啟動(dòng)sql-client時(shí),加上 -i sql文件路徑 即可完成catalog的初始化,并進(jìn)入hive_catalog
bin/sql-client.sh embedded -i conf/sql-client-init.sql
DDL語句
創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE iceberg_db;
USE iceberg_db;
創(chuàng)建表
CREATE TABLE `hive_catalog`.`iceberg_db`.`sample` (
id BIGINT COMMENT 'unique id',
data STRING
);
建表命令現(xiàn)在支持最常用的flink建表語法,包括:
- PARTITION BY (column1, column2, …):配置分區(qū),apache flink還不支持隱藏分區(qū)。
- COMMENT ‘table document’:指定表的備注
- WITH (‘key’=‘value’, …):設(shè)置表屬性
目前,不支持計(jì)算列、watermark(支持主鍵)。
創(chuàng)建分區(qū)表
CREATE TABLE `hive_catalog`.`iceberg_db`.`sample1` (
id BIGINT COMMENT 'unique id',
data STRING
) PARTITIONED BY (data);
Apache Iceberg支持隱藏分區(qū),但Apache flink不支持在列上通過函數(shù)進(jìn)行分區(qū),現(xiàn)在無法在flink DDL中支持隱藏分區(qū)。
使用LIKE語法建表
LIKE語法用于創(chuàng)建一個(gè)與另一個(gè)表具有相同schema、分區(qū)和屬性的表。
CREATE TABLE `hive_catalog`.`iceberg_db`.`sample2` (
id BIGINT COMMENT 'unique id',
data STRING
);
CREATE TABLE `hive_catalog`.`iceberg_db`.`sample_like` LIKE `hive_catalog`.`iceberg_db`.`sample2`;
查看表結(jié)構(gòu)
默認(rèn)在FlinkSQL中無法查看到表結(jié)構(gòu)
Flink SQL> desc formatted sample_like;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "sample_like" at line 1, column 16.
Was expecting one of:
<EOF>
"." ...
我們可以在hive中查看iceberg的表結(jié)構(gòu)
0: jdbc:hive2://bigdata-24-194:2181,bigdata-2> desc formatted sample_like;
INFO : Compiling command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da): desc formatted sample_like
INFO : Semantic Analysis Completed (retrial = false)
INFO : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO : Completed compiling command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da); Time taken: 0.077 seconds
INFO : Executing command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da): desc formatted sample_like
INFO : Starting task [Stage-0:DDL] in serial mode
INFO : Completed executing command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da); Time taken: 0.108 seconds
INFO : OK
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| col_name | data_type | comment |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name | data_type | comment |
| id | bigint | |
| data | string | |
| | NULL | NULL |
| # Detailed Table Information | NULL | NULL |
| Database: | iceberg_db | NULL |
| OwnerType: | USER | NULL |
| Owner: | hdfs | NULL |
| CreateTime: | Fri Apr 22 14:16:17 CST 2023 | NULL |
| LastAccessTime: | Sun Dec 14 04:03:18 CST 1969 | NULL |
| Retention: | 2147483647 | NULL |
| Location: | hdfs://bigdata-24-194:8020/iceberg/iceberg-hive/iceberg_db.db/sample_like | NULL |
| Table Type: | EXTERNAL_TABLE | NULL |
| Table Parameters: | NULL | NULL |
| | EXTERNAL | TRUE |
| | current-schema | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"long\"},{\"id\":2,\"name\":\"data\",\"required\":false,\"type\":\"string\"}]} |
| | metadata_location | hdfs://bigdata-24-194:8020/iceberg/iceberg-hive/iceberg_db.db/sample_like/metadata/00000-0750a26c-8b26-4417-9f27-7786a5775026.metadata.json |
| | numFiles | 1 |
| | snapshot-count | 0 |
| | table_type | ICEBERG |
| | totalSize | 1225 |
| | transient_lastDdlTime | 1682057777 |
| | uuid | 4e0be931-e962-4a94-a176-3969012647c1 |
| | NULL | NULL |
| # Storage Information | NULL | NULL |
| SerDe Library: | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL |
| InputFormat: | org.apache.hadoop.mapred.FileInputFormat | NULL |
| OutputFormat: | org.apache.hadoop.mapred.FileOutputFormat | NULL |
| Compressed: | No | NULL |
| Num Buckets: | 0 | NULL |
| Bucket Columns: | [] | NULL |
| Sort Columns: | [] | NULL |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
32 rows selected (0.289 seconds)
修改表
Flink SQL 目前只支持修改表屬性和表名,其他的暫不支持,需要使用API才可以修改
修改表屬性
ALTER TABLE `hive_catalog`.`iceberg_db`.`sample` SET ('write.format.default'='avro');
修改表名
ALTER TABLE `hive_catalog`.`iceberg_db`.`sample` RENAME TO `hive_catalog`.`iceberg_db`.`new_sample`;
刪除表
DROP TABLE `hive_catalog`.`iceberg_db`.`sample`;
不會(huì)刪除具體數(shù)據(jù),HDFS中文件還繼續(xù)存在
查詢數(shù)據(jù)
Flink On Yarn的問題
這里遇到個(gè)問題,先在這直接說了,剛剛只是操作元數(shù)據(jù)所以沒遇到這個(gè)問題,在插入和查詢都會(huì)遇到這個(gè)問題
我使用的是Ambari集成的Flink,運(yùn)行模式為Flink on Yarn
,直接用查詢、插入語句去操作q數(shù)據(jù)會(huì)提示連接拒絕。
這里幾個(gè)問題點(diǎn)需要注意:
- 1.當(dāng)我們使用
Flink on Yarn
模式提交時(shí)需要指定-s yarn-session
去運(yùn)行,如下
bin/sql-client.sh -s yarn-session -i conf/sql-client-init.sql
指定
-s yarn-session
后,F(xiàn)link會(huì)在當(dāng)前服務(wù)器/tmp/.yarn-properties-flink
文件找到運(yùn)行的yarn-session任務(wù),去提交。內(nèi)容如下:
# cat /tmp/.yarn-properties-flink #Generated YARN properties file #Fri Apr 21 11:11:44 CST 2023 dynamicPropertiesString= applicationID=application_1675237371712_0532
- 2.Ambari 默認(rèn)Flink on Yarn 提交是使用的Flink用戶,我們提交任務(wù)是使用HDFS用戶,還是會(huì)導(dǎo)致提交失敗
解決方法:kill之前使用flink用戶提交的任務(wù),使用HDFS啟動(dòng)Flink on Yarn任務(wù)
這里偷懶了,直接手動(dòng)kill,手動(dòng)啟動(dòng)了,有時(shí)間可以改下Ambari啟動(dòng)Flink方法,這樣才一勞永逸
Flink on yarn 啟動(dòng)命令如下
export HADOOP_CONF_DIR=/etc/hadoop/conf; export HADOOP_CLASSPATH=/usr/hdp/3.1.5.0-152/hadoop/conf:/usr/hdp/3.1.5.0-152/hadoop/lib/*:/usr/hdp/3.1.5.0-152/hadoop/.//*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/./:/usr/hdp/3.1.5.0-152/hadoop-hdfs/lib/*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/.//*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/lib/*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/.//*:/usr/hdp/3.1.5.0-152/hadoop-yarn/./:/usr/hdp/3.1.5.0-152/hadoop-yarn/lib/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/.//*:/usr/hdp/3.1.5.0-152/tez/*:/usr/hdp/3.1.5.0-152/tez/lib/*:/usr/hdp/3.1.5.0-152/tez/conf:/usr/hdp/3.1.5.0-152/tez/conf_llap:/usr/hdp/3.1.5.0-152/tez/doc:/usr/hdp/3.1.5.0-152/tez/hadoop-shim-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/hadoop-shim-2.8-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib:/usr/hdp/3.1.5.0-152/tez/man:/usr/hdp/3.1.5.0-152/tez/tez-api-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-common-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-dag-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-examples-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-history-parser-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-javadoc-tools-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-job-analyzer-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-mapreduce-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-protobuf-history-plugin-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-runtime-internals-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-runtime-library-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-tests-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-cache-plugin-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-with-acls-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-with-fs-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/ui:/usr/hdp/3.1.5.0-152/tez/lib/async-http-client-1.9.40.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-cli-1.2.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-codec-1.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-collections4-4.1.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-io-2.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-lang-2.6.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.5.0-152/tez/lib/gcs-connector-hadoop3-1.9.17.3.1.5.0-152-shaded.jar:/usr/hdp/3.1.5.0-152/tez/lib/guava-28.0-jre.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-aws-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-azure-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-azure-datalake-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-hdfs-client-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-mapreduce-client-common-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-mapreduce-client-core-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/jersey-client-1.19.jar:/usr/hdp/3.1.5.0-152/tez/lib/jersey-json-1.19.jar:/usr/hdp/3.1.5.0-152/tez/lib/jettison-1.3.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.5.0-152/tez/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.5.0-152/tez/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/metrics-core-3.1.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/RoaringBitmap-0.4.9.jar:/usr/hdp/3.1.5.0-152/tez/lib/servlet-api-2.5.jar:/usr/hdp/3.1.5.0-152/tez/lib/slf4j-api-1.7.10.jar:/usr/hdp/3.1.5.0-152/tez/lib/tez.tar.gz;
/opt/flink/bin/yarn-session.sh -d -nm flinkapp-from-ambari -n 1 -s 1 -jm 768 -tm 1024 -qu default >> /var/log/flink/flink-setup.log &
Batch模式
SET execution.runtime-mode = batch;
select * from sample;
Streaming模式
-- 設(shè)置 Flink 的運(yùn)行模式為流式計(jì)算
SET execution.runtime-mode = streaming;
-- 啟用動(dòng)態(tài)表選項(xiàng)
SET table.dynamic-table-options.enabled=true;
-- 設(shè)置 Flink SQL 執(zhí)行結(jié)果的輸出格式為 Tableau
SET sql-client.execution.result-mode=tableau;
-- 查詢 Hive Catalog 中的 Iceberg 表 sample 中的所有數(shù)據(jù)
select * from hive_catalog.iceberg_db.sample;
SET table.dynamic-table-options.enabled=true;
從 1.11 開始,用戶可以通過動(dòng)態(tài)參數(shù)的形式靈活地設(shè)置表的屬性參數(shù),覆蓋或者追加原表的 WITH (…) 語句內(nèi)定義的 table options。
基本語法為:
table_path /*+ OPTIONS(key=val [, key=val]*) */
動(dòng)態(tài)參數(shù)的使用沒有語境限制,只要是引用表的地方都可以追加定義。在指定的表后面追加的動(dòng)態(tài)參數(shù)會(huì)自動(dòng)追加到原表定義中
從當(dāng)前快照讀取所有記錄,然后從該快照讀取增量數(shù)據(jù)
SELECT * FROM sample5 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
返回格式(會(huì)根據(jù)新增數(shù)據(jù)持續(xù)滾動(dòng))
Flink SQL> SELECT * FROM hive_catalog.iceberg_db.sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
2023-04-21 15:43:33,819 INFO org.apache.hadoop.yarn.client.RMProxy [] - Connecting to ResourceManager at bigdata-24-194/172.16.24.194:8050
2023-04-21 15:43:33,821 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-04-21 15:43:33,822 WARN org.apache.flink.yarn.YarnClusterDescriptor [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2023-04-21 15:43:33,830 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface bigdata-24-196:6588 of application 'application_1675237371712_0532'.
+----+----------------------+--------------------------------+
| op | id | data |
+----+----------------------+--------------------------------+
| +I | 34 | aeefb |
| +I | 34 | ae |
| +I | 1 | a |
| +I | 1 | a |
| +I | 34 | aee |
| +I | 34 | aeeefdsfafb |
讀取指定快照id(不包含)后的增量數(shù)據(jù)
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
-
monitor-interval: 連續(xù)監(jiān)控新提交數(shù)據(jù)文件的時(shí)間間隔(默認(rèn)為10s)。
-
start-snapshot-id: 流作業(yè)開始的快照id。
注意:如果是無界數(shù)據(jù)流式upsert進(jìn)iceberg表(讀kafka,upsert進(jìn)iceberg表),那么再去流讀iceberg表會(huì)存在讀不出數(shù)據(jù)的問題。如果無界數(shù)據(jù)流式append進(jìn)iceberg表(讀kafka,append進(jìn)iceberg表),那么流讀該iceberg表可以正??吹浇Y(jié)果。
插入數(shù)據(jù)
INSERT INTO
INSERT INTO `hive_catalog`.`iceberg_db`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`iceberg_db`.`sample` SELECT id, data from sample2;
INSERT OVERWRITE
僅支持Flink的Batch模式
SET execution.runtime-mode = batch;
INSERT OVERWRITE sample VALUES (1, 'a');
INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
UPSERT
當(dāng)將數(shù)據(jù)寫入v2
表格式時(shí),Iceberg支持基于主鍵的UPSERT。有兩種方法可以啟用upsert。
建表時(shí)指定
CREATE TABLE `hive_catalog`.`iceberg_db`.`sample5` (
`id` INT UNIQUE COMMENT 'unique id',
`data` STRING NOT NULL,
PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2',
'write.upsert.enabled'='true'
);
插入時(shí)指定
INSERT INTO `hive_catalog`.`iceberg_db`.`sample5` /*+ OPTIONS('upsert-enabled'='true') */ values(1,'a'),(2,'b'),(3,'c');
結(jié)果:
+----+-------------+--------------------------------+
| op | id | data |
+----+-------------+--------------------------------+
| +I | 1 | a |
| +I | 2 | b |
| +I | 3 | c |
+----+-------------+--------------------------------+
插入
INSERT INTO `hive_catalog`.`iceberg_db`.`sample5` /*+ OPTIONS('upsert-enabled'='true') */ values(1,'abc'));
結(jié)果
+----+-------------+--------------------------------+
| op | id | data |
+----+-------------+--------------------------------+
| +I | 1 | abc |
| +I | 2 | b |
| +I | 3 | c |
+----+-------------+--------------------------------+
插入的表,format-version需要為2。
OVERWRITE和UPSERT不能同時(shí)設(shè)置。在UPSERT模式下,如果對(duì)表進(jìn)行分區(qū),則分區(qū)字段必須也是主鍵。
讀取Kafka流,upsert插入到iceberg表中
下載:https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.3/flink-sql-connector-kafka_2.12-1.14.3.jar
將jar包放到$FLINK_HOME/lib
目錄下,重啟Flink On Yarn
這里先說一個(gè)大坑,Iceberg現(xiàn)階段的一個(gè)Bug
Kafka表必須要在default_catalog.default_database
下,即catalog
名為default_catalog
,數(shù)據(jù)庫(命名空間)為default_database
下,否則kafka類型的表讀取不到數(shù)據(jù)。
如果都在我們自己創(chuàng)建的catalog下創(chuàng)建,則執(zhí)行INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;
后,在Flink任務(wù)中看不到一個(gè)持續(xù)執(zhí)行的Flink Job,而正常執(zhí)行該命令Flink會(huì)執(zhí)行一個(gè)持續(xù)執(zhí)行的任務(wù),去消費(fèi)kafka數(shù)據(jù)寫入Iceberg,正常情況如下圖:
所以這里我們kafka表在default_catalog.default_database
下,寫入數(shù)據(jù)的表在我們自己創(chuàng)建的hadoop_catalog.iceberg_db
下
create table default_catalog.default_database.kafka1(
id int,
data string
) with (
'connector' = 'kafka'
,'topic' = 'ttt'
,'properties.zookeeper.connect' = '172.16.24.194:2181'
,'properties.bootstrap.servers' = '172.16.24.194:9092'
,'format' = 'json'
,'properties.group.id'='iceberg1'
,'scan.startup.mode'='earliest-offset'
);
CREATE TABLE `hadoop_catalog`.`iceberg_db`.`sample6` (
`id` INT UNIQUE COMMENT 'unique id',
`data` STRING NOT NULL,
PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2',
'write.upsert.enabled'='true'
);
INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;
此時(shí)我們往Kafka發(fā)送數(shù)據(jù):
{"id":123,"data":"llalalala"}
{"id":1123,"data":"asdfasfds"}
查看表中數(shù)據(jù)可以看到寫入成功
select * from hadoop_catalog.iceberg_db.sample6;
再次發(fā)送數(shù)據(jù)
{"id":123,"data":"JastData"}
查看表中數(shù)據(jù),發(fā)現(xiàn)修改成功文章來源:http://www.zghlxwxcb.cn/news/detail-425526.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-425526.html
與Flink集成的不足
支持的特性 | Flink | 備注 |
---|---|---|
SQL create catalog | √ | |
SQL create database | √ | |
SQL create table | √ | |
SQL create table like | √ | |
SQL alter table | √ | 只支持修改表屬性,不支持更改列和分區(qū) |
SQL drop_table | √ | |
SQL select | √ | 支持流式和批處理模式 |
SQL insert into | √ | 支持流式和批處理模式 |
SQL insert overwrite | √ | |
DataStream read | √ | |
DataStream append | √ | |
DataStream overwrite | √ | |
Metadata tables | 支持Java API,不支持Flink SQL | |
Rewrite files action | √ |
- 不支持創(chuàng)建隱藏分區(qū)的Iceberg表。
- 不支持創(chuàng)建帶有計(jì)算列的Iceberg表。
- 不支持創(chuàng)建帶watermark的Iceberg表。
- 不支持添加列,刪除列,重命名列,更改列。
- Iceberg目前不支持Flink SQL 查詢表的元數(shù)據(jù)信息,需要使用Java API 實(shí)現(xiàn)。
到了這里,關(guān)于數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!