Apache Iceberg 支持 Apache Flink 的 DataStream API 和 Table API。
一、Flink支持的iceberg功能
功能支持 | Flink | 注意事項 |
---|---|---|
SQL create catalog | ?? | |
SQL create database | ?? | |
SQL create table | ?? | |
SQL create table like | ?? | |
SQL alter table | ?? | 僅支持更改表屬性,不支持列和分區(qū)更改 |
SQL drop_table | ?? | |
SQL select | ?? | 支持流式和批處理模式 |
SQL insert into | ?? | 支持流式和批處理模式 |
SQL insert overwrite | ?? | |
DataStream read | ?? | |
DataStream append | ?? | |
DataStream overwrite | ?? | |
Metadata tables | ?? | |
Rewrite files action | ?? |
二、使用Flink SQL Client時的準備
在 Flink 中創(chuàng)建 Iceberg 表,建議使用 Flink SQL Client,這樣用戶更容易理解概念。
從 Apache 下載頁面下載 Flink。 Iceberg 在編譯 Apache Iceberg-flink-runtime jar 時使用 Scala 2.12,因此建議使用與 Scala 2.12 捆綁在一起的 Flink 1.16。
FLINK_VERSION=1.16.1
SCALA_VERSION=2.12
APACHE_FLINK_URL=https://archive.apache.org/dist/flink/
wget ${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
tar xzvf flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
在 Hadoop 環(huán)境中啟動獨立的 Flink 集群:
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/
HADOOP_VERSION=2.8.5
wget ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
tar xzvf hadoop-${HADOOP_VERSION}.tar.gz
HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
# Start the flink standalone cluster
./bin/start-cluster.sh
啟動 Flink SQL 客戶端。 Iceberg項目中有一個單獨的flink-runtime模塊來生成捆綁的jar,可以直接由Flink SQL客戶端加載。要手動構建 flink-runtime 捆綁的 jar,請構建 Iceberg 項目,它將在 <iceberg-root-dir>/flink-runtime/build/libs 下生成 jar?;蛘邚?Apache 存儲庫下載 flink-runtime jar。
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-1.16-1.3.0.jar shell
默認情況下,Iceberg 附帶用于 Hadoop 目錄的 Hadoop jar。要使用 Hive 目錄,請在打開 Flink SQL 客戶端時加載 Hive jar。幸運的是,F(xiàn)link 為 SQL 客戶端提供了捆綁的 Hive jar。有關如何下載依賴項并開始使用的示例:
# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`
ICEBERG_VERSION=1.3.0
MAVEN_URL=https://repo1.maven.org/maven2
ICEBERG_MAVEN_URL=${MAVEN_URL}/org/apache/iceberg
ICEBERG_PACKAGE=iceberg-flink-runtime
wget ${ICEBERG_MAVEN_URL}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}/${ICEBERG_VERSION}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}-${ICEBERG_VERSION}.jar -P lib/
HIVE_VERSION=2.3.9
SCALA_VERSION=2.12
FLINK_VERSION=1.16.1
FLINK_CONNECTOR_URL=${MAVEN_URL}/org/apache/flink
FLINK_CONNECTOR_PACKAGE=flink-sql-connector-hive
wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}/${FLINK_VERSION}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar
./bin/sql-client.sh embedded shell
三、Flink’s Python API
使用 pip 安裝 Apache Flink 依賴項:
pip install apache-flink==1.16.1
提供iceberg-flink-runtime jar的file://路徑,可以通過構建項目并查看/flink-runtime/build/libs獲得,或者從Apache官方下載存儲庫。第三方 jar 可以通過以下方式添加到 pyflink:
- env.add_jars(“文件:///my/jar/path/connector.jar”)
- table_env.get_config().get_configuration().set_string(“pipeline.jars”, “file:///my/jar/path/connector.jar”)
官方文檔中也提到了這一點。下面的示例使用 env.add_jars(…):
import os
from pyflink.datastream import StreamExecutionEnvironment
env = StreamExecutionEnvironment.get_execution_environment()
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "iceberg-flink-runtime-1.16-1.3.0.jar")
env.add_jars("file://{}".format(iceberg_flink_runtime_jar))
接下來,創(chuàng)建StreamTableEnvironment并執(zhí)行Flink SQL語句。以下示例展示了如何通過 Python Table API 創(chuàng)建自定義目錄:
from pyflink.table import StreamTableEnvironment
table_env = StreamTableEnvironment.create(env)
table_env.execute_sql("""
CREATE CATALOG my_catalog WITH (
'type'='iceberg',
'catalog-impl'='com.my.custom.CatalogImpl',
'my-additional-catalog-config'='my-value'
)
""")
運行查詢:
(table_env
.sql_query("SELECT PULocationID, DOLocationID, passenger_count FROM my_catalog.nyc.taxis LIMIT 5")
.execute()
.print())
+----+----------------------+----------------------+--------------------------------+
| op | PULocationID | DOLocationID | passenger_count |
+----+----------------------+----------------------+--------------------------------+
| +I | 249 | 48 | 1.0 |
| +I | 132 | 233 | 1.0 |
| +I | 164 | 107 | 1.0 |
| +I | 90 | 229 | 1.0 |
| +I | 137 | 249 | 1.0 |
+----+----------------------+----------------------+--------------------------------+
5 rows in set
四、添加目錄。
Flink 支持使用 Flink SQL 創(chuàng)建目錄。
目錄配置
通過執(zhí)行以下查詢來創(chuàng)建和命名目錄(將 <catalog_name> 替換為您的目錄名稱,將 <config_key>=<config_value> 替換為目錄實現(xiàn)配置):
CREATE CATALOG <catalog_name> WITH (
'type'='iceberg',
`<config_key>`=`<config_value>`
);
以下屬性可以全局設置,并且不限于特定的目錄實現(xiàn):
- type:必須是iceberg。 (必需的)
- catalog-type:hive、hadoop 或rest 用于內置目錄,或未設置以使用catalog-impl 實現(xiàn)自定義目錄。 (選修的)
- Catalog-impl:自定義目錄實現(xiàn)的完全限定類名。如果未設置目錄類型,則必須設置。 (選修的)
- property-version:描述屬性版本的版本號。如果屬性格式發(fā)生更改,此屬性可用于向后兼容。當前屬性版本為1。(可選)
- cache-enabled:是否啟用目錄緩存,默認值為true。 (選修的)
- cache.expiration-interval-ms:目錄條目在本地緩存多長時間,以毫秒為單位;負值如 -1 將禁用過期,值 0 不允許設置。默認值為-1。 (選修的)
五、Hive catalog
這將創(chuàng)建一個名為 hive_catalog 的 Iceberg 目錄,可以使用 ‘catalog-type’=‘hive’ 進行配置,該目錄從 Hive 元存儲加載表:
CREATE CATALOG hive_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://localhost:9083',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://nn:8020/warehouse/path'
);
如果使用 Hive 目錄,可以設置以下屬性:
- uri:Hive 元存儲的 Thrift URI。 (必需的)
- client:Hive Metastore 客戶端池大小,默認值為 2。(可選)
- warehouse:Hive倉庫位置,如果既沒有設置hive-conf-dir來指定包含hive-site.xml配置文件的位置,也沒有在classpath中添加正確的hive-site.xml,則應指定此路徑。
- hive-conf-dir:包含 hive-site.xml 配置文件的目錄路徑,該文件將用于提供自定義 Hive 配置值。如果同時設置 hive-conf-dir 和倉庫,則 /hive-site.xml(或類路徑中的 hive 配置文件)中的 hive.metastore.warehouse.dir 值將被倉庫值覆蓋創(chuàng)建iceberg目錄。
- hadoop-conf-dir:包含 core-site.xml 和 hdfs-site.xml 配置文件的目錄路徑,這些文件將用于提供自定義 Hadoop 配置值。
創(chuàng)建表
CREATE TABLE `hive_catalog`.`default`.`sample` (
id BIGINT COMMENT 'unique id',
data STRING
);
寫數(shù)據(jù)
要將新數(shù)據(jù)附加到具有 Flink 流作業(yè)的表中,請使用 INSERT INTO:
INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;
要將表中的數(shù)據(jù)替換為查詢結果,請在批處理作業(yè)中使用 INSERT OVERWRITE(flink 流作業(yè)不支持 INSERT OVERWRITE)。覆蓋是 Iceberg 表的原子操作。
具有 SELECT 查詢生成的行的分區(qū)將被替換,例如:
INSERT OVERWRITE `hive_catalog`.`default`.`sample` VALUES (1, 'a');
Iceberg 還支持通過選擇值覆蓋給定分區(qū):
INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
Flink 原生支持將 DataStream 和 DataStream 寫入iceberg表。
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.append();
env.execute("Test Iceberg DataStream");
分支寫入
FlinkSink 中的 toBranch API 還支持寫入 Iceberg 表中的分支。
FlinkSink.forRowData(input)
.tableLoader(tableLoader)
.toBranch("audit-branch")
.append();
讀
使用以下語句提交 Flink 批處理作業(yè):
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM `hive_catalog`.`default`.`sample`;
Iceberg 支持處理從歷史快照 ID 開始的 Flink 流作業(yè)中的增量數(shù)據(jù):
-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;
-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;
-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
SQL 也是檢查表的推薦方法。要查看表中的所有快照,請使用快照元數(shù)據(jù)表:
SELECT * FROM `hive_catalog`.`default`.`sample`.`snapshots`
Iceberg支持Java API中的流式或批量讀?。?/p>
DataStream<RowData> batch = FlinkSource.forRowData()
.env(env)
.tableLoader(tableLoader)
.streaming(false)
.build();
六、類型轉換
Iceberg 對 Flink 的集成會自動在 Flink 和 Iceberg 類型之間進行轉換。當寫入 Flink 不支持的類型(例如 UUID)的表時,Iceberg 將接受并轉換 Flink 類型的值。
Flink 到 Iceberg
Flink 類型按照下表轉換為 Iceberg 類型:
Flink | Iceberg | Notes |
---|---|---|
boolean | boolean | |
tinyint | integer | |
smallint | integer | |
integer | integer | |
bigint | long | |
float | float | |
double | double | |
char | string | |
varchar | string | |
string | string | |
binary | binary | |
varbinary | fixed | |
decimal | decimal | |
date | date | |
time | time | |
timestamp | timestamp without timezone | |
timestamp_ltz | timestamp with timezone | |
array | list | |
map | map | |
multiset | map | |
row | struct | |
raw | Not supported | |
interval | Not supported | |
structured | Not supported | |
timestamp with zone | Not supported | |
distinct | Not supported | |
null | Not supported | |
symbol | Not supported | |
logical | Not supported |
Iceberg to Flink
Iceberg 類型按照下表轉換為 Flink 類型:文章來源:http://www.zghlxwxcb.cn/news/detail-574260.html
Iceberg | Flink |
---|---|
boolean | boolean |
struct | row |
list | array |
map | map |
integer | integer |
long | bigint |
float | float |
double | double |
date | date |
time | time |
timestamp without timezone | timestamp(6) |
timestamp with timezone | timestamp_ltz(6) |
string | varchar(2147483647) |
uuid | binary(16) |
fixed(N) | binary(N) |
binary | varbinary(2147483647) |
decimal(P, S) | decimal(P, S) |
七、待支持的功能
目前的 Flink Iceberg 集成工作尚不支持一些功能:文章來源地址http://www.zghlxwxcb.cn/news/detail-574260.html
- 不支持創(chuàng)建隱藏分區(qū)的Iceberg表
- 不支持創(chuàng)建帶有計算列的Iceberg表
- 不支持創(chuàng)建帶水印的Iceberg表
- 不支持添加列、刪除列、重命名列、更改列,會在flink 1.18.0版本中支持
到了這里,關于Iceberg從入門到精通系列之十八:一篇文章深入了解Flink對Iceberg的支持的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!