簡介
概述
為了解決數(shù)據(jù)存儲和計算引擎之間的適配的問題,Netflix開發(fā)了Iceberg,2018年11月16日進入Apache孵化器,2020 年5月19日從孵化器畢業(yè),成為Apache的頂級項目。
Iceberg是一個面向海量數(shù)據(jù)分析場景的開放表格式(Table Format)。表格式(Table Format)可以理解為元數(shù)據(jù)以及數(shù)據(jù)文件的一種組織方式,處于計算框架(Flink,Spark…)之下,數(shù)據(jù)文件之上。
作用
大數(shù)據(jù)領(lǐng)域發(fā)展至今已經(jīng)經(jīng)歷了相當(dāng)長時間的發(fā)展和探索,雖然大數(shù)據(jù)技術(shù)的出現(xiàn)和迭代降低了用戶處理海量數(shù)據(jù)的門檻,但是有一個問題不能忽視,數(shù)據(jù)格式對不同引擎適配的對接。
也就是說我們在使用不同的引擎進行計算時,需要將數(shù)據(jù)根據(jù)引擎進行適配。這是相當(dāng)棘手的問題。
為此出現(xiàn)了一種新的解決方案:介于上層計算引擎和底層存儲格式之間的一個中間層。這個中間層不是數(shù)據(jù)存儲的方式,只是定義了數(shù)據(jù)的元數(shù)據(jù)組織方式,并且向引擎層面提供統(tǒng)一的類似傳統(tǒng)數(shù)據(jù)庫中"表"的語義。它的底層仍然是Parquet、ORC等存儲格式?;诖?,Netflix開發(fā)了Iceberg,目前已經(jīng)是Apache的頂級項目。
特性
數(shù)據(jù)存儲、計算引擎插件化
Iceberg提供一個開放通用的表格式(Table Format)實現(xiàn)方案,不和特定的數(shù)據(jù)存儲、計算引擎綁定。目前大數(shù)據(jù)領(lǐng)域的常見數(shù)據(jù)存儲(HDFS、S3…),計算引擎(Flink、Spark…)都可以接入Iceberg。
在生產(chǎn)環(huán)境中,可選擇不同的組件搭使用。甚至可以不通過計算引擎,直接讀取存在文件系統(tǒng)上的數(shù)據(jù)。
實時流批一體
Iceberg上游組件將數(shù)據(jù)寫入完成后,下游組件及時可讀,可查詢??梢詽M足實時場景。并且Iceberg同時提供了流/批讀接口、流/批寫接口。可以在同一個流程里, 同時處理流數(shù)據(jù)和批數(shù)據(jù),大大簡化了ETL鏈路。
數(shù)據(jù)表演化(Table Evolution)
Iceberg可以通過SQL的方式進行表級別模式演進。進行這些操作的時候,代價極低。 不存在讀出數(shù)據(jù)重新寫入或者遷移數(shù)據(jù)這種費時費力的操作。
比如在常用的Hive中,如果我們需要把一個按天分區(qū)的表,改成按小時分區(qū)。此時,不能再原表之上直接修改,只能新建一個按小時分區(qū)的表,然后再把數(shù)據(jù)Insert到新的小時分區(qū)表。而且,即使我們通過Rename的命令把新表的名字改為原表,使用原表的上次層應(yīng)用, 也可能由于分區(qū)字段修改,導(dǎo)致需要修改 SQL,這樣花費的經(jīng)歷是非常繁瑣的。
模式演化(Schema Evolution)
Iceberg支持下面幾種模式演化:
-
ADD:向表或者嵌套結(jié)構(gòu)增加新列
-
Drop:從表中或者嵌套結(jié)構(gòu)中移除一列
-
Rename:重命名表中或者嵌套結(jié)構(gòu)中的一列
-
Update:將復(fù)雜結(jié)構(gòu)(struct, map<key, value>, list)中的基本類型擴展類型長度, 比如tinyint修改成int.
-
Reorder:改變列或者嵌套結(jié)構(gòu)中字段的排列順序
Iceberg保證模式演化(Schema Evolution)是沒有副作用的獨立操作流程, 一個元數(shù)據(jù)操作, 不會涉及到重寫數(shù)據(jù)文件的過程。具體的如下:
-
增加列時候,不會從另外一個列中讀取已存在的的數(shù)據(jù)
-
刪除列或者嵌套結(jié)構(gòu)中字段的時候,不會改變?nèi)魏纹渌械闹?/p>
-
更新列或者嵌套結(jié)構(gòu)中字段的時候,不會改變?nèi)魏纹渌械闹?/p>
-
改變列列或者嵌套結(jié)構(gòu)中字段順序的時候,不會改變相關(guān)聯(lián)的值
在表中Iceberg 使用唯一ID來定位每一列的信息。新增一個列的時候,會新分配給它一個唯一ID, 并且絕對不會使用已經(jīng)被使用的ID。
使用名稱或者位置信息來定位列的, 都會存在一些問題, 比如使用名稱的話,名稱可能會重復(fù), 使用位置的話, 不能修改順序并且廢棄的字段也不能刪除。
分區(qū)演化(Partition Evolution)
Iceberg可以在一個已存在的表上直接修改,因為Iceberg的查詢流程并不和分區(qū)信息直接關(guān)聯(lián)。
當(dāng)我們改變一個表的分區(qū)策略時,對應(yīng)修改分區(qū)之前的數(shù)據(jù)不會改變, 依然會采用老的分區(qū)策略,新的數(shù)據(jù)會采用新的分區(qū)策略,也就是說同一個表會有兩種分區(qū)策略,舊數(shù)據(jù)采用舊分區(qū)策略,新數(shù)據(jù)采用新新分區(qū)策略, 在元數(shù)據(jù)里兩個分區(qū)策略相互獨立,不重合。
在查詢數(shù)據(jù)的時候,如果存在跨分區(qū)策略的情況,則會解析成兩個不同執(zhí)行計劃,如Iceberg官網(wǎng)提供圖所示:
圖中booking_table表2008年按月分區(qū),進入2009年后改為按天分區(qū),這種中分區(qū)策略共存于該表中。
借助Iceberg的隱藏分區(qū)(Hidden Partition),在寫SQL 查詢的時候,不需要在SQL中特別指定分區(qū)過濾條件,Iceberg會自動分區(qū),過濾掉不需要的數(shù)據(jù)。
Iceberg分區(qū)演化操作同樣是一個元數(shù)據(jù)操作, 不會重寫數(shù)據(jù)文件。
列順序演化(Sort Order Evolution)
Iceberg可以在一個已經(jīng)存在的表上修改排序策略。修改了排序策略之后, 舊數(shù)據(jù)依舊采用老排序策略不變。往Iceberg里寫數(shù)據(jù)的計算引擎總是會選擇最新的排序策略, 但是當(dāng)排序的代價極其高昂的時候, 就不進行排序了。
隱藏分區(qū)(Hidden Partition)
Iceberg的分區(qū)信息并不需要人工維護, 它可以被隱藏起來. 不同其他類似Hive 的分區(qū)策略, Iceberg的分區(qū)字段/策略(通過某一個字段計算出來),可以不是表的字段和表數(shù)據(jù)存儲目錄也沒有關(guān)系。在建表或者修改分區(qū)策略之后,新的數(shù)據(jù)會自動計算所屬于的分區(qū)。在查詢的時候同樣不用關(guān)系表的分區(qū)是什么字段/策略,只需要關(guān)注業(yè)務(wù)邏輯,Iceberg會自動過濾不需要的分區(qū)數(shù)據(jù)。
正是由于Iceberg的分區(qū)信息和表數(shù)據(jù)存儲目錄是獨立的,使得Iceberg的表分區(qū)可以被修改,而且不和涉及到數(shù)據(jù)遷移。
鏡像數(shù)據(jù)查詢(Time Travel)
Iceberg提供了查詢表歷史某一時間點數(shù)據(jù)鏡像(snapshot)的能力。通過該特性可以將最新的SQL邏輯,應(yīng)用到歷史數(shù)據(jù)上。
支持事務(wù)(ACID)
Iceberg通過提供事務(wù)(ACID)的機制,使其具備了upsert的能力并且使得邊寫邊讀成為可能,從而數(shù)據(jù)可以更快的被下游組件消費。通過事務(wù)保證了下游組件只能消費已commit的數(shù)據(jù),而不會讀到部分甚至未提交的數(shù)據(jù)。
基于樂觀鎖的并發(fā)支持
Iceberg基于樂觀鎖提供了多個程序并發(fā)寫入的能力并且保證數(shù)據(jù)線性一致。
文件級數(shù)據(jù)剪裁
Iceberg的元數(shù)據(jù)里面提供了每個數(shù)據(jù)文件的一些統(tǒng)計信息,比如最大值,最小值,Count計數(shù)等等。因此,查詢SQL的過濾條件除了常規(guī)的分區(qū),列過濾,甚至可以下推到文件級別,大大加快了查詢效率。
其他數(shù)據(jù)湖框架的對比
存儲結(jié)構(gòu)
數(shù)據(jù)文件 data files
數(shù)據(jù)文件是Apache Iceberg表真實存儲數(shù)據(jù)的文件,一般是在表的數(shù)據(jù)存儲目錄的data目錄下,如果我們的文件格式選擇的是parquet,那么文件是以“.parquet”結(jié)尾。
例如:00000-0-atguigu_20230203160458_22ee74c9-643f-4b27-8fc1-9cbd5f64dad4-job_1675409881387_0007-00001.parquet 就是一個數(shù)據(jù)文件。
Iceberg每次更新會產(chǎn)生多個數(shù)據(jù)文件(data files)。
表快照 Snapshot
快照代表一張表在某個時刻的狀態(tài)。每個快照里面會列出表在某個時刻的所有 data files 列表。data files是存儲在不同的manifest files里面,manifest files是存儲在一個Manifest list文件里面,而一個Manifest list文件代表一個快照。
清單列表 Manifest list
manifest list是一個元數(shù)據(jù)文件,它列出構(gòu)建表快照(Snapshot)的清單(Manifest file)。這個元數(shù)據(jù)文件中存儲的是Manifest file列表,每個Manifest file占據(jù)一行。每行中存儲了Manifest file的路徑、其存儲的數(shù)據(jù)文件(data files)的分區(qū)范圍,增加了幾個數(shù)文件、刪除了幾個數(shù)據(jù)文件等信息,這些信息可以用來在查詢時提供過濾,加快速度。
例如:snap-6746266566064388720-1-52f2f477-2585-4e69-be42-bbad9a46ed17.avro就是一個Manifest List文件。
清單文件 Manifest file
Manifest file也是一個元數(shù)據(jù)文件,它列出組成快照(snapshot)的數(shù)據(jù)文件(data files)的列表信息。每行都是每個數(shù)據(jù)文件的詳細描述,包括數(shù)據(jù)文件的狀態(tài)、文件路徑、分區(qū)信息、列級別的統(tǒng)計信息(比如每列的最大最小值、空值數(shù)等)、文件的大小以及文件里面數(shù)據(jù)行數(shù)等信息。其中列級別的統(tǒng)計信息可以在掃描表數(shù)據(jù)時過濾掉不必要的文件。
Manifest file是以avro格式進行存儲的,以“.avro”后綴結(jié)尾,例如:52f2f477-2585-4e69-be42-bbad9a46ed17-m0.avro。
與 Hive集成
環(huán)境準(zhǔn)備
(1)Hive與Iceberg的版本對應(yīng)關(guān)系如下
Hive 版本 | 官方推薦Hive版本 | Iceberg 版本 |
---|---|---|
2.x | 2.3.8 | 0.8.0-incubating – 1.1.0 |
3.x | 3.1.2 | 0.10.0 – 1.1.0 |
Iceberg與Hive 2和Hive 3.1.2/3的集成,支持以下特性:
-
創(chuàng)建表
-
刪除表
-
讀取表
-
插入表(INSERT into)
更多功能需要Hive 4.x(目前alpha版本)才能支持。
(2)上傳jar包,拷貝到Hive的auxlib目錄中
mkdir auxlib
cp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlib
cp libfb303-0.9.3.jar /opt/module/hive/auxlibcp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlibcp libfb303-0.9.3.jar /opt/module/hive/auxlib
(3)修改hive-site.xml,添加配置項
<property>
<name>iceberg.engine.hive.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.aux.jars.path</name>
<value>/opt/module/hive/auxlib</value>
</property>
使用TEZ引擎注意事項:
-
使用Hive版本>=3.1.2,需要TEZ版本>=0.10.1
-
指定tez更新配置:
<property> <name>tez.mrreader.config.update.properties</name> <value>hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids</value> </property>
-
從Iceberg 0.11.0開始,如果Hive使用Tez引擎,需要關(guān)閉向量化執(zhí)行:
<property> <name>hive.vectorized.execution.enabled</name> <value>false</value> </property>
(4)啟動HMS服務(wù)
(5)啟動 Hadoop
創(chuàng)建和管理 Catalog
Iceberg支持多種不同的Catalog類型,例如:Hive、Hadoop、亞馬遜的AWS Glue和自定義Catalog。
根據(jù)不同配置,分為三種情況:
- 沒有設(shè)置iceberg.catalog,默認使用HiveCatalog
配置項 | 說明 |
---|---|
iceberg.catalog.<catalog_name>.type | Catalog的類型: hive, hadoop, 如果使用自定義Catalog,則不設(shè)置 |
iceberg.catalog.<catalog_name>.catalog-impl | Catalog的實現(xiàn)類, 如果上面的type沒有設(shè)置,則此參數(shù)必須設(shè)置 |
iceberg.catalog.<catalog_name>.<key> | Catalog的其他配置項 |
-
設(shè)置了 iceberg.catalog的類型,使用指定的Catalog類型,如下表格:
-
設(shè)置 iceberg.catalog=location_based_table,直接通過指定的根路徑來加載Iceberg表
默認使用 HiveCatalog
CREATE TABLE iceberg_test1 (i int) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
INSERT INTO iceberg_test1 values(1);
查看HDFS可以發(fā)現(xiàn),表目錄在默認的hive倉庫路徑下。
指定 Catalog 類型
(1)使用 HiveCatalog
set iceberg.catalog.iceberg_hive.type=hive;
set iceberg.catalog.iceberg_hive.uri=thrift://hadoop1:9083;
set iceberg.catalog.iceberg_hive.clients=10;
set iceberg.catalog.iceberg_hive.warehouse=hdfs://hadoop1:8020/warehouse/iceberg-hive;
CREATE TABLE iceberg_test2 (i int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES('iceberg.catalog'='iceberg_hive');
INSERT INTO iceberg_test2 values(1);
(2)使用 HadoopCatalog
set iceberg.catalog.iceberg_hadoop.type=hadoop;
set iceberg.catalog.iceberg_hadoop.warehouse=hdfs://hadoop1:8020/warehouse/iceberg-hadoop;
CREATE TABLE iceberg_test3 (i int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://hadoop1:8020/warehouse/iceberg-hadoop/default/iceberg_test3'
TBLPROPERTIES('iceberg.catalog'='iceberg_hadoop');
INSERT INTO iceberg_test3 values(1);
指定路徑加載
如果HDFS中已經(jīng)存在iceberg格式表,我們可以通過在Hive中創(chuàng)建Icerberg格式表指定對應(yīng)的location路徑映射數(shù)據(jù)。
CREATE EXTERNAL TABLE iceberg_test4 (i int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://hadoop1:8020/warehouse/iceberg-hadoop/default/iceberg_test3'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
基本操作
創(chuàng)建表
(1)創(chuàng)建外部表
CREATE EXTERNAL TABLE iceberg_create1 (i int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
describe formatted iceberg_create1;
(2)創(chuàng)建內(nèi)部表
CREATE TABLE iceberg_create2 (i int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
describe formatted iceberg_create2;
(3)創(chuàng)建分區(qū)表
CREATE EXTERNAL TABLE iceberg_create3 (id int,name string)
PARTITIONED BY (age int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
describe formatted iceberg_create3;
Hive語法創(chuàng)建分區(qū)表,不會在HMS中創(chuàng)建分區(qū),而是將分區(qū)數(shù)據(jù)轉(zhuǎn)換為Iceberg標(biāo)識分區(qū)。這種情況下不能使用Iceberg的分區(qū)轉(zhuǎn)換,例如:days(timestamp),如果想要使用Iceberg格式表的分區(qū)轉(zhuǎn)換標(biāo)識分區(qū),需要使用Spark或者Flink引擎創(chuàng)建表。
修改表
只支持HiveCatalog表修改表屬性,Iceberg表屬性和Hive表屬性存儲在HMS中是同步的。
ALTER TABLE iceberg_create1 SET TBLPROPERTIES('external.table.purge'='FALSE');
插入表
支持標(biāo)準(zhǔn)單表INSERT INTO操作:
INSERT INTO iceberg_create2 VALUES (1);
INSERT INTO iceberg_create1 select * from iceberg_create2;
在HIVE 3.x中,INSERT OVERWRITE雖然能執(zhí)行,但其實是追加。
刪除表
DROP TABLE iceberg_create1;
與 Spark SQL集成
環(huán)境準(zhǔn)備
(1)安裝 Spark
1)Spark與Iceberg的版本對應(yīng)關(guān)系如下
Spark 版本 | Iceberg 版本 |
---|---|
2.4 | 0.7.0-incubating – 1.1.0 |
3.0 | 0.9.0 – 1.0.0 |
3.1 | 0.12.0 – 1.1.0 |
3.2 | 0.13.0 – 1.1.0 |
3.3 | 0.14.0 – 1.1.0 |
2)上傳并解壓Spark安裝包
tar -zxvf spark-3.3.1-bin-hadoop3.tgz -C /opt/module/
mv /opt/module/spark-3.3.1-bin-hadoop3 /opt/module/spark-3.3.1
3)配置環(huán)境變量
sudo vim /etc/profile.d/my_env.sh
export SPARK_HOME=/opt/module/spark-3.3.1
export PATH=$PATH:$SPARK_HOME/bin
source /etc/profile.d/my_env.sh
4)拷貝iceberg的jar包到Spark的jars目錄
cp /opt/software/iceberg/iceberg-spark-runtime-3.3_2.12-1.1.0.jar /opt/module/spark-3.3.1/jars
(2)啟動 Hadoop
Spark 配置 Catalog
Spark中支持兩種Catalog的設(shè)置:hive和hadoop,Hive Catalog就是Iceberg表存儲使用Hive默認的數(shù)據(jù)路徑,Hadoop Catalog需要指定Iceberg格式表存儲路徑。
vim spark-defaults.conf
Hive Catalog
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://hadoop1:9083
use hive_prod.db;
Hadoop Catalog
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://hadoop1:8020/warehouse/spark-iceberg
use hadoop_prod.db;
SQL 操作
創(chuàng)建表
use hadoop_prod;
create database default;
use default;
CREATE TABLE hadoop_prod.default.sample1 (
id bigint COMMENT 'unique id',
data string)
USING iceberg
-
PARTITIONED BY (partition-expressions) :配置分區(qū)
-
LOCATION ‘(fully-qualified-uri)’ :指定表路徑
-
COMMENT ‘table documentation’ :配置表備注
-
TBLPROPERTIES (‘key’=‘value’, …) :配置表屬性
表屬性:https://iceberg.apache.org/docs/latest/configuration/
對Iceberg表的每次更改都會生成一個新的元數(shù)據(jù)文件(json文件)以提供原子性。默認情況下,舊元數(shù)據(jù)文件作為歷史文件保存不會刪除。
如果要自動清除元數(shù)據(jù)文件,在表屬性中設(shè)置write.metadata.delete-after-commit.enabled=true。這將保留一些元數(shù)據(jù)文件(直到write.metadata.previous-versions-max),并在每個新創(chuàng)建的元數(shù)據(jù)文件之后刪除舊的元數(shù)據(jù)文件。
(1)創(chuàng)建分區(qū)表
1)分區(qū)表
CREATE TABLE hadoop_prod.default.sample2 (
id bigint,
data string,
category string)
USING iceberg
PARTITIONED BY (category)
2)創(chuàng)建隱藏分區(qū)表
CREATE TABLE hadoop_prod.default.sample3 (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category)
支持的轉(zhuǎn)換有:
-
years(ts):按年劃分
-
months(ts):按月劃分
-
days(ts)或date(ts):等效于dateint分區(qū)
-
hours(ts)或date_hour(ts):等效于dateint和hour分區(qū)
-
bucket(N, col):按哈希值劃分mod N個桶
-
truncate(L, col):按截斷為L的值劃分
字符串被截斷為給定的長度
整型和長型截斷為bin: truncate(10, i)生成分區(qū)0,10,20,30,…
(2)使用 CTAS 語法建表
CREATE TABLE hadoop_prod.default.sample4
USING iceberg
AS SELECT * from hadoop_prod.default.sample3
不指定分區(qū)就是無分區(qū),需要重新指定分區(qū)、表屬性:
CREATE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (bucket(8, id), hours(ts), category)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3
(3)使用 Replace table 建表
REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
AS SELECT * from hadoop_prod.default.sample3
REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3
CREATE OR REPLACE TABLE hadoop_prod.default.sample6
USING iceberg
AS SELECT * from hadoop_prod.default.sample3
刪除表
對于HadoopCatalog而言,運行DROP TABLE將從catalog中刪除表并刪除表內(nèi)容。
CREATE EXTERNAL TABLE hadoop_prod.default.sample7 (
id bigint COMMENT 'unique id',
data string)
USING iceberg
INSERT INTO hadoop_prod.default.sample7 values(1,'a')
DROP TABLE hadoop_prod.default.sample7
對于HiveCatalog而言:
-
在0.14之前,運行DROP TABLE將從catalog中刪除表并刪除表內(nèi)容。
-
從0.14開始,DROP TABLE只會從catalog中刪除表,不會刪除數(shù)據(jù)。為了刪除表內(nèi)容,應(yīng)該使用DROP table PURGE。
CREATE TABLE hive_prod.default.sample7 (
id bigint COMMENT 'unique id',
data string)
USING iceberg
INSERT INTO hive_prod.default.sample7 values(1,'a')
(1)刪除表
DROP TABLE hive_prod.default.sample7
(2)刪除表和數(shù)據(jù)
DROP TABLE hive_prod.default.sample7 PURGE
修改表
Iceberg在Spark 3中完全支持ALTER TABLE,包括:
-
重命名表
-
設(shè)置或刪除表屬性
-
添加、刪除和重命名列
-
添加、刪除和重命名嵌套字段
-
重新排序頂級列和嵌套結(jié)構(gòu)字段
-
擴大int、float和decimal字段的類型
-
將必選列變?yōu)榭蛇x列
此外,還可以使用SQL擴展來添加對分區(qū)演變的支持和設(shè)置表的寫順序。
CREATE TABLE hive_prod.default.sample1 (
id bigint COMMENT 'unique id',
data string)
USING iceberg
(1)修改表名(不支持修改HadoopCatalog的表名)
ALTER TABLE hive_prod.default.sample1 RENAME TO hive_prod.default.sample2
(2)修改表屬性
-
修改表屬性
ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES ( 'read.split.target-size'='268435456' ) ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES ( 'comment' = 'A table comment.' )
-
刪除表屬性
ALTER TABLE hive_prod.default.sample1 UNSET TBLPROPERTIES ('read.split.target-size')
(3)添加列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMNS (
category string comment 'new_column'
)
-- 添加struct類型的列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN point struct<x: double, y: double>;
-- 往struct類型的列中添加字段
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN point.z double
-- 創(chuàng)建struct的嵌套數(shù)組列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN points array<struct<x: double, y: double>>;
-- 在數(shù)組中的結(jié)構(gòu)中添加一個字段。使用關(guān)鍵字'element'訪問數(shù)組的元素列。
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN points.element.z double
-- 創(chuàng)建一個包含Map類型的列,key和value都為struct類型
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;
-- 在Map類型的value的struct中添加一個字段。
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN pointsm.value.b int
在Spark 2.4.4及以后版本中,可以通過添加FIRST或AFTER子句在任何位置添加列:
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN new_column1 bigint AFTER id
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN new_column2 bigint FIRST
(4)修改列
-
修改列名
ALTER TABLE hadoop_prod.default.sample1 RENAME COLUMN data TO data1
-
Alter Column修改類型(只允許安全的轉(zhuǎn)換)
ALTER TABLE hadoop_prod.default.sample1 ADD COLUMNS ( idd int ) ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN idd TYPE bigint
-
Alter Column 修改列的注釋
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id TYPE double COMMENT 'a' ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id COMMENT 'b'
-
Alter Column修改列的順序
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id FIRST ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN new_column2 AFTER new_column1
-
Alter Column修改列是否允許為null
ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id DROP NOT NULL
ALTER COLUMN不用于更新struct類型。使用ADD COLUMN和DROP COLUMN添加或刪除struct類型的字段。
(5)刪除列
ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN idd
ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN point.z
(6)添加分區(qū)(Spark3,需要配置擴展)
vim spark-default.conf
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
重新進入spark-sql shell:
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD category
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id)
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD truncate(data, 4)
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD years(ts)
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id) AS shard
(7)刪除分區(qū)(Spark3,需要配置擴展)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD category
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD bucket(16, id)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD truncate(data, 4)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD years(ts)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD shard
注意,盡管刪除了分區(qū),但列仍然存在于表結(jié)構(gòu)中。
刪除分區(qū)字段是元數(shù)據(jù)操作,不會改變?nèi)魏维F(xiàn)有的表數(shù)據(jù)。新數(shù)據(jù)將被寫入新的分區(qū),但現(xiàn)有數(shù)據(jù)將保留在舊的分區(qū)布局中。
當(dāng)分區(qū)發(fā)生變化時,動態(tài)分區(qū)覆蓋行為也會發(fā)生變化。例如,如果按天劃分分區(qū),而改為按小時劃分分區(qū),那么覆蓋將覆蓋每小時劃分的分區(qū),而不再覆蓋按天劃分的分區(qū)。
刪除分區(qū)字段時要小心,可能導(dǎo)致元數(shù)據(jù)查詢失敗或產(chǎn)生不同的結(jié)果。
(8)修改分區(qū)(Spark3,需要配置擴展)
ALTER TABLE hadoop_prod.default.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id)
(9)修改表的寫入順序
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category, id
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC, id DESC
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST
表寫順序不能保證查詢的數(shù)據(jù)順序。它只影響數(shù)據(jù)寫入表的方式。
WRITE ORDERED BY設(shè)置了一個全局排序,即跨任務(wù)的行排序,就像在INSERT命令中使用ORDER BY一樣:
INSERT INTO hadoop_prod.default.sample1
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category
要在每個任務(wù)內(nèi)排序,而不是跨任務(wù)排序,使用local ORDERED BY:
ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id
(10)按分區(qū)并行寫入
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id
插入數(shù)據(jù)
CREATE TABLE hadoop_prod.default.a (
id bigint,
count bigint)
USING iceberg
CREATE TABLE hadoop_prod.default.b (
id bigint,
count bigint,
flag string)
USING iceberg
(1)Insert Into
INSERT INTO hadoop_prod.default.a VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO hadoop_prod.default.b VALUES (1, 1, 'a'), (2, 2, 'b'), (4, 4, 'd');
(2)MERGE INTO行級更新
MERGE INTO hadoop_prod.default.a t
USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
WHEN MATCHED AND u.flag='a' THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count)
查詢數(shù)據(jù)
(1)普通查詢
SELECT count(1) as count, data
FROM local.db.table
GROUP BY data
(2)查詢元數(shù)據(jù)
// 查詢表快照
SELECT * FROM hadoop_prod.default.a.snapshots
// 查詢數(shù)據(jù)文件信息
SELECT * FROM hadoop_prod.default.a.files
// 查詢表歷史
SELECT * FROM hadoop_prod.default.a.history
// 查詢 manifest
ELECT * FROM hadoop_prod.default.a.manifests
存儲過程
Procedures可以通過CALL從任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。
(1)語法
按照參數(shù)名傳參
CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)
當(dāng)按位置傳遞參數(shù)時,如果結(jié)束參數(shù)是可選的,則只有結(jié)束參數(shù)可以省略。
CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)
(2)快照管理
-
回滾到指定的快照id
CALL hadoop_prod.system.rollback_to_snapshot('default.a', 7601163594701794741)
-
回滾到指定時間的快照
CALL hadoop_prod.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000')
-
設(shè)置表的當(dāng)前快照ID
CALL hadoop_prod.system.set_current_snapshot('db.sample', 1)
-
從快照變?yōu)楫?dāng)前表狀態(tài)
CALL hadoop_prod.system.cherrypick_snapshot('default.a', 7629160535368763452) CALL hadoop_prod.system.cherrypick_snapshot(snapshot_id => 7629160535368763452, table => 'default.a' )
(3)元數(shù)據(jù)管理
-
刪除早于指定日期和時間的快照,但保留最近100個快照:
CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)
-
刪除Iceberg表中任何元數(shù)據(jù)文件中沒有引用的文件
#列出所有需要刪除的候選文件 CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true) #刪除指定目錄中db.sample表不知道的任何文件 CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')
-
合并數(shù)據(jù)文件(合并小文件)
CALL catalog_name.system.rewrite_data_files('db.sample') CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST') CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)') CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2')) CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"')
-
重寫表清單來優(yōu)化執(zhí)行計劃
CALL catalog_name.system.rewrite_manifests('db.sample') #重寫表db中的清單。并禁用Spark緩存的使用。這樣做可以避免執(zhí)行程序上的內(nèi)存問題。 CALL catalog_name.system.rewrite_manifests('db.sample', false)
(4)遷移表
-
快照
CALL catalog_name.system.snapshot('db.sample', 'db.snap') CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/')
-
遷移
CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar')) CALL catalog_name.system.migrate('db.sample')
-
添加數(shù)據(jù)文件
CALL spark_catalog.system.add_files( table => 'db.tbl', source_table => 'db.src_tbl', partition_filter => map('part_col_1', 'A') ) CALL spark_catalog.system.add_files( table => 'db.tbl', source_table => '`parquet`.`path/to/table`' )
(5)元數(shù)據(jù)信息
-
獲取指定快照的父快照id
CALL spark_catalog.system.ancestors_of('db.tbl')
-
獲取指定快照的所有祖先快照
CALL spark_catalog.system.ancestors_of('db.tbl', 1) CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')
DataFrame 操作
環(huán)境準(zhǔn)備
(1)創(chuàng)建maven工程,配置pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.iceberg</groupId>
<artifactId>spark-iceberg-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<scala.binary.version>2.12</scala.binary.version>
<spark.version>3.3.1</spark.version>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spark的依賴引入 -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.binary.version}</artifactId>
<scope>provided</scope>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.binary.version}</artifactId>
<scope>provided</scope>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.binary.version}</artifactId>
<scope>provided</scope>
<version>${spark.version}</version>
</dependency>
<!--fastjson <= 1.2.80 存在安全漏洞,-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<!-- assembly打包插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
<configuration>
<archive>
<manifest>
</manifest>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
</plugin>
<!--Maven編譯scala所需依賴-->
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>3.2.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
(2)配置Catalog
val spark: SparkSession = SparkSession.builder().master("local").appName(this.getClass.getSimpleName)
//指定hive catalog, catalog名稱為iceberg_hive
.config("spark.sql.catalog.iceberg_hive", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hive.type", "hive")
.config("spark.sql.catalog.iceberg_hive.uri", "thrift://hadoop1:9083")
// .config("iceberg.engine.hive.enabled", "true")
//指定hadoop catalog,catalog名稱為iceberg_hadoop
.config("spark.sql.catalog.iceberg_hadoop", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.iceberg_hadoop.type", "hadoop")
.config("spark.sql.catalog.iceberg_hadoop.warehouse", "hdfs://hadoop1:8020/warehouse/spark-iceberg")
.getOrCreate()
讀取表
(1)加載表
spark.read
.format("iceberg")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()
或
// 僅支持Spark3.0以上
spark.table("iceberg_hadoop.default.a")
.show()
(2)時間旅行:指定時間查詢
spark.read
.option("as-of-timestamp", "499162860000")
.format("iceberg")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()
(3)時間旅行:指定快照id查詢
spark.read
.option("snapshot-id", 7601163594701794741L)
.format("iceberg")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()
(4)增量查詢
spark.read
.format("iceberg")
.option("start-snapshot-id", "10963874102873")
.option("end-snapshot-id", "63874143573109")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()
查詢的表只能是append的方式寫數(shù)據(jù),不支持replace, overwrite, delete操作。
檢查表
(1)查詢元數(shù)據(jù)
spark.read.format("iceberg").load("iceberg_hadoop.default.a.files")
spark.read.format("iceberg").load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a#files")
(2)元數(shù)據(jù)表時間旅行查詢
spark.read
.format("iceberg")
.option("snapshot-id", 7601163594701794741L)
.load("iceberg_hadoop.default.a.files")
寫入表
(1)創(chuàng)建樣例類,準(zhǔn)備DF
case class Sample(id:Int,data:String,category:String)
val df: DataFrame = spark.createDataFrame(Seq(Sample(1,'A', 'a'), Sample(2,'B', 'b'), Sample(3,'C', 'c')))
(2)插入數(shù)據(jù)并建表
df.writeTo("iceberg_hadoop.default.table1").create()
import spark.implicits._
df.writeTo("iceberg_hadoop.default.table1")
.tableProperty("write.format.default", "orc")
.partitionedBy($"category")
.createOrReplace()
(3)append追加
df.writeTo("iceberg_hadoop.default.table1").append()
(4)動態(tài)分區(qū)覆蓋
df.writeTo("iceberg_hadoop.default.table1").overwritePartitions()
(5)靜態(tài)分區(qū)覆蓋
import spark.implicits._
df.writeTo("iceberg_hadoop.default.table1").overwrite($"category" === "c")
(6)插入分區(qū)表且分區(qū)內(nèi)排序
df.sortWithinPartitions("category")
.writeTo("iceberg_hadoop.default.table1")
.append()
維護表
(1)獲取Table對象
1)HadoopCatalog
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://hadoop1:8020/warehouse/spark-iceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("db","table1"))
2)HiveCatalog
import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;
val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)
val properties = new util.HashMap[String,String]()
properties.put("warehouse", "hdfs://hadoop1:8020/warehouse/spark-iceberg")
properties.put("uri", "thrift://hadoop1:9083")
catalog.initialize("hive", properties)
val table: Table = catalog.loadTable(TableIdentifier.of("db", "table1"))
(2)快照過期清理
每次寫入Iceberg表都會創(chuàng)建一個表的新快照或版本??煺湛梢杂糜跁r間旅行查詢,或者可以將表回滾到任何有效的快照。建議設(shè)置快照過期時間,過期的舊快照將從元數(shù)據(jù)中刪除(不再可用于時間旅行查詢)。
// 1天過期時間
val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)
table.expireSnapshots()
.expireOlderThan(tsToExpire)
.commit()
或使用SparkActions來設(shè)置過期:
//SparkActions可以并行運行大型表的表過期設(shè)置
SparkActions.get()
.expireSnapshots(table)
.expireOlderThan(tsToExpire)
.execute()
(3)刪除無效文件
在Spark和其他分布式處理引擎中,任務(wù)或作業(yè)失敗可能會留下未被表元數(shù)據(jù)引用的文件,在某些情況下,正常的快照過期可能無法確定不再需要并刪除該文件。
SparkActions
.get()
.deleteOrphanFiles(table)
.execute()
(4)合并小文件
數(shù)據(jù)文件過多會導(dǎo)致更多的元數(shù)據(jù)存儲在清單文件中,而較小的數(shù)據(jù)文件會導(dǎo)致不必要的元數(shù)據(jù)量和更低效率的文件打開成本。
SparkActions
.get()
.rewriteDataFiles(table)
.filter(Expressions.equal("category", "a"))
.option("target-file-size-bytes", 1024L.toString) //1KB
.execute()
與 Flink SQL 集成
Apache Iceberg同時支持Apache Flink的DataStream API和Table API。
環(huán)境準(zhǔn)備
(1)安裝 Flink
1)Flink與Iceberg的版本對應(yīng)關(guān)系如下
Flink 版本 | Iceberg 版本 |
---|---|
1.11 | 0.9.0 – 0.12.1 |
1.12 | 0.12.0 – 0.13.1 |
1.13 | 0.13.0 – 1.0.0 |
1.14 | 0.13.0 – 1.1.0 |
1.15 | 0.14.0 – 1.1.0 |
1.16 | 1.1.0 – 1.1.0 |
2)上傳并解壓Flink安裝包
tar -zxvf flink-1.16.0-bin-scala_2.12.tgz -C /opt/module/
3)配置環(huán)境變量
sudo vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile.d/my_env.sh
4)拷貝iceberg的jar包到Flink的lib目錄
cp /opt/software/iceberg/iceberg-flink-runtime-1.16-1.1.0.jar /opt/module/flink-1.16.0/lib
(2)啟動 Hadoop
(3)啟動 sql-client
1)修改flink-conf.yaml配置
vim /opt/module/flink-1.16.0/conf/flink-conf.yaml
classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4
state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
state.backend.incremental: true
2)local模式
(1)修改workers
vim /opt/module/flink-1.16.0/conf/workers
#表示:會在本地啟動3個TaskManager的 local集群
localhost
localhost
localhost
(2)啟動Flink
/opt/module/flink-1.16.0/bin/start-cluster.sh
查看webui:http://hadoop1:8081
(3)啟動Flink的sql-client
/opt/module/flink-1.16.0/bin/sql-client.sh embedded
創(chuàng)建和使用 Catalog
語法說明
CREATE CATALOG <catalog_name> WITH (
'type'='iceberg',
`<config_key>`=`<config_value>`
);
-
type: 必須是iceberg。(必須)
-
catalog-type: 內(nèi)置了hive和hadoop兩種catalog,也可以使用catalog-impl來自定義catalog。(可選)
-
catalog-impl: 自定義catalog實現(xiàn)的全限定類名。如果未設(shè)置catalog-type,則必須設(shè)置。(可選)
-
property-version: 描述屬性版本的版本號。此屬性可用于向后兼容,以防屬性格式更改。當(dāng)前屬性版本為1。(可選)
-
cache-enabled: 是否啟用目錄緩存,默認值為true。(可選)
-
cache.expiration-interval-ms: 本地緩存catalog條目的時間(以毫秒為單位);負值,如-1表示沒有時間限制,不允許設(shè)為0。默認值為-1。(可選)
Hive Catalog
(1)上傳hive connector到flink的lib中
cp flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar /opt/module/flink-1.16.0/lib/
(2)啟動hive metastore服務(wù)
hive --service metastore
(3)創(chuàng)建hive catalog
重啟flink集群,重新進入sql-client
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://hadoop1:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://hadoop1:8020/warehouse/iceberg-hive'
);
use catalog hive_catalog;
use catalog hive_catalog;
-
uri: Hive metastore的thrift uri。(必選)
-
clients:Hive metastore客戶端池大小,默認為2。(可選)
-
warehouse: 數(shù)倉目錄。
-
hive-conf-dir:包含hive-site.xml配置文件的目錄路徑,hive-site.xml中hive.metastore.warehouse.dir 的值會被warehouse覆蓋。
-
hadoop-conf-dir:包含core-site.xml和hdfs-site.xml配置文件的目錄路徑。
Hadoop Catalog
Iceberg還支持HDFS中基于目錄的catalog,可以使用’catalog-type’='hadoop’配置。
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://hadoop1:8020/warehouse/iceberg-hadoop',
'property-version'='1'
);
use catalog hadoop_catalog;
- warehouse:存放元數(shù)據(jù)文件和數(shù)據(jù)文件的HDFS目錄。(必需)
配置sql-client初始化文件
vim /opt/module/flink-1.16.0/conf/sql-client-init.sql
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://hadoop1:9083',
'warehouse'='hdfs://hadoop1:8020/warehouse/iceberg-hive'
);
USE CATALOG hive_catalog;
后續(xù)啟動sql-client時,加上 -i sql文件路徑 即可完成catalog的初始化。
/opt/module/flink-1.16.0/bin/sql-client.sh embedded -i conf/sql-client-init.sql
DDL 語句
創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE iceberg_db;
USE iceberg_db;
創(chuàng)建表
CREATE TABLE `hive_catalog`.`default`.`sample` (
id BIGINT COMMENT 'unique id',
data STRING
);
建表命令現(xiàn)在支持最常用的flink建表語法,包括:
-
PARTITION BY (column1, column2, …):配置分區(qū),apache flink還不支持隱藏分區(qū)。
-
COMMENT ‘table document’:指定表的備注
-
WITH (‘key’=‘value’, …):設(shè)置表屬性
目前,不支持計算列、watermark(支持主鍵)。
(1)創(chuàng)建分區(qū)表
CREATE TABLE `hive_catalog`.`default`.`sample` (
id BIGINT COMMENT 'unique id',
data STRING
) PARTITIONED BY (data);
Apache Iceberg支持隱藏分區(qū),但Apache flink不支持在列上通過函數(shù)進行分區(qū),現(xiàn)在無法在flink DDL中支持隱藏分區(qū)。
(2)使用LIKE語法建表
LIKE語法用于創(chuàng)建一個與另一個表具有相同schema、分區(qū)和屬性的表。
CREATE TABLE `hive_catalog`.`default`.`sample` (
id BIGINT COMMENT 'unique id',
data STRING
);
CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
修改表
(1)修改表屬性
ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');
(2)修改表名
ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;
刪除表
DROP TABLE `hive_catalog`.`default`.`sample`;
插入語句
INSERT INTO
INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from sample2;
INSERT OVERWRITE
僅支持Flink的Batch模式:
SET execution.runtime-mode = batch;
INSERT OVERWRITE sample VALUES (1, 'a');
INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
UPSERT
當(dāng)將數(shù)據(jù)寫入v2表格式時,Iceberg支持基于主鍵的UPSERT。有兩種方法可以啟用upsert。
(1)建表時指定
CREATE TABLE `hive_catalog`.`test1`.`sample5` (
`id` INT UNIQUE COMMENT 'unique id',
`data` STRING NOT NULL,
PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2',
'write.upsert.enabled'='true'
);
(2)插入時指定
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
插入的表,format-version需要為2。
OVERWRITE和UPSERT不能同時設(shè)置。在UPSERT模式下,如果對表進行分區(qū),則分區(qū)字段必須也是主鍵。
(3)讀取Kafka流,upsert插入到iceberg表中
create table default_catalog.default_database.kafka(
id int,
data string
) with (
'connector' = 'kafka'
,'topic' = 'test111'
,'properties.zookeeper.connect' = 'hadoop1:2181'
,'properties.bootstrap.servers' = 'hadoop1:9092'
,'format' = 'json'
,'properties.group.id'='iceberg'
,'scan.startup.mode'='earliest-offset'
);
INSERT INTO hive_catalog.test1.sample5 SELECT * FROM default_catalog.default_database.kafka;
查詢語句
Iceberg支持Flink的流式和批量讀取。
Batch模式
SET execution.runtime-mode = batch;
select * from sample;
Streaming模式
SET execution.runtime-mode = streaming;
SET table.dynamic-table-options.enabled=true;
SET sql-client.execution.result-mode=tableau;
(1)從當(dāng)前快照讀取所有記錄,然后從該快照讀取增量數(shù)據(jù)
SELECT * FROM sample5 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
(2)讀取指定快照id(不包含)后的增量數(shù)據(jù)
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
-
monitor-interval: 連續(xù)監(jiān)控新提交數(shù)據(jù)文件的時間間隔(默認為10s)。
-
start-snapshot-id: 流作業(yè)開始的快照id。
**注意:**如果是無界數(shù)據(jù)流式upsert進iceberg表(讀kafka,upsert進iceberg表),那么再去流讀iceberg表會存在讀不出數(shù)據(jù)的問題。如果無界數(shù)據(jù)流式append進iceberg表(讀kafka,append進iceberg表),那么流讀該iceberg表可以正常看到結(jié)果。
與Flink集成的不足
支持的特性 | Flink | 備注 |
---|---|---|
SQL create catalog | √ | |
SQL create database | √ | |
SQL create table | √ | |
SQL create table like | √ | |
SQL alter table | √ | 只支持修改表屬性,不支持更改列和分區(qū) |
SQL drop_table | √ | |
SQL select | √ | 支持流式和批處理模式 |
SQL insert into | √ | 支持流式和批處理模式 |
SQL insert overwrite | √ | |
DataStream read | √ | |
DataStream append | √ | |
DataStream overwrite | √ | |
Metadata tables | 支持Java API,不支持Flink SQL | |
Rewrite files action | √ |
-
不支持創(chuàng)建隱藏分區(qū)的Iceberg表。
-
不支持創(chuàng)建帶有計算列的Iceberg表。
-
不支持創(chuàng)建帶watermark的Iceberg表。
-
不支持添加列,刪除列,重命名列,更改列。
-
Iceberg目前不支持Flink SQL 查詢表的元數(shù)據(jù)信息,需要使用Java API 實現(xiàn)。
與 Flink DataStream 集成
環(huán)境準(zhǔn)備
(1)配置pom文件
新建Maven工程,pom文件配置如下:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.atguigu.iceberg</groupId>
<artifactId>flink-iceberg-demo</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.16.0</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.12</scala.binary.version>
<slf4j.version>1.7.30</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope> <!--不會打包到依賴中,只參與編譯,不參與運行 -->
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-files</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!--idea運行時也有webui-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime-web</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-to-slf4j</artifactId>
<version>2.14.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.3</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.16 -->
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-flink-runtime-1.16</artifactId>
<version>1.1.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>com.google.code.findbugs:jsr305</exclude>
<exclude>org.slf4j:*</exclude>
<exclude>log4j:*</exclude>
<exclude>org.apache.hadoop:*</exclude>
</excludes>
</artifactSet>
<filters>
<filter>
<!-- Do not copy the signatures in the META-INF folder.
Otherwise, this might cause SecurityExceptions when using the JAR. -->
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers combine.children="append">
<transformer
implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
(2)配置log4j
resources目錄下新建log4j.properties。
log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
讀取數(shù)據(jù)
常規(guī)Source寫法
(1)Batch方式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");
DataStream<RowData> batch = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(false)
.build();
batch.map(r -> Tuple2.of(r.getLong(0),r.getLong(1) ))
.returns(Types.TUPLE(Types.LONG,Types.LONG))
.print();
env.execute("Test Iceberg Read");
(2)Streaming方式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");
DataStream<RowData> stream = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(true)
.startSnapshotId(3821550127947089987L)
.build();
stream.map(r -> Tuple2.of(r.getLong(0),r.getLong(1) ))
.returns(Types.TUPLE(Types.LONG,Types.LONG))
.print();
env.execute("Test Iceberg Read");
FLIP-27 Source寫法
(1)Batch方式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");
IcebergSource<RowData> source1 = IcebergSource.forRowData()
.tableLoader(tableLoader)
.assignerFactory(new SimpleSplitAssignerFactory())
.build();
DataStream<RowData> batch = env.fromSource(
Source1,
WatermarkStrategy.noWatermarks(),
"My Iceberg Source",
TypeInformation.of(RowData.class));
batch.map(r -> Tuple2.of(r.getLong(0), r.getLong(1)))
.returns(Types.TUPLE(Types.LONG, Types.LONG))
.print();
env.execute("Test Iceberg Read");
(2)Streaming方式
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");
IcebergSource source2 = IcebergSource.forRowData()
.tableLoader(tableLoader)
.assignerFactory(new SimpleSplitAssignerFactory())
.streaming(true)
.streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
.monitorInterval(Duration.ofSeconds(60))
.build();
DataStream<RowData> stream = env.fromSource(
Source2,
WatermarkStrategy.noWatermarks(),
"My Iceberg Source",
TypeInformation.of(RowData.class));
stream.map(r -> Tuple2.of(r.getLong(0), r.getLong(1)))
.returns(Types.TUPLE(Types.LONG, Types.LONG))
.print();
env.execute("Test Iceberg Read");
寫入數(shù)據(jù)
目前支持DataStream<RowData>和DataStream<Row>格式的數(shù)據(jù)流寫入Iceberg表。
(1)寫入方式支持 append、overwrite、upsert
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
SingleOutputStreamOperator<RowData> input = env.fromElements("")
.map(new MapFunction<String, RowData>() {
@Override
public RowData map(String s) throws Exception {
GenericRowData genericRowData = new GenericRowData(2);
genericRowData.setField(0, 99L);
genericRowData.setField(1, 99L);
return genericRowData;
}
});
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.append() // append方式
//.overwrite(true) // overwrite方式
//.upsert(true) // upsert方式
;
env.execute("Test Iceberg DataStream");
(2)寫入選項
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.set("write-format", "orc")
.set(FlinkWriteOptions.OVERWRITE_MODE, "true");
可配置選項如下:
選項 | 默認值 | 說明 |
---|---|---|
write-format | Parquet同write.format.default | 寫入操作使用的文件格式:Parquet, avro或orc |
target-file-size-bytes | 536870912(512MB)同write.target-file-size-bytes | 控制生成的文件的大小,目標(biāo)大約為這么多字節(jié) |
upsert-enabled | 同write.upsert.enabled, | |
overwrite-enabled | false | 覆蓋表的數(shù)據(jù),不能和UPSERT模式同時開啟 |
distribution-mode | None同 write.distribution-mode | 定義寫數(shù)據(jù)的分布方式: none:不打亂行; hash:按分區(qū)鍵散列分布;range:如果表有SortOrder,則通過分區(qū)鍵或排序鍵分配 |
compression-codec | 同 write.(fileformat).compression-codec | |
compression-level | 同 write.(fileformat).compression-level | |
compression-strategy | 同write.orc.compression-strategy |
合并小文件
Iceberg現(xiàn)在不支持在flink sql中檢查表,需要使用Iceberg提供的Java API來讀取元數(shù)據(jù)來獲得表信息??梢酝ㄟ^提交Flink批處理作業(yè)將小文件重寫為大文件:文章來源:http://www.zghlxwxcb.cn/news/detail-685960.html
import org.apache.iceberg.flink.actions.Actions;
// 1.獲取 Table對象
// 1.1 創(chuàng)建 catalog對象
Configuration conf = new Configuration();
HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "hdfs://hadoop1:8020/warehouse/spark-iceberg");
// 1.2 通過 catalog加載 Table對象
Table table = hadoopCatalog.loadTable(TableIdentifier.of("default", "a"));
// 有Table對象,就可以獲取元數(shù)據(jù)、進行維護表的操作
// System.out.println(table.history());
// System.out.println(table.expireSnapshots().expireOlderThan());
// 2.通過 Actions 來操作 合并
Actions.forTable(table)
.rewriteDataFiles()
.targetSizeInBytes(1024L)
.execute();
得到Table對象,就可以獲取元數(shù)據(jù)、進行維護表的操作。更多Iceberg提供的API操作,考:https://iceberg.apache.org/docs/latest/api/文章來源地址http://www.zghlxwxcb.cn/news/detail-685960.html
到了這里,關(guān)于數(shù)據(jù)湖Iceberg介紹和使用(集成Hive、SparkSQL、FlinkSQL)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!