国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Iceberg從入門到精通系列之十八:一篇文章深入了解Flink對Iceberg的支持

這篇具有很好參考價值的文章主要介紹了Iceberg從入門到精通系列之十八:一篇文章深入了解Flink對Iceberg的支持。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Apache Iceberg 支持 Apache Flink 的 DataStream API 和 Table API。

一、Flink支持的iceberg功能

功能支持 Flink 注意事項
SQL create catalog ??
SQL create database ??
SQL create table ??
SQL create table like ??
SQL alter table ?? 僅支持更改表屬性,不支持列和分區(qū)更改
SQL drop_table ??
SQL select ?? 支持流式和批處理模式
SQL insert into ?? 支持流式和批處理模式
SQL insert overwrite ??
DataStream read ??
DataStream append ??
DataStream overwrite ??
Metadata tables ??
Rewrite files action ??

二、使用Flink SQL Client時的準備

在 Flink 中創(chuàng)建 Iceberg 表,建議使用 Flink SQL Client,這樣用戶更容易理解概念。

從 Apache 下載頁面下載 Flink。 Iceberg 在編譯 Apache Iceberg-flink-runtime jar 時使用 Scala 2.12,因此建議使用與 Scala 2.12 捆綁在一起的 Flink 1.16。

FLINK_VERSION=1.16.1
SCALA_VERSION=2.12
APACHE_FLINK_URL=https://archive.apache.org/dist/flink/
wget ${APACHE_FLINK_URL}/flink-${FLINK_VERSION}/flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz
tar xzvf flink-${FLINK_VERSION}-bin-scala_${SCALA_VERSION}.tgz

在 Hadoop 環(huán)境中啟動獨立的 Flink 集群:

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
APACHE_HADOOP_URL=https://archive.apache.org/dist/hadoop/
HADOOP_VERSION=2.8.5
wget ${APACHE_HADOOP_URL}/common/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
tar xzvf hadoop-${HADOOP_VERSION}.tar.gz
HADOOP_HOME=`pwd`/hadoop-${HADOOP_VERSION}

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

# Start the flink standalone cluster
./bin/start-cluster.sh

啟動 Flink SQL 客戶端。 Iceberg項目中有一個單獨的flink-runtime模塊來生成捆綁的jar,可以直接由Flink SQL客戶端加載。要手動構建 flink-runtime 捆綁的 jar,請構建 Iceberg 項目,它將在 <iceberg-root-dir>/flink-runtime/build/libs 下生成 jar?;蛘邚?Apache 存儲庫下載 flink-runtime jar。

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`   

./bin/sql-client.sh embedded -j <flink-runtime-directory>/iceberg-flink-runtime-1.16-1.3.0.jar shell

默認情況下,Iceberg 附帶用于 Hadoop 目錄的 Hadoop jar。要使用 Hive 目錄,請在打開 Flink SQL 客戶端時加載 Hive jar。幸運的是,F(xiàn)link 為 SQL 客戶端提供了捆綁的 Hive jar。有關如何下載依賴項并開始使用的示例:

# HADOOP_HOME is your hadoop root directory after unpack the binary package.
export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

ICEBERG_VERSION=1.3.0
MAVEN_URL=https://repo1.maven.org/maven2
ICEBERG_MAVEN_URL=${MAVEN_URL}/org/apache/iceberg
ICEBERG_PACKAGE=iceberg-flink-runtime
wget ${ICEBERG_MAVEN_URL}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}/${ICEBERG_VERSION}/${ICEBERG_PACKAGE}-${FLINK_VERSION_MAJOR}-${ICEBERG_VERSION}.jar -P lib/

HIVE_VERSION=2.3.9
SCALA_VERSION=2.12
FLINK_VERSION=1.16.1
FLINK_CONNECTOR_URL=${MAVEN_URL}/org/apache/flink
FLINK_CONNECTOR_PACKAGE=flink-sql-connector-hive
wget ${FLINK_CONNECTOR_URL}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}/${FLINK_VERSION}/${FLINK_CONNECTOR_PACKAGE}-${HIVE_VERSION}_${SCALA_VERSION}-${FLINK_VERSION}.jar

./bin/sql-client.sh embedded shell

三、Flink’s Python API

使用 pip 安裝 Apache Flink 依賴項:

pip install apache-flink==1.16.1

提供iceberg-flink-runtime jar的file://路徑,可以通過構建項目并查看/flink-runtime/build/libs獲得,或者從Apache官方下載存儲庫。第三方 jar 可以通過以下方式添加到 pyflink:

  • env.add_jars(“文件:///my/jar/path/connector.jar”)
  • table_env.get_config().get_configuration().set_string(“pipeline.jars”, “file:///my/jar/path/connector.jar”)

官方文檔中也提到了這一點。下面的示例使用 env.add_jars(…):

import os

from pyflink.datastream import StreamExecutionEnvironment

env = StreamExecutionEnvironment.get_execution_environment()
iceberg_flink_runtime_jar = os.path.join(os.getcwd(), "iceberg-flink-runtime-1.16-1.3.0.jar")

env.add_jars("file://{}".format(iceberg_flink_runtime_jar))

接下來,創(chuàng)建StreamTableEnvironment并執(zhí)行Flink SQL語句。以下示例展示了如何通過 Python Table API 創(chuàng)建自定義目錄:

from pyflink.table import StreamTableEnvironment
table_env = StreamTableEnvironment.create(env)
table_env.execute_sql("""
CREATE CATALOG my_catalog WITH (
    'type'='iceberg', 
    'catalog-impl'='com.my.custom.CatalogImpl',
    'my-additional-catalog-config'='my-value'
)
""")

運行查詢:

(table_env
    .sql_query("SELECT PULocationID, DOLocationID, passenger_count FROM my_catalog.nyc.taxis LIMIT 5")
    .execute()
    .print()) 
+----+----------------------+----------------------+--------------------------------+
| op |         PULocationID |         DOLocationID |                passenger_count |
+----+----------------------+----------------------+--------------------------------+
| +I |                  249 |                   48 |                            1.0 |
| +I |                  132 |                  233 |                            1.0 |
| +I |                  164 |                  107 |                            1.0 |
| +I |                   90 |                  229 |                            1.0 |
| +I |                  137 |                  249 |                            1.0 |
+----+----------------------+----------------------+--------------------------------+
5 rows in set

四、添加目錄。

Flink 支持使用 Flink SQL 創(chuàng)建目錄。

目錄配置

通過執(zhí)行以下查詢來創(chuàng)建和命名目錄(將 <catalog_name> 替換為您的目錄名稱,將 <config_key>=<config_value> 替換為目錄實現(xiàn)配置):

CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  `<config_key>`=`<config_value>`
); 

以下屬性可以全局設置,并且不限于特定的目錄實現(xiàn):

  • type:必須是iceberg。 (必需的)
  • catalog-type:hive、hadoop 或rest 用于內置目錄,或未設置以使用catalog-impl 實現(xiàn)自定義目錄。 (選修的)
  • Catalog-impl:自定義目錄實現(xiàn)的完全限定類名。如果未設置目錄類型,則必須設置。 (選修的)
  • property-version:描述屬性版本的版本號。如果屬性格式發(fā)生更改,此屬性可用于向后兼容。當前屬性版本為1。(可選)
  • cache-enabled:是否啟用目錄緩存,默認值為true。 (選修的)
  • cache.expiration-interval-ms:目錄條目在本地緩存多長時間,以毫秒為單位;負值如 -1 將禁用過期,值 0 不允許設置。默認值為-1。 (選修的)

五、Hive catalog

這將創(chuàng)建一個名為 hive_catalog 的 Iceberg 目錄,可以使用 ‘catalog-type’=‘hive’ 進行配置,該目錄從 Hive 元存儲加載表:

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://localhost:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://nn:8020/warehouse/path'
);

如果使用 Hive 目錄,可以設置以下屬性:

  • uri:Hive 元存儲的 Thrift URI。 (必需的)
  • client:Hive Metastore 客戶端池大小,默認值為 2。(可選)
  • warehouse:Hive倉庫位置,如果既沒有設置hive-conf-dir來指定包含hive-site.xml配置文件的位置,也沒有在classpath中添加正確的hive-site.xml,則應指定此路徑。
  • hive-conf-dir:包含 hive-site.xml 配置文件的目錄路徑,該文件將用于提供自定義 Hive 配置值。如果同時設置 hive-conf-dir 和倉庫,則 /hive-site.xml(或類路徑中的 hive 配置文件)中的 hive.metastore.warehouse.dir 值將被倉庫值覆蓋創(chuàng)建iceberg目錄。
  • hadoop-conf-dir:包含 core-site.xml 和 hdfs-site.xml 配置文件的目錄路徑,這些文件將用于提供自定義 Hadoop 配置值。

創(chuàng)建表

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

寫數(shù)據(jù)

要將新數(shù)據(jù)附加到具有 Flink 流作業(yè)的表中,請使用 INSERT INTO:

INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from other_kafka_table;

要將表中的數(shù)據(jù)替換為查詢結果,請在批處理作業(yè)中使用 INSERT OVERWRITE(flink 流作業(yè)不支持 INSERT OVERWRITE)。覆蓋是 Iceberg 表的原子操作。

具有 SELECT 查詢生成的行的分區(qū)將被替換,例如:

INSERT OVERWRITE `hive_catalog`.`default`.`sample` VALUES (1, 'a');

Iceberg 還支持通過選擇值覆蓋給定分區(qū):

INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;

Flink 原生支持將 DataStream 和 DataStream 寫入iceberg表。

StreamExecutionEnvironment env = ...;

DataStream<RowData> input = ... ;
Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path", hadoopConf);

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append();

env.execute("Test Iceberg DataStream");

分支寫入

FlinkSink 中的 toBranch API 還支持寫入 Iceberg 表中的分支。

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .toBranch("audit-branch")
    .append();


使用以下語句提交 Flink 批處理作業(yè):

-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM `hive_catalog`.`default`.`sample`;

Iceberg 支持處理從歷史快照 ID 開始的 Flink 流作業(yè)中的增量數(shù)據(jù):

-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;

-- Enable this switch because streaming read SQL will provide few job options in flink SQL hint options.
SET table.dynamic-table-options.enabled=true;

-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM `hive_catalog`.`default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

SQL 也是檢查表的推薦方法。要查看表中的所有快照,請使用快照元數(shù)據(jù)表:

SELECT * FROM `hive_catalog`.`default`.`sample`.`snapshots`

Iceberg支持Java API中的流式或批量讀?。?/p>

DataStream<RowData> batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(false)
     .build();

六、類型轉換

Iceberg 對 Flink 的集成會自動在 Flink 和 Iceberg 類型之間進行轉換。當寫入 Flink 不支持的類型(例如 UUID)的表時,Iceberg 將接受并轉換 Flink 類型的值。

Flink 到 Iceberg

Flink 類型按照下表轉換為 Iceberg 類型:

Flink Iceberg Notes
boolean boolean
tinyint integer
smallint integer
integer integer
bigint long
float float
double double
char string
varchar string
string string
binary binary
varbinary fixed
decimal decimal
date date
time time
timestamp timestamp without timezone
timestamp_ltz timestamp with timezone
array list
map map
multiset map
row struct
raw Not supported
interval Not supported
structured Not supported
timestamp with zone Not supported
distinct Not supported
null Not supported
symbol Not supported
logical Not supported

Iceberg to Flink

Iceberg 類型按照下表轉換為 Flink 類型:

Iceberg Flink
boolean boolean
struct row
list array
map map
integer integer
long bigint
float float
double double
date date
time time
timestamp without timezone timestamp(6)
timestamp with timezone timestamp_ltz(6)
string varchar(2147483647)
uuid binary(16)
fixed(N) binary(N)
binary varbinary(2147483647)
decimal(P, S) decimal(P, S)

七、待支持的功能

目前的 Flink Iceberg 集成工作尚不支持一些功能:文章來源地址http://www.zghlxwxcb.cn/news/detail-574260.html

  • 不支持創(chuàng)建隱藏分區(qū)的Iceberg表
  • 不支持創(chuàng)建帶有計算列的Iceberg表
  • 不支持創(chuàng)建帶水印的Iceberg表
  • 不支持添加列、刪除列、重命名列、更改列,會在flink 1.18.0版本中支持

到了這里,關于Iceberg從入門到精通系列之十八:一篇文章深入了解Flink對Iceberg的支持的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包