- 注意事項:一般都是用基于Flink的Hive Catalog,使用HMS存儲表模型數(shù)據(jù)
1、集成方式
(1)下載jar包
iceberg-flink-runtime-1.14-1.0.0jar
flink-sql-connector-hive-2.3.6_2.12-1.11.2.jar
- 下載地址
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/connectors/table/hive/overview/
(2)啟動FlinkSQL
①StandLone模式啟動
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-xxx.jar shell
②Flink On Yarn模式啟動
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# 第一步 - 在Yarn集群上生成一個Standlone集群
./yarn-session.sh -s 2 -jm 2048 -tm 2048 -nm flinksql1 -d
# 第二步 - 指定yarn-session模式啟動sql-client
./sql-client.sh embedded -s yarn-session -j ../lib/iceberg-flink-runtime-1.14-0.14.1.jar shell
2、基本使用
2.1、創(chuàng)建catalog
- 核心:可創(chuàng)建hive、hadoop、自定義等目錄,創(chuàng)建模板如下
CREATE CATALOG <catalog_name> WITH (
'type'='iceberg',
`<config_key>`=`<config_value>`
);
-
type
: 必須的iceberg
。(必需的) -
catalog-type
:hive
或hadoop
用于內置目錄,或未設置用于使用 catalog-impl 的自定義目錄實現(xiàn)。(可選的) -
catalog-impl
:自定義目錄實現(xiàn)的完全限定類名。如果未設置,則必須catalog-type
設置。(可選的) -
property-version
: 描述屬性版本的版本號。如果屬性格式發(fā)生變化,此屬性可用于向后兼容。當前的屬性版本是1
. (可選的) -
cache-enabled
: 是否啟用目錄緩存,默認值為true
2.2、創(chuàng)建基于Hive的Catalog
(1)創(chuàng)建Catalog
CREATE CATALOG hive_iceberg WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://leidi01:9083',
'clients'='5',
'property-version'='1',
'hive-conf-dir'='/usr/hdp/3.1.0.0-78/hive/conf'
);
show catalogs;
-
uri
: Hive 元存儲的 thrift URI。(必需的) -
clients
:Hive Metastore 客戶端池大小,默認值為 2。(可選) -
warehouse
:Hive 倉庫位置,如果既不設置hive-conf-dir
指定包含hive-site.xml
配置文件的位置也不添加正確hive-site.xml
的類路徑,用戶應指定此路徑。 -
hive-conf-dir``hive-site.xml
:包含將用于提供自定義 Hive 配置值的配置文件的目錄的路徑。如果同時設置和創(chuàng)建冰山目錄時,hive.metastore.warehouse.dir
from/hive-site.xml
(或來自類路徑的 hive 配置文件)的值將被該值覆蓋。warehouse``hive-conf-dir``warehouse
- 創(chuàng)建結果
(2)多客戶端共享驗證
- 客戶端一對應庫表
- 客戶端二可見對應庫表
2.3、創(chuàng)建基于Hadoop的calalog
(1)創(chuàng)建Catalog
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://leidi01:8020/warehouse/iceberg_catalog',
'property-version'='1'
);
-
warehouse
:HDFS目錄,存放元數(shù)據(jù)文件和數(shù)據(jù)文件。(必需的)
- 創(chuàng)建結果
2.4、其余創(chuàng)建方式
(1)創(chuàng)建自定義目錄
- 核心:通過指定
catalog-impl
屬性來加載自定義的 Iceberg實現(xiàn)
REATE CATALOG my_catalog WITH (
'type'='iceberg',
'catalog-impl'='com.my.custom.CatalogImpl',
'my-additional-catalog-config'='my-value'
);
(2)通過SQL文件創(chuàng)建目錄
-- define available catalogs
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://leidi01:9083',
'warehouse'='hdfs://leidi01:8020/user/flink/iceberg'
);
USE CATALOG hive_catalog;
-
注意事項:
sql-client-defaults.yaml
在 flink 1.14 中刪除了該文件,需要初始化才能有文件。
3、Flink SQL語句
3.1、DDL語句
(1)建庫建表
use catalog iceberg;
CREATE DATABASE iceberg_db;
USE iceberg_db;
CREATE TABLE iceberg.iceberg_db.iceberg_001 (
id BIGINT COMMENT 'unique id',
data STRING
) WITH ('connector'='iceberg','write.format.default'='ORC');
(2)創(chuàng)建分區(qū)table
CREATE TABLE iceberg.iceberg_db.iceberg_003 (
id BIGINT COMMENT 'unique id',
data STRING
) PARTITIONED BY (data);
(3)更改table
--1、CREATE TABLE LIKE
CREATE TABLE `hive_catalog`.`default`.`sample` (
id BIGINT COMMENT 'unique id',
data STRING
);
CREATE TABLE `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`
--2、alter table
ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro')
--3、ALTER TABLE .. RENAME TO
ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;
--4、DROP TABLE
DROP TABLE `hive_catalog`.`default`.`sample`;
3.2、DML語句
(1)插入數(shù)據(jù)
- insert into
INSERT INTO `iceberg`.`iceberg_db`.`iceberg_001` VALUES (1, 'a');
--分區(qū)表插入語句
INSERT INTO `iceberg`.`iceberg_db`.`iceberg_001`() values(2,'b')
- insert overwrite
INSERT OVERWRITE sample VALUES (1, 'a');
(2)查詢數(shù)據(jù)
- 執(zhí)行類型:流模式 VS 批模式
-- Execute the flink job in streaming mode for current session context
SET execution.runtime-mode = streaming;
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
Ⅰ、批量讀?。和ㄟ^提交 flink批處理作業(yè)來檢查 iceberg 表中的所有行
SET execution.runtime-mode = batch;
SELECT * FROM sample;
Ⅱ、流式讀?。褐С痔幚韽臍v史快照 id 開始的 flink 流作業(yè)中的增量數(shù)據(jù)
- monitor-interval:連續(xù)監(jiān)控新提交的數(shù)據(jù)文件的時間間隔(默認值:‘10s’)。
- start-snapshot-id:流作業(yè)開始的快照 id。
-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;
-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;
-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
(3)更新數(shù)據(jù)
-
前提:啟動更新模式
-
模式一:啟用
UPSERT
模式作為表級屬性write.upsert.enabled
CREATE TABLE `hive_catalog`.`default`.`sample` (
`id` INT UNIQUE COMMENT 'unique id',
`data` STRING NOT NULL,
PRIMARY KEY(`id`) NOT ENFORCED
) with ('format-version'='2', 'write.upsert.enabled'='true');
- 模式二:在
write options
中使用啟用UPSERT
模式upsert-enabled
提供了比表級配置更大的靈活性。
INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...
4、Flink集成Iceberg的Hadoop Catalog實戰(zhàn)案例
4.1、創(chuàng)建catalog的存儲格式
(1)創(chuàng)建Catalog
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://leidi01:8020/warehouse/iceberg_catalog',
'property-version'='1'
);
- 創(chuàng)捷結果:
一個catalog + 一個默認的default數(shù)據(jù)庫
(2)查看HDFS結構目錄
4.2、建庫建表
(1)建庫建表
create database hadoop_test;
use hadoop_test;
CREATE TABLE `hadoopdemo` (
> id BIGINT COMMENT 'unique id',
> data STRING
> );
- 創(chuàng)建結果
(2)查看對應HDFS目錄
- 驗證:
catalog為一級目錄、數(shù)據(jù)庫為二級目錄、表為三級目錄
,建Catalog、建庫、建表時沒有flink任務生成。
4.3、插入數(shù)據(jù)
(1)插入數(shù)據(jù)
INSERT INTO `iceberg`.`iceberg_db`.`iceberg_001` VALUES (1, 'a');
- 運行結果
(2)HDFS目錄
- 驗證結果:分別生成data和metadata兩個目錄
①data目錄文件結構
- 存儲:以parquent格式存儲的數(shù)據(jù)文件
②metadata目錄文件結構
- 存儲:metadata目錄存放元數(shù)據(jù)管理層的數(shù)據(jù),表的元數(shù)據(jù)是不可修改的,并且始終向前迭代;當前的快照可以回退。
- 文件詳述
文件名稱 | 文件描述 | 備注 |
---|---|---|
version[number].metadata.json | 存儲每個版本的數(shù)據(jù)更改項 | |
snap-[snapshotID]-[attemptID]-[commitUUID].avro | 存儲快照snapshot文件; | |
[commitUUID]-[attemptID]-[manifestCount].avro | 清單文件,每次更新操作都會產生清單文件 | |
version-hint.text |
5、Catalog設置相關
? Hive metastore 中的表可以表示加載 Iceberg 表的三種不同方式,具體取決于表的iceberg.catalog
屬性:
5.1、不指定任何Catalog類型,直接創(chuàng)建表
? 如果在Hive中創(chuàng)建Iceberg格式表時不指定Iceberg.catalog屬性,將使用HiveCatalog
與 Hive 環(huán)境中配置的 Metastore 相對應的表加載該表iceberg.catalog
,那么數(shù)據(jù)存儲在對應的Hive Warehouse路徑下。
-- 1、在Hive中創(chuàng)建Iceberg格式表
create table test_iceberg_tbl1(
id int,
name string,
age int)
partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
-- 2、在Hive中加載如下兩個包,在向Hive中插入數(shù)據(jù)時執(zhí)行MR程序時需要使用到
add jar /usr/hdp/3.1.0.0-78/hive/lib/iceberg-hive-runtime-0.14.1.jar
add jar /usr/hdp/3.1.0.0-78/hive/lib/libfb303-0.9.3.jar
-- 3、向表中插入數(shù)據(jù)
insert into test_iceberg_tbl1 values(1,"sz",18,"beijing")
-- 4、查詢表中數(shù)據(jù)
select * from test_iceberg_tbl1
- 查看表元數(shù)據(jù)存儲信息
5.2、iceberg.catalog
如果設置為Hive目錄名稱,將使用自定義目錄加載該表
? 在Hive中創(chuàng)建Iceberg格式表時,如果指定了iceberg.catalog屬性值,那么數(shù)據(jù)存儲在指定的catalog名稱對應配置的目錄下。
-- 1、注冊一個HiveCatalog叫another_hive
set iceberg.catalog.another_hive.type=hive;
SET iceberg.catalog.another_hive.uri=thrift://10.201.0.202:49153;
SET iceberg.catalog.another_hive.warehouse=s3a://faas-ethan/warehouse/;
SET hive.vectorized.execution.enabled=false;
-- 2、在Hive中創(chuàng)建iceberg格式表
create table test_iceberg_tbl2(
id int,
name string,
age int
)
partitioned by (dt string)
stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location 's3a://faas-ethan/warehouse/default/sample_hive_table_1'
tblproperties ('iceberg.catalog'='another_hive');
-- 3、插入數(shù)據(jù),并查詢
hive> insert into test_iceberg_tbl2 values (2,"ls",20,"20211212");
hive> select * from test_iceberg_tbl2;
- 查看本地HMS中表元數(shù)據(jù)存儲信息:
- 查看遠端HMS中表數(shù)據(jù)存儲信息
? 在Hive中創(chuàng)建Iceberg表,會在兩邊HMS分別存儲一份元數(shù)據(jù),只有這樣,遠端HMS中的Iceberg表才對本地HMS可見,所以必須保證遠端HMS存在對應的數(shù)據(jù)庫。
-
問題:如果只有遠端HMS的Iceberg表,如何在本地HMS訪問?
-
解決方案:通過如下創(chuàng)建external外表的形式在本地HMS生成元數(shù)據(jù)。
CREATE EXTERNAL TABLE default.sample_hive_table_1(
id bigint, name string
)
PARTITIONED BY(
dept string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
location 's3a://faas-ethan/warehouse/default/sample_hive_table_1'
TBLPROPERTIES ('iceberg.catalog'='another_hive');
- 震驚:通過以下Hive SQL實現(xiàn)了跨HMS的聯(lián)邦查詢?。。?/strong>
select * from default.sample_local_hive_table_1,sample_hive_table_1;
5.3、iceberg.catalog
如果設置為location_based_table,則可以使用表的根位置直接加載表location_based_table
? 如果HDFS中已經存在iceberg格式表,我們可以通過在Hive中創(chuàng)建Icerberg格式表指定對應的location路徑映射數(shù)據(jù)。
CREATE TABLE test_iceberg_tbl4 (
id int,
name string,
age int,
dt string
)STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://leidi01:8020/flinkiceberg/iceberg_db/flink_iceberg_tbl2'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');
--指定的location路徑下必須是iceberg格式表數(shù)據(jù),并且需要有元數(shù)據(jù)目錄才可以。不能將其他數(shù)據(jù)映射到Hive iceberg格式表。
- 注意事項
? 由于Hive建表語句分區(qū)語法Partitioned by
的限制,如果使用Hive創(chuàng)建Iceberg格式表,目前只能按照Hive語法來寫,底層轉換成Iceberg標識分區(qū),這種情況下不能使用Iceberge的分區(qū)轉換,例如:days(timestamp),如果想要使用Iceberg格式表的分區(qū)轉換標識分區(qū),需要使用Spark或者Flink引擎創(chuàng)建表。
5.4、附加:注冊Hadoop類型的Catalog
SET iceberg.catalog.hadoop_cat.type=hadoop;
SET iceberg.catalog.hadoop_cat.warehouse=s3a://faas-ethan/warehouse;
- 使用Hadoop Catalog建表
CREATE TABLE default.sample_hadoop_table_1(
id bigint, name string
) PARTITIONED BY (
dept string
)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 's3a://faas-ethan/warehouse/default/sample_hadoop_table_1'
TBLPROPERTIES ('iceberg.catalog'='hadoop_cat');
- 查看HMS中表元數(shù)據(jù)存儲信息
? Hadoop Catalog
相比Hive Catalog
建立的表相比,少了metadata_location
屬性,同時元數(shù)據(jù)文件多了 version-hint.text
。文章來源:http://www.zghlxwxcb.cn/news/detail-477315.html
文章來源地址http://www.zghlxwxcb.cn/news/detail-477315.html
到了這里,關于第二章 Flink集成Iceberg的集成方式及基本SQL使用的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!