国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

數(shù)據(jù)湖Iceberg介紹和使用(集成Hive、SparkSQL、FlinkSQL)

這篇具有很好參考價值的文章主要介紹了數(shù)據(jù)湖Iceberg介紹和使用(集成Hive、SparkSQL、FlinkSQL)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

簡介

概述

為了解決數(shù)據(jù)存儲和計算引擎之間的適配的問題,Netflix開發(fā)了Iceberg,2018年11月16日進入Apache孵化器,2020 年5月19日從孵化器畢業(yè),成為Apache的頂級項目。

Iceberg是一個面向海量數(shù)據(jù)分析場景的開放表格式(Table Format)。表格式(Table Format)可以理解為元數(shù)據(jù)以及數(shù)據(jù)文件的一種組織方式,處于計算框架(Flink,Spark…)之下,數(shù)據(jù)文件之上。

作用

大數(shù)據(jù)領(lǐng)域發(fā)展至今已經(jīng)經(jīng)歷了相當(dāng)長時間的發(fā)展和探索,雖然大數(shù)據(jù)技術(shù)的出現(xiàn)和迭代降低了用戶處理海量數(shù)據(jù)的門檻,但是有一個問題不能忽視,數(shù)據(jù)格式對不同引擎適配的對接。

也就是說我們在使用不同的引擎進行計算時,需要將數(shù)據(jù)根據(jù)引擎進行適配。這是相當(dāng)棘手的問題。

為此出現(xiàn)了一種新的解決方案:介于上層計算引擎和底層存儲格式之間的一個中間層。這個中間層不是數(shù)據(jù)存儲的方式,只是定義了數(shù)據(jù)的元數(shù)據(jù)組織方式,并且向引擎層面提供統(tǒng)一的類似傳統(tǒng)數(shù)據(jù)庫中"表"的語義。它的底層仍然是Parquet、ORC等存儲格式?;诖?,Netflix開發(fā)了Iceberg,目前已經(jīng)是Apache的頂級項目。

特性

數(shù)據(jù)存儲、計算引擎插件化

Iceberg提供一個開放通用的表格式(Table Format)實現(xiàn)方案,不和特定的數(shù)據(jù)存儲、計算引擎綁定。目前大數(shù)據(jù)領(lǐng)域的常見數(shù)據(jù)存儲(HDFS、S3…),計算引擎(Flink、Spark…)都可以接入Iceberg。

在生產(chǎn)環(huán)境中,可選擇不同的組件搭使用。甚至可以不通過計算引擎,直接讀取存在文件系統(tǒng)上的數(shù)據(jù)。

實時流批一體

Iceberg上游組件將數(shù)據(jù)寫入完成后,下游組件及時可讀,可查詢??梢詽M足實時場景。并且Iceberg同時提供了流/批讀接口、流/批寫接口。可以在同一個流程里, 同時處理流數(shù)據(jù)和批數(shù)據(jù),大大簡化了ETL鏈路。

數(shù)據(jù)表演化(Table Evolution)

Iceberg可以通過SQL的方式進行表級別模式演進。進行這些操作的時候,代價極低。 不存在讀出數(shù)據(jù)重新寫入或者遷移數(shù)據(jù)這種費時費力的操作。

比如在常用的Hive中,如果我們需要把一個按天分區(qū)的表,改成按小時分區(qū)。此時,不能再原表之上直接修改,只能新建一個按小時分區(qū)的表,然后再把數(shù)據(jù)Insert到新的小時分區(qū)表。而且,即使我們通過Rename的命令把新表的名字改為原表,使用原表的上次層應(yīng)用, 也可能由于分區(qū)字段修改,導(dǎo)致需要修改 SQL,這樣花費的經(jīng)歷是非常繁瑣的。

模式演化(Schema Evolution)

Iceberg支持下面幾種模式演化:

  • ADD:向表或者嵌套結(jié)構(gòu)增加新列

  • Drop:從表中或者嵌套結(jié)構(gòu)中移除一列

  • Rename:重命名表中或者嵌套結(jié)構(gòu)中的一列

  • Update:將復(fù)雜結(jié)構(gòu)(struct, map<key, value>, list)中的基本類型擴展類型長度, 比如tinyint修改成int.

  • Reorder:改變列或者嵌套結(jié)構(gòu)中字段的排列順序

Iceberg保證模式演化(Schema Evolution)是沒有副作用的獨立操作流程, 一個元數(shù)據(jù)操作, 不會涉及到重寫數(shù)據(jù)文件的過程。具體的如下:

  • 增加列時候,不會從另外一個列中讀取已存在的的數(shù)據(jù)

  • 刪除列或者嵌套結(jié)構(gòu)中字段的時候,不會改變?nèi)魏纹渌械闹?/p>

  • 更新列或者嵌套結(jié)構(gòu)中字段的時候,不會改變?nèi)魏纹渌械闹?/p>

  • 改變列列或者嵌套結(jié)構(gòu)中字段順序的時候,不會改變相關(guān)聯(lián)的值

在表中Iceberg 使用唯一ID來定位每一列的信息。新增一個列的時候,會新分配給它一個唯一ID, 并且絕對不會使用已經(jīng)被使用的ID。

使用名稱或者位置信息來定位列的, 都會存在一些問題, 比如使用名稱的話,名稱可能會重復(fù), 使用位置的話, 不能修改順序并且廢棄的字段也不能刪除。

分區(qū)演化(Partition Evolution)

Iceberg可以在一個已存在的表上直接修改,因為Iceberg的查詢流程并不和分區(qū)信息直接關(guān)聯(lián)。

當(dāng)我們改變一個表的分區(qū)策略時,對應(yīng)修改分區(qū)之前的數(shù)據(jù)不會改變, 依然會采用老的分區(qū)策略,新的數(shù)據(jù)會采用新的分區(qū)策略,也就是說同一個表會有兩種分區(qū)策略,舊數(shù)據(jù)采用舊分區(qū)策略,新數(shù)據(jù)采用新新分區(qū)策略, 在元數(shù)據(jù)里兩個分區(qū)策略相互獨立,不重合。

在查詢數(shù)據(jù)的時候,如果存在跨分區(qū)策略的情況,則會解析成兩個不同執(zhí)行計劃,如Iceberg官網(wǎng)提供圖所示:

iceberg數(shù)據(jù)湖,大數(shù)據(jù),數(shù)據(jù)庫,hive,數(shù)據(jù)倉庫,數(shù)據(jù)湖,lceberg

圖中booking_table表2008年按月分區(qū),進入2009年后改為按天分區(qū),這種中分區(qū)策略共存于該表中。

借助Iceberg的隱藏分區(qū)(Hidden Partition),在寫SQL 查詢的時候,不需要在SQL中特別指定分區(qū)過濾條件,Iceberg會自動分區(qū),過濾掉不需要的數(shù)據(jù)。

Iceberg分區(qū)演化操作同樣是一個元數(shù)據(jù)操作, 不會重寫數(shù)據(jù)文件。

列順序演化(Sort Order Evolution)

Iceberg可以在一個已經(jīng)存在的表上修改排序策略。修改了排序策略之后, 舊數(shù)據(jù)依舊采用老排序策略不變。往Iceberg里寫數(shù)據(jù)的計算引擎總是會選擇最新的排序策略, 但是當(dāng)排序的代價極其高昂的時候, 就不進行排序了。

隱藏分區(qū)(Hidden Partition)

Iceberg的分區(qū)信息并不需要人工維護, 它可以被隱藏起來. 不同其他類似Hive 的分區(qū)策略, Iceberg的分區(qū)字段/策略(通過某一個字段計算出來),可以不是表的字段和表數(shù)據(jù)存儲目錄也沒有關(guān)系。在建表或者修改分區(qū)策略之后,新的數(shù)據(jù)會自動計算所屬于的分區(qū)。在查詢的時候同樣不用關(guān)系表的分區(qū)是什么字段/策略,只需要關(guān)注業(yè)務(wù)邏輯,Iceberg會自動過濾不需要的分區(qū)數(shù)據(jù)。

正是由于Iceberg的分區(qū)信息和表數(shù)據(jù)存儲目錄是獨立的,使得Iceberg的表分區(qū)可以被修改,而且不和涉及到數(shù)據(jù)遷移。

鏡像數(shù)據(jù)查詢(Time Travel)

Iceberg提供了查詢表歷史某一時間點數(shù)據(jù)鏡像(snapshot)的能力。通過該特性可以將最新的SQL邏輯,應(yīng)用到歷史數(shù)據(jù)上。

支持事務(wù)(ACID)

Iceberg通過提供事務(wù)(ACID)的機制,使其具備了upsert的能力并且使得邊寫邊讀成為可能,從而數(shù)據(jù)可以更快的被下游組件消費。通過事務(wù)保證了下游組件只能消費已commit的數(shù)據(jù),而不會讀到部分甚至未提交的數(shù)據(jù)。

基于樂觀鎖的并發(fā)支持

Iceberg基于樂觀鎖提供了多個程序并發(fā)寫入的能力并且保證數(shù)據(jù)線性一致。

文件級數(shù)據(jù)剪裁

Iceberg的元數(shù)據(jù)里面提供了每個數(shù)據(jù)文件的一些統(tǒng)計信息,比如最大值,最小值,Count計數(shù)等等。因此,查詢SQL的過濾條件除了常規(guī)的分區(qū),列過濾,甚至可以下推到文件級別,大大加快了查詢效率。

其他數(shù)據(jù)湖框架的對比

iceberg數(shù)據(jù)湖,大數(shù)據(jù),數(shù)據(jù)庫,hive,數(shù)據(jù)倉庫,數(shù)據(jù)湖,lceberg

iceberg數(shù)據(jù)湖,大數(shù)據(jù),數(shù)據(jù)庫,hive,數(shù)據(jù)倉庫,數(shù)據(jù)湖,lceberg

存儲結(jié)構(gòu)

iceberg數(shù)據(jù)湖,大數(shù)據(jù),數(shù)據(jù)庫,hive,數(shù)據(jù)倉庫,數(shù)據(jù)湖,lceberg

iceberg數(shù)據(jù)湖,大數(shù)據(jù),數(shù)據(jù)庫,hive,數(shù)據(jù)倉庫,數(shù)據(jù)湖,lceberg

數(shù)據(jù)文件 data files

數(shù)據(jù)文件是Apache Iceberg表真實存儲數(shù)據(jù)的文件,一般是在表的數(shù)據(jù)存儲目錄的data目錄下,如果我們的文件格式選擇的是parquet,那么文件是以“.parquet”結(jié)尾。

例如:00000-0-atguigu_20230203160458_22ee74c9-643f-4b27-8fc1-9cbd5f64dad4-job_1675409881387_0007-00001.parquet 就是一個數(shù)據(jù)文件。

Iceberg每次更新會產(chǎn)生多個數(shù)據(jù)文件(data files)。

表快照 Snapshot

快照代表一張表在某個時刻的狀態(tài)。每個快照里面會列出表在某個時刻的所有 data files 列表。data files是存儲在不同的manifest files里面,manifest files是存儲在一個Manifest list文件里面,而一個Manifest list文件代表一個快照。

清單列表 Manifest list

manifest list是一個元數(shù)據(jù)文件,它列出構(gòu)建表快照(Snapshot)的清單(Manifest file)。這個元數(shù)據(jù)文件中存儲的是Manifest file列表,每個Manifest file占據(jù)一行。每行中存儲了Manifest file的路徑、其存儲的數(shù)據(jù)文件(data files)的分區(qū)范圍,增加了幾個數(shù)文件、刪除了幾個數(shù)據(jù)文件等信息,這些信息可以用來在查詢時提供過濾,加快速度。

例如:snap-6746266566064388720-1-52f2f477-2585-4e69-be42-bbad9a46ed17.avro就是一個Manifest List文件。

清單文件 Manifest file

Manifest file也是一個元數(shù)據(jù)文件,它列出組成快照(snapshot)的數(shù)據(jù)文件(data files)的列表信息。每行都是每個數(shù)據(jù)文件的詳細描述,包括數(shù)據(jù)文件的狀態(tài)、文件路徑、分區(qū)信息、列級別的統(tǒng)計信息(比如每列的最大最小值、空值數(shù)等)、文件的大小以及文件里面數(shù)據(jù)行數(shù)等信息。其中列級別的統(tǒng)計信息可以在掃描表數(shù)據(jù)時過濾掉不必要的文件。

Manifest file是以avro格式進行存儲的,以“.avro”后綴結(jié)尾,例如:52f2f477-2585-4e69-be42-bbad9a46ed17-m0.avro。

與 Hive集成

環(huán)境準(zhǔn)備

(1)Hive與Iceberg的版本對應(yīng)關(guān)系如下

Hive 版本 官方推薦Hive版本 Iceberg 版本
2.x 2.3.8 0.8.0-incubating – 1.1.0
3.x 3.1.2 0.10.0 – 1.1.0

Iceberg與Hive 2和Hive 3.1.2/3的集成,支持以下特性:

  • 創(chuàng)建表

  • 刪除表

  • 讀取表

  • 插入表(INSERT into)

更多功能需要Hive 4.x(目前alpha版本)才能支持。

(2)上傳jar包,拷貝到Hive的auxlib目錄中

mkdir auxlib
cp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlib
cp libfb303-0.9.3.jar /opt/module/hive/auxlibcp iceberg-hive-runtime-1.1.0.jar /opt/module/hive/auxlibcp libfb303-0.9.3.jar /opt/module/hive/auxlib

(3)修改hive-site.xml,添加配置項

<property>
    <name>iceberg.engine.hive.enabled</name>
    <value>true</value>
</property>

<property>
    <name>hive.aux.jars.path</name>
    <value>/opt/module/hive/auxlib</value>
</property>

使用TEZ引擎注意事項:

  • 使用Hive版本>=3.1.2,需要TEZ版本>=0.10.1

  • 指定tez更新配置:

    <property>
        <name>tez.mrreader.config.update.properties</name>
        <value>hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids</value>
    </property>
    
  • 從Iceberg 0.11.0開始,如果Hive使用Tez引擎,需要關(guān)閉向量化執(zhí)行:

    <property>
        <name>hive.vectorized.execution.enabled</name>
        <value>false</value>
    </property>
    

(4)啟動HMS服務(wù)

(5)啟動 Hadoop

創(chuàng)建和管理 Catalog

Iceberg支持多種不同的Catalog類型,例如:Hive、Hadoop、亞馬遜的AWS Glue和自定義Catalog。

根據(jù)不同配置,分為三種情況:

  • 沒有設(shè)置iceberg.catalog,默認使用HiveCatalog
配置項 說明
iceberg.catalog.<catalog_name>.type Catalog的類型: hive, hadoop, 如果使用自定義Catalog,則不設(shè)置
iceberg.catalog.<catalog_name>.catalog-impl Catalog的實現(xiàn)類, 如果上面的type沒有設(shè)置,則此參數(shù)必須設(shè)置
iceberg.catalog.<catalog_name>.<key> Catalog的其他配置項
  • 設(shè)置了 iceberg.catalog的類型,使用指定的Catalog類型,如下表格:

  • 設(shè)置 iceberg.catalog=location_based_table,直接通過指定的根路徑來加載Iceberg表

默認使用 HiveCatalog
CREATE TABLE iceberg_test1 (i int) STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';
 
INSERT INTO iceberg_test1 values(1);

查看HDFS可以發(fā)現(xiàn),表目錄在默認的hive倉庫路徑下。

指定 Catalog 類型

(1)使用 HiveCatalog

set iceberg.catalog.iceberg_hive.type=hive;
set iceberg.catalog.iceberg_hive.uri=thrift://hadoop1:9083;
set iceberg.catalog.iceberg_hive.clients=10;
set iceberg.catalog.iceberg_hive.warehouse=hdfs://hadoop1:8020/warehouse/iceberg-hive;

CREATE TABLE iceberg_test2 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES('iceberg.catalog'='iceberg_hive');
 
INSERT INTO iceberg_test2 values(1);

(2)使用 HadoopCatalog

set iceberg.catalog.iceberg_hadoop.type=hadoop;
set iceberg.catalog.iceberg_hadoop.warehouse=hdfs://hadoop1:8020/warehouse/iceberg-hadoop;

CREATE TABLE iceberg_test3 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION 'hdfs://hadoop1:8020/warehouse/iceberg-hadoop/default/iceberg_test3'
TBLPROPERTIES('iceberg.catalog'='iceberg_hadoop');

INSERT INTO iceberg_test3 values(1);
指定路徑加載

如果HDFS中已經(jīng)存在iceberg格式表,我們可以通過在Hive中創(chuàng)建Icerberg格式表指定對應(yīng)的location路徑映射數(shù)據(jù)。

CREATE EXTERNAL TABLE iceberg_test4 (i int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://hadoop1:8020/warehouse/iceberg-hadoop/default/iceberg_test3'
TBLPROPERTIES ('iceberg.catalog'='location_based_table');

基本操作

創(chuàng)建表

(1)創(chuàng)建外部表

CREATE EXTERNAL TABLE iceberg_create1 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

describe formatted iceberg_create1;

(2)創(chuàng)建內(nèi)部表

CREATE TABLE iceberg_create2 (i int) 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

describe formatted iceberg_create2;

(3)創(chuàng)建分區(qū)表

CREATE EXTERNAL TABLE iceberg_create3 (id int,name string)
PARTITIONED BY (age int)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler';

describe formatted iceberg_create3;

Hive語法創(chuàng)建分區(qū)表,不會在HMS中創(chuàng)建分區(qū),而是將分區(qū)數(shù)據(jù)轉(zhuǎn)換為Iceberg標(biāo)識分區(qū)。這種情況下不能使用Iceberg的分區(qū)轉(zhuǎn)換,例如:days(timestamp),如果想要使用Iceberg格式表的分區(qū)轉(zhuǎn)換標(biāo)識分區(qū),需要使用Spark或者Flink引擎創(chuàng)建表。

修改表

只支持HiveCatalog表修改表屬性,Iceberg表屬性和Hive表屬性存儲在HMS中是同步的。

ALTER TABLE iceberg_create1 SET TBLPROPERTIES('external.table.purge'='FALSE');
插入表

支持標(biāo)準(zhǔn)單表INSERT INTO操作:

INSERT INTO iceberg_create2 VALUES (1);
INSERT INTO iceberg_create1 select * from iceberg_create2;

在HIVE 3.x中,INSERT OVERWRITE雖然能執(zhí)行,但其實是追加。

刪除表
DROP TABLE iceberg_create1;

與 Spark SQL集成

環(huán)境準(zhǔn)備

(1)安裝 Spark

1)Spark與Iceberg的版本對應(yīng)關(guān)系如下

Spark 版本 Iceberg 版本
2.4 0.7.0-incubating – 1.1.0
3.0 0.9.0 – 1.0.0
3.1 0.12.0 – 1.1.0
3.2 0.13.0 – 1.1.0
3.3 0.14.0 – 1.1.0

2)上傳并解壓Spark安裝包

tar -zxvf spark-3.3.1-bin-hadoop3.tgz -C /opt/module/
mv /opt/module/spark-3.3.1-bin-hadoop3 /opt/module/spark-3.3.1

3)配置環(huán)境變量

sudo vim /etc/profile.d/my_env.sh

export SPARK_HOME=/opt/module/spark-3.3.1
export PATH=$PATH:$SPARK_HOME/bin

source /etc/profile.d/my_env.sh

4)拷貝iceberg的jar包到Spark的jars目錄

cp /opt/software/iceberg/iceberg-spark-runtime-3.3_2.12-1.1.0.jar /opt/module/spark-3.3.1/jars

(2)啟動 Hadoop

Spark 配置 Catalog

Spark中支持兩種Catalog的設(shè)置:hive和hadoop,Hive Catalog就是Iceberg表存儲使用Hive默認的數(shù)據(jù)路徑,Hadoop Catalog需要指定Iceberg格式表存儲路徑。

vim spark-defaults.conf
Hive Catalog
spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://hadoop1:9083

use hive_prod.db;
Hadoop Catalog
spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://hadoop1:8020/warehouse/spark-iceberg

use hadoop_prod.db;

SQL 操作

創(chuàng)建表
use hadoop_prod;
create database default;
use default;

CREATE TABLE hadoop_prod.default.sample1 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg
  • PARTITIONED BY (partition-expressions) :配置分區(qū)

  • LOCATION ‘(fully-qualified-uri)’ :指定表路徑

  • COMMENT ‘table documentation’ :配置表備注

  • TBLPROPERTIES (‘key’=‘value’, …) :配置表屬性

表屬性:https://iceberg.apache.org/docs/latest/configuration/

對Iceberg表的每次更改都會生成一個新的元數(shù)據(jù)文件(json文件)以提供原子性。默認情況下,舊元數(shù)據(jù)文件作為歷史文件保存不會刪除。

如果要自動清除元數(shù)據(jù)文件,在表屬性中設(shè)置write.metadata.delete-after-commit.enabled=true。這將保留一些元數(shù)據(jù)文件(直到write.metadata.previous-versions-max),并在每個新創(chuàng)建的元數(shù)據(jù)文件之后刪除舊的元數(shù)據(jù)文件。

(1)創(chuàng)建分區(qū)表

1)分區(qū)表

CREATE TABLE hadoop_prod.default.sample2 (
    id bigint,
    data string,
    category string)
USING iceberg
PARTITIONED BY (category)

2)創(chuàng)建隱藏分區(qū)表

CREATE TABLE hadoop_prod.default.sample3 (
    id bigint,
    data string,
    category string,
    ts timestamp)
USING iceberg
PARTITIONED BY (bucket(16, id), days(ts), category)

支持的轉(zhuǎn)換有:

  • years(ts):按年劃分

  • months(ts):按月劃分

  • days(ts)或date(ts):等效于dateint分區(qū)

  • hours(ts)或date_hour(ts):等效于dateint和hour分區(qū)

  • bucket(N, col):按哈希值劃分mod N個桶

  • truncate(L, col):按截斷為L的值劃分

字符串被截斷為給定的長度

整型和長型截斷為bin: truncate(10, i)生成分區(qū)0,10,20,30,…

(2)使用 CTAS 語法建表

CREATE TABLE hadoop_prod.default.sample4
USING iceberg
AS SELECT * from hadoop_prod.default.sample3

不指定分區(qū)就是無分區(qū),需要重新指定分區(qū)、表屬性:

CREATE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (bucket(8, id), hours(ts), category)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3

(3)使用 Replace table 建表

REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
AS SELECT * from hadoop_prod.default.sample3

REPLACE TABLE hadoop_prod.default.sample5
USING iceberg
PARTITIONED BY (part)
TBLPROPERTIES ('key'='value')
AS SELECT * from hadoop_prod.default.sample3


CREATE OR REPLACE TABLE hadoop_prod.default.sample6
USING iceberg
AS SELECT * from hadoop_prod.default.sample3
刪除表

對于HadoopCatalog而言,運行DROP TABLE將從catalog中刪除表并刪除表內(nèi)容。

CREATE EXTERNAL TABLE hadoop_prod.default.sample7 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg

INSERT INTO hadoop_prod.default.sample7 values(1,'a')
DROP TABLE hadoop_prod.default.sample7

對于HiveCatalog而言:

  • 在0.14之前,運行DROP TABLE將從catalog中刪除表并刪除表內(nèi)容。

  • 從0.14開始,DROP TABLE只會從catalog中刪除表,不會刪除數(shù)據(jù)。為了刪除表內(nèi)容,應(yīng)該使用DROP table PURGE。

CREATE TABLE hive_prod.default.sample7 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg

INSERT INTO hive_prod.default.sample7 values(1,'a')

(1)刪除表

DROP TABLE hive_prod.default.sample7

(2)刪除表和數(shù)據(jù)

DROP TABLE hive_prod.default.sample7 PURGE
修改表

Iceberg在Spark 3中完全支持ALTER TABLE,包括:

  • 重命名表

  • 設(shè)置或刪除表屬性

  • 添加、刪除和重命名列

  • 添加、刪除和重命名嵌套字段

  • 重新排序頂級列和嵌套結(jié)構(gòu)字段

  • 擴大int、float和decimal字段的類型

  • 將必選列變?yōu)榭蛇x列

此外,還可以使用SQL擴展來添加對分區(qū)演變的支持和設(shè)置表的寫順序。

CREATE TABLE hive_prod.default.sample1 (
    id bigint COMMENT 'unique id',
    data string)
USING iceberg

(1)修改表名(不支持修改HadoopCatalog的表名)

ALTER TABLE hive_prod.default.sample1 RENAME TO hive_prod.default.sample2

(2)修改表屬性

  • 修改表屬性

    ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES (
        'read.split.target-size'='268435456'
    )
    
    ALTER TABLE hive_prod.default.sample1 SET TBLPROPERTIES (
        'comment' = 'A table comment.'
    )
    
  • 刪除表屬性

    ALTER TABLE hive_prod.default.sample1 UNSET TBLPROPERTIES ('read.split.target-size')
    

(3)添加列

ALTER TABLE hadoop_prod.default.sample1
ADD COLUMNS (
    category string comment 'new_column'
)

-- 添加struct類型的列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN point struct<x: double, y: double>;

-- 往struct類型的列中添加字段
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN point.z double

-- 創(chuàng)建struct的嵌套數(shù)組列
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN points array<struct<x: double, y: double>>;

-- 在數(shù)組中的結(jié)構(gòu)中添加一個字段。使用關(guān)鍵字'element'訪問數(shù)組的元素列。
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN points.element.z double

-- 創(chuàng)建一個包含Map類型的列,key和value都為struct類型
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN pointsm map<struct<x: int>, struct<a: int>>;

-- 在Map類型的value的struct中添加一個字段。
ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN pointsm.value.b int

在Spark 2.4.4及以后版本中,可以通過添加FIRST或AFTER子句在任何位置添加列:

ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN new_column1 bigint AFTER id

ALTER TABLE hadoop_prod.default.sample1
ADD COLUMN new_column2 bigint FIRST

(4)修改列

  • 修改列名

    ALTER TABLE hadoop_prod.default.sample1 RENAME COLUMN data TO data1
    
  • Alter Column修改類型(只允許安全的轉(zhuǎn)換)

    ALTER TABLE hadoop_prod.default.sample1
    ADD COLUMNS (
        idd int
      )
    ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN idd TYPE bigint
    
  • Alter Column 修改列的注釋

    ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id TYPE double COMMENT 'a'
    ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id COMMENT 'b'
    
  • Alter Column修改列的順序

    ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id FIRST
    ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN new_column2 AFTER new_column1
    
  • Alter Column修改列是否允許為null

    ALTER TABLE hadoop_prod.default.sample1 ALTER COLUMN id DROP NOT NULL
    

    ALTER COLUMN不用于更新struct類型。使用ADD COLUMN和DROP COLUMN添加或刪除struct類型的字段。

(5)刪除列

ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN idd
ALTER TABLE hadoop_prod.default.sample1 DROP COLUMN point.z

(6)添加分區(qū)(Spark3,需要配置擴展)

vim spark-default.conf
spark.sql.extensions = org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

重新進入spark-sql shell:

ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD category 

ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id)
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD truncate(data, 4)
ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD years(ts)

ALTER TABLE hadoop_prod.default.sample1 ADD PARTITION FIELD bucket(16, id) AS shard

(7)刪除分區(qū)(Spark3,需要配置擴展)

ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD category
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD bucket(16, id)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD truncate(data, 4)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD years(ts)
ALTER TABLE hadoop_prod.default.sample1 DROP PARTITION FIELD shard

注意,盡管刪除了分區(qū),但列仍然存在于表結(jié)構(gòu)中。

刪除分區(qū)字段是元數(shù)據(jù)操作,不會改變?nèi)魏维F(xiàn)有的表數(shù)據(jù)。新數(shù)據(jù)將被寫入新的分區(qū),但現(xiàn)有數(shù)據(jù)將保留在舊的分區(qū)布局中。

當(dāng)分區(qū)發(fā)生變化時,動態(tài)分區(qū)覆蓋行為也會發(fā)生變化。例如,如果按天劃分分區(qū),而改為按小時劃分分區(qū),那么覆蓋將覆蓋每小時劃分的分區(qū),而不再覆蓋按天劃分的分區(qū)。

刪除分區(qū)字段時要小心,可能導(dǎo)致元數(shù)據(jù)查詢失敗或產(chǎn)生不同的結(jié)果。

(8)修改分區(qū)(Spark3,需要配置擴展)

ALTER TABLE hadoop_prod.default.sample1 REPLACE PARTITION FIELD bucket(16, id) WITH bucket(8, id)

(9)修改表的寫入順序

ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category, id
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC, id DESC
ALTER TABLE hadoop_prod.default.sample1 WRITE ORDERED BY category ASC NULLS LAST, id DESC NULLS FIRST

表寫順序不能保證查詢的數(shù)據(jù)順序。它只影響數(shù)據(jù)寫入表的方式。

WRITE ORDERED BY設(shè)置了一個全局排序,即跨任務(wù)的行排序,就像在INSERT命令中使用ORDER BY一樣:

INSERT INTO hadoop_prod.default.sample1
SELECT id, data, category, ts FROM another_table
ORDER BY ts, category

要在每個任務(wù)內(nèi)排序,而不是跨任務(wù)排序,使用local ORDERED BY:

ALTER TABLE hadoop_prod.default.sample1 WRITE LOCALLY ORDERED BY category, id

(10)按分區(qū)并行寫入

ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION
ALTER TABLE hadoop_prod.default.sample1 WRITE DISTRIBUTED BY PARTITION LOCALLY ORDERED BY category, id
插入數(shù)據(jù)
CREATE TABLE hadoop_prod.default.a (
    id bigint,
    count bigint)
USING iceberg

CREATE TABLE hadoop_prod.default.b (
    id bigint,
    count bigint,
    flag string)
USING iceberg

(1)Insert Into

INSERT INTO hadoop_prod.default.a VALUES (1, 1), (2, 2), (3, 3);
INSERT INTO hadoop_prod.default.b VALUES (1, 1, 'a'), (2, 2, 'b'), (4, 4, 'd');

(2)MERGE INTO行級更新

MERGE INTO hadoop_prod.default.a t 
USING (SELECT * FROM hadoop_prod.default.b) u ON t.id = u.id
WHEN MATCHED AND u.flag='b' THEN UPDATE SET t.count = t.count + u.count
WHEN MATCHED AND u.flag='a' THEN DELETE
WHEN NOT MATCHED THEN INSERT (id,count) values (u.id,u.count)
查詢數(shù)據(jù)

(1)普通查詢

SELECT count(1) as count, data
FROM local.db.table
GROUP BY data

(2)查詢元數(shù)據(jù)

// 查詢表快照
SELECT * FROM hadoop_prod.default.a.snapshots

// 查詢數(shù)據(jù)文件信息
SELECT * FROM hadoop_prod.default.a.files

// 查詢表歷史
SELECT * FROM hadoop_prod.default.a.history

// 查詢 manifest
ELECT * FROM hadoop_prod.default.a.manifests
存儲過程

Procedures可以通過CALL從任何已配置的Iceberg Catalog中使用。所有Procedures都在namespace中。

(1)語法

按照參數(shù)名傳參

CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)

當(dāng)按位置傳遞參數(shù)時,如果結(jié)束參數(shù)是可選的,則只有結(jié)束參數(shù)可以省略。

CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)

(2)快照管理

  • 回滾到指定的快照id

    CALL hadoop_prod.system.rollback_to_snapshot('default.a', 7601163594701794741)
    
  • 回滾到指定時間的快照

    CALL hadoop_prod.system.rollback_to_timestamp('db.sample', TIMESTAMP '2021-06-30 00:00:00.000')
    
  • 設(shè)置表的當(dāng)前快照ID

    CALL hadoop_prod.system.set_current_snapshot('db.sample', 1)
    
  • 從快照變?yōu)楫?dāng)前表狀態(tài)

    CALL hadoop_prod.system.cherrypick_snapshot('default.a', 7629160535368763452)
    CALL hadoop_prod.system.cherrypick_snapshot(snapshot_id => 7629160535368763452, table => 'default.a' )
    

(3)元數(shù)據(jù)管理

  • 刪除早于指定日期和時間的快照,但保留最近100個快照:

    CALL hive_prod.system.expire_snapshots('db.sample', TIMESTAMP '2021-06-30 00:00:00.000', 100)
    
  • 刪除Iceberg表中任何元數(shù)據(jù)文件中沒有引用的文件

    #列出所有需要刪除的候選文件
    CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true)
    #刪除指定目錄中db.sample表不知道的任何文件
    CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')
    
  • 合并數(shù)據(jù)文件(合并小文件)

    CALL catalog_name.system.rewrite_data_files('db.sample')
    CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'id DESC NULLS LAST,name ASC NULLS FIRST')
    CALL catalog_name.system.rewrite_data_files(table => 'db.sample', strategy => 'sort', sort_order => 'zorder(c1,c2)')
    CALL catalog_name.system.rewrite_data_files(table => 'db.sample', options => map('min-input-files','2'))
    CALL catalog_name.system.rewrite_data_files(table => 'db.sample', where => 'id = 3 and name = "foo"')
    
  • 重寫表清單來優(yōu)化執(zhí)行計劃

    CALL catalog_name.system.rewrite_manifests('db.sample')
    
    #重寫表db中的清單。并禁用Spark緩存的使用。這樣做可以避免執(zhí)行程序上的內(nèi)存問題。
    CALL catalog_name.system.rewrite_manifests('db.sample', false)
    

(4)遷移表

  • 快照

    CALL catalog_name.system.snapshot('db.sample', 'db.snap')
    CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/')
    
  • 遷移

    CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'))
    CALL catalog_name.system.migrate('db.sample')
    
  • 添加數(shù)據(jù)文件

    CALL spark_catalog.system.add_files(
        table => 'db.tbl',
        source_table => 'db.src_tbl',
        partition_filter => map('part_col_1', 'A')
    )
    
    CALL spark_catalog.system.add_files(
        table => 'db.tbl',
        source_table => '`parquet`.`path/to/table`'
    )
    

(5)元數(shù)據(jù)信息

  • 獲取指定快照的父快照id

    CALL spark_catalog.system.ancestors_of('db.tbl')
    
  • 獲取指定快照的所有祖先快照

    CALL spark_catalog.system.ancestors_of('db.tbl', 1)
    CALL spark_catalog.system.ancestors_of(snapshot_id => 1, table => 'db.tbl')
    

DataFrame 操作

環(huán)境準(zhǔn)備

(1)創(chuàng)建maven工程,配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.iceberg</groupId>
    <artifactId>spark-iceberg-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <scala.binary.version>2.12</scala.binary.version>
        <spark.version>3.3.1</spark.version>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spark的依賴引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <scope>provided</scope>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.binary.version}</artifactId>
            <scope>provided</scope>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_${scala.binary.version}</artifactId>
            <scope>provided</scope>
            <version>${spark.version}</version>
        </dependency>

        <!--fastjson <= 1.2.80 存在安全漏洞,-->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>


        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-spark-runtime-3.3 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
            <version>1.1.0</version>
        </dependency>


    </dependencies>

    <build>
        <plugins>
            <!-- assembly打包插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <archive>
                        <manifest>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>

            <!--Maven編譯scala所需依賴-->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

(2)配置Catalog

val spark: SparkSession = SparkSession.builder().master("local").appName(this.getClass.getSimpleName)
  //指定hive catalog, catalog名稱為iceberg_hive
  .config("spark.sql.catalog.iceberg_hive", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_hive.type", "hive")
  .config("spark.sql.catalog.iceberg_hive.uri", "thrift://hadoop1:9083")
  //    .config("iceberg.engine.hive.enabled", "true")
  //指定hadoop catalog,catalog名稱為iceberg_hadoop 
  .config("spark.sql.catalog.iceberg_hadoop", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.iceberg_hadoop.type", "hadoop")
  .config("spark.sql.catalog.iceberg_hadoop.warehouse", "hdfs://hadoop1:8020/warehouse/spark-iceberg")
  .getOrCreate()
讀取表

(1)加載表

spark.read
.format("iceberg")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()

// 僅支持Spark3.0以上
spark.table("iceberg_hadoop.default.a")
.show()

(2)時間旅行:指定時間查詢

spark.read
    .option("as-of-timestamp", "499162860000")
    .format("iceberg")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()

(3)時間旅行:指定快照id查詢

spark.read
    .option("snapshot-id", 7601163594701794741L)
    .format("iceberg")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()

(4)增量查詢

spark.read
.format("iceberg")
.option("start-snapshot-id", "10963874102873")
.option("end-snapshot-id", "63874143573109")
.load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a")
.show()

查詢的表只能是append的方式寫數(shù)據(jù),不支持replace, overwrite, delete操作。

檢查表

(1)查詢元數(shù)據(jù)

spark.read.format("iceberg").load("iceberg_hadoop.default.a.files")
spark.read.format("iceberg").load("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a#files")

(2)元數(shù)據(jù)表時間旅行查詢

spark.read
.format("iceberg")
.option("snapshot-id", 7601163594701794741L)
.load("iceberg_hadoop.default.a.files")
寫入表

(1)創(chuàng)建樣例類,準(zhǔn)備DF

case class Sample(id:Int,data:String,category:String)

val df: DataFrame = spark.createDataFrame(Seq(Sample(1,'A', 'a'), Sample(2,'B', 'b'), Sample(3,'C', 'c')))

(2)插入數(shù)據(jù)并建表

df.writeTo("iceberg_hadoop.default.table1").create()

import spark.implicits._
df.writeTo("iceberg_hadoop.default.table1")
  .tableProperty("write.format.default", "orc")
  .partitionedBy($"category")
  .createOrReplace()

(3)append追加

df.writeTo("iceberg_hadoop.default.table1").append()

(4)動態(tài)分區(qū)覆蓋

df.writeTo("iceberg_hadoop.default.table1").overwritePartitions()

(5)靜態(tài)分區(qū)覆蓋

import spark.implicits._
df.writeTo("iceberg_hadoop.default.table1").overwrite($"category" === "c")

(6)插入分區(qū)表且分區(qū)內(nèi)排序

df.sortWithinPartitions("category")
    .writeTo("iceberg_hadoop.default.table1")
    .append()
維護表

(1)獲取Table對象

1)HadoopCatalog

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

val conf = new Configuration()
val catalog = new HadoopCatalog(conf,"hdfs://hadoop1:8020/warehouse/spark-iceberg")
val table: Table = catalog.loadTable(TableIdentifier.of("db","table1"))

2)HiveCatalog

import org.apache.iceberg.hive.HiveCatalog;
import org.apache.iceberg.Table;
import org.apache.iceberg.catalog.TableIdentifier;

val catalog = new HiveCatalog()
catalog.setConf(spark.sparkContext.hadoopConfiguration)

val properties = new util.HashMap[String,String]()
properties.put("warehouse", "hdfs://hadoop1:8020/warehouse/spark-iceberg")
properties.put("uri", "thrift://hadoop1:9083")

catalog.initialize("hive", properties)
val table: Table = catalog.loadTable(TableIdentifier.of("db", "table1"))

(2)快照過期清理

每次寫入Iceberg表都會創(chuàng)建一個表的新快照或版本??煺湛梢杂糜跁r間旅行查詢,或者可以將表回滾到任何有效的快照。建議設(shè)置快照過期時間,過期的舊快照將從元數(shù)據(jù)中刪除(不再可用于時間旅行查詢)。

// 1天過期時間
val tsToExpire: Long = System.currentTimeMillis() - (1000 * 60 * 60 * 24)

table.expireSnapshots()
  .expireOlderThan(tsToExpire)
  .commit()

或使用SparkActions來設(shè)置過期:

//SparkActions可以并行運行大型表的表過期設(shè)置
SparkActions.get()
  .expireSnapshots(table)
  .expireOlderThan(tsToExpire)
  .execute()

(3)刪除無效文件

在Spark和其他分布式處理引擎中,任務(wù)或作業(yè)失敗可能會留下未被表元數(shù)據(jù)引用的文件,在某些情況下,正常的快照過期可能無法確定不再需要并刪除該文件。

SparkActions
    .get()
    .deleteOrphanFiles(table)
    .execute()

(4)合并小文件

數(shù)據(jù)文件過多會導(dǎo)致更多的元數(shù)據(jù)存儲在清單文件中,而較小的數(shù)據(jù)文件會導(dǎo)致不必要的元數(shù)據(jù)量和更低效率的文件打開成本。

SparkActions
    .get()
    .rewriteDataFiles(table)
    .filter(Expressions.equal("category", "a"))
    .option("target-file-size-bytes", 1024L.toString) //1KB
    .execute()

與 Flink SQL 集成

Apache Iceberg同時支持Apache Flink的DataStream API和Table API。

環(huán)境準(zhǔn)備

(1)安裝 Flink

1)Flink與Iceberg的版本對應(yīng)關(guān)系如下

Flink 版本 Iceberg 版本
1.11 0.9.0 – 0.12.1
1.12 0.12.0 – 0.13.1
1.13 0.13.0 – 1.0.0
1.14 0.13.0 – 1.1.0
1.15 0.14.0 – 1.1.0
1.16 1.1.0 – 1.1.0

2)上傳并解壓Flink安裝包

tar -zxvf flink-1.16.0-bin-scala_2.12.tgz -C /opt/module/

3)配置環(huán)境變量

sudo vim /etc/profile.d/my_env.sh
export HADOOP_CLASSPATH=`hadoop classpath`
source /etc/profile.d/my_env.sh

4)拷貝iceberg的jar包到Flink的lib目錄

cp /opt/software/iceberg/iceberg-flink-runtime-1.16-1.1.0.jar /opt/module/flink-1.16.0/lib

(2)啟動 Hadoop

(3)啟動 sql-client

1)修改flink-conf.yaml配置

vim /opt/module/flink-1.16.0/conf/flink-conf.yaml

classloader.check-leaked-classloader: false
taskmanager.numberOfTaskSlots: 4

state.backend: rocksdb
execution.checkpointing.interval: 30000
state.checkpoints.dir: hdfs://hadoop1:8020/ckps
state.backend.incremental: true

2)local模式

(1)修改workers

vim /opt/module/flink-1.16.0/conf/workers
#表示:會在本地啟動3個TaskManager的 local集群
localhost
localhost
localhost

(2)啟動Flink

/opt/module/flink-1.16.0/bin/start-cluster.sh

查看webui:http://hadoop1:8081

(3)啟動Flink的sql-client

/opt/module/flink-1.16.0/bin/sql-client.sh embedded

創(chuàng)建和使用 Catalog

語法說明
CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  `<config_key>`=`<config_value>`
); 
  • type: 必須是iceberg。(必須)

  • catalog-type: 內(nèi)置了hive和hadoop兩種catalog,也可以使用catalog-impl來自定義catalog。(可選)

  • catalog-impl: 自定義catalog實現(xiàn)的全限定類名。如果未設(shè)置catalog-type,則必須設(shè)置。(可選)

  • property-version: 描述屬性版本的版本號。此屬性可用于向后兼容,以防屬性格式更改。當(dāng)前屬性版本為1。(可選)

  • cache-enabled: 是否啟用目錄緩存,默認值為true。(可選)

  • cache.expiration-interval-ms: 本地緩存catalog條目的時間(以毫秒為單位);負值,如-1表示沒有時間限制,不允許設(shè)為0。默認值為-1。(可選)

Hive Catalog

(1)上傳hive connector到flink的lib中

cp flink-sql-connector-hive-3.1.2_2.12-1.16.0.jar /opt/module/flink-1.16.0/lib/

(2)啟動hive metastore服務(wù)

hive --service metastore

(3)創(chuàng)建hive catalog

重啟flink集群,重新進入sql-client

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop1:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://hadoop1:8020/warehouse/iceberg-hive'
);

use catalog hive_catalog;

use catalog hive_catalog;

  • uri: Hive metastore的thrift uri。(必選)

  • clients:Hive metastore客戶端池大小,默認為2。(可選)

  • warehouse: 數(shù)倉目錄。

  • hive-conf-dir:包含hive-site.xml配置文件的目錄路徑,hive-site.xml中hive.metastore.warehouse.dir 的值會被warehouse覆蓋。

  • hadoop-conf-dir:包含core-site.xml和hdfs-site.xml配置文件的目錄路徑。

Hadoop Catalog

Iceberg還支持HDFS中基于目錄的catalog,可以使用’catalog-type’='hadoop’配置。

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://hadoop1:8020/warehouse/iceberg-hadoop',
  'property-version'='1'
);

use catalog hadoop_catalog;
  • warehouse:存放元數(shù)據(jù)文件和數(shù)據(jù)文件的HDFS目錄。(必需)
配置sql-client初始化文件
vim /opt/module/flink-1.16.0/conf/sql-client-init.sql

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://hadoop1:9083',
  'warehouse'='hdfs://hadoop1:8020/warehouse/iceberg-hive'
);

USE CATALOG hive_catalog;

后續(xù)啟動sql-client時,加上 -i sql文件路徑 即可完成catalog的初始化。

/opt/module/flink-1.16.0/bin/sql-client.sh embedded -i conf/sql-client-init.sql

DDL 語句

創(chuàng)建數(shù)據(jù)庫
CREATE DATABASE iceberg_db;
USE iceberg_db;
創(chuàng)建表
CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

建表命令現(xiàn)在支持最常用的flink建表語法,包括:

  • PARTITION BY (column1, column2, …):配置分區(qū),apache flink還不支持隱藏分區(qū)。

  • COMMENT ‘table document’:指定表的備注

  • WITH (‘key’=‘value’, …):設(shè)置表屬性

目前,不支持計算列、watermark(支持主鍵)。

(1)創(chuàng)建分區(qū)表

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
) PARTITIONED BY (data);

Apache Iceberg支持隱藏分區(qū),但Apache flink不支持在列上通過函數(shù)進行分區(qū),現(xiàn)在無法在flink DDL中支持隱藏分區(qū)。

(2)使用LIKE語法建表

LIKE語法用于創(chuàng)建一個與另一個表具有相同schema、分區(qū)和屬性的表。

CREATE TABLE `hive_catalog`.`default`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

CREATE TABLE  `hive_catalog`.`default`.`sample_like` LIKE `hive_catalog`.`default`.`sample`;
修改表

(1)修改表屬性

ALTER TABLE `hive_catalog`.`default`.`sample` SET ('write.format.default'='avro');

(2)修改表名

ALTER TABLE `hive_catalog`.`default`.`sample` RENAME TO `hive_catalog`.`default`.`new_sample`;
刪除表
DROP TABLE `hive_catalog`.`default`.`sample`;

插入語句

INSERT INTO
INSERT INTO `hive_catalog`.`default`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`default`.`sample` SELECT id, data from sample2;
INSERT OVERWRITE

僅支持Flink的Batch模式:

SET execution.runtime-mode = batch;
INSERT OVERWRITE sample VALUES (1, 'a');
INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
UPSERT

當(dāng)將數(shù)據(jù)寫入v2表格式時,Iceberg支持基于主鍵的UPSERT。有兩種方法可以啟用upsert。

(1)建表時指定

CREATE TABLE `hive_catalog`.`test1`.`sample5` (
    `id`  INT UNIQUE COMMENT 'unique id',
    `data` STRING NOT NULL,
    PRIMARY KEY(`id`) NOT ENFORCED
) with (
    'format-version'='2', 
    'write.upsert.enabled'='true'
);

(2)插入時指定

INSERT INTO tableName /*+ OPTIONS('upsert-enabled'='true') */
...

插入的表,format-version需要為2。

OVERWRITE和UPSERT不能同時設(shè)置。在UPSERT模式下,如果對表進行分區(qū),則分區(qū)字段必須也是主鍵。

(3)讀取Kafka流,upsert插入到iceberg表中

create table default_catalog.default_database.kafka(
    id int,
    data string
) with (
    'connector' = 'kafka'
    ,'topic' = 'test111'
    ,'properties.zookeeper.connect' = 'hadoop1:2181'
    ,'properties.bootstrap.servers' = 'hadoop1:9092'
    ,'format' = 'json'
    ,'properties.group.id'='iceberg'
    ,'scan.startup.mode'='earliest-offset'
);


INSERT INTO hive_catalog.test1.sample5 SELECT * FROM default_catalog.default_database.kafka;

查詢語句

Iceberg支持Flink的流式和批量讀取。

Batch模式
SET execution.runtime-mode = batch;
select * from sample;
Streaming模式
SET execution.runtime-mode = streaming;
SET table.dynamic-table-options.enabled=true;
SET sql-client.execution.result-mode=tableau;

(1)從當(dāng)前快照讀取所有記錄,然后從該快照讀取增量數(shù)據(jù)

SELECT * FROM sample5 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

(2)讀取指定快照id(不包含)后的增量數(shù)據(jù)

SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
  • monitor-interval: 連續(xù)監(jiān)控新提交數(shù)據(jù)文件的時間間隔(默認為10s)。

  • start-snapshot-id: 流作業(yè)開始的快照id。

**注意:**如果是無界數(shù)據(jù)流式upsert進iceberg表(讀kafka,upsert進iceberg表),那么再去流讀iceberg表會存在讀不出數(shù)據(jù)的問題。如果無界數(shù)據(jù)流式append進iceberg表(讀kafka,append進iceberg表),那么流讀該iceberg表可以正常看到結(jié)果。

與Flink集成的不足

支持的特性 Flink 備注
SQL create catalog
SQL create database
SQL create table
SQL create table like
SQL alter table 只支持修改表屬性,不支持更改列和分區(qū)
SQL drop_table
SQL select 支持流式和批處理模式
SQL insert into 支持流式和批處理模式
SQL insert overwrite
DataStream read
DataStream append
DataStream overwrite
Metadata tables 支持Java API,不支持Flink SQL
Rewrite files action
  • 不支持創(chuàng)建隱藏分區(qū)的Iceberg表。

  • 不支持創(chuàng)建帶有計算列的Iceberg表。

  • 不支持創(chuàng)建帶watermark的Iceberg表。

  • 不支持添加列,刪除列,重命名列,更改列。

  • Iceberg目前不支持Flink SQL 查詢表的元數(shù)據(jù)信息,需要使用Java API 實現(xiàn)。

與 Flink DataStream 集成

環(huán)境準(zhǔn)備

(1)配置pom文件

新建Maven工程,pom文件配置如下:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.atguigu.iceberg</groupId>
    <artifactId>flink-iceberg-demo</artifactId>
    <version>1.0-SNAPSHOT</version>


    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.16.0</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <slf4j.version>1.7.30</slf4j.version>
    </properties>

    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>   <!--不會打包到依賴中,只參與編譯,不參與運行 -->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <!--idea運行時也有webui-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
            <scope>provided</scope>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-statebackend-rocksdb</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>3.1.3</version>
            <scope>provided</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.iceberg/iceberg-flink-runtime-1.16 -->
        <dependency>
            <groupId>org.apache.iceberg</groupId>
            <artifactId>iceberg-flink-runtime-1.16</artifactId>
            <version>1.1.0</version>
        </dependency>

    </dependencies>


    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                    <exclude>org.apache.hadoop:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers combine.children="append">
                                <transformer
                                             implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer">
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

(2)配置log4j

resources目錄下新建log4j.properties。

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

讀取數(shù)據(jù)

常規(guī)Source寫法

(1)Batch方式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");
DataStream<RowData> batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(false)
     .build();

batch.map(r -> Tuple2.of(r.getLong(0),r.getLong(1) ))
      .returns(Types.TUPLE(Types.LONG,Types.LONG))
      .print();

env.execute("Test Iceberg Read");

(2)Streaming方式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a"); 
DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startSnapshotId(3821550127947089987L)
    .build();

stream.map(r -> Tuple2.of(r.getLong(0),r.getLong(1) ))
    .returns(Types.TUPLE(Types.LONG,Types.LONG))
    .print();

env.execute("Test Iceberg Read");
FLIP-27 Source寫法

(1)Batch方式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");

IcebergSource<RowData> source1 = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .build();

DataStream<RowData> batch = env.fromSource(
    Source1,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

batch.map(r -> Tuple2.of(r.getLong(0), r.getLong(1)))
    .returns(Types.TUPLE(Types.LONG, Types.LONG))
    .print();

env.execute("Test Iceberg Read");

(2)Streaming方式

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");

IcebergSource source2 = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .monitorInterval(Duration.ofSeconds(60))
    .build();

DataStream<RowData> stream = env.fromSource(
    Source2,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));

stream.map(r -> Tuple2.of(r.getLong(0), r.getLong(1)))
    .returns(Types.TUPLE(Types.LONG, Types.LONG))
    .print();

env.execute("Test Iceberg Read");

寫入數(shù)據(jù)

目前支持DataStream<RowData>和DataStream<Row>格式的數(shù)據(jù)流寫入Iceberg表。

(1)寫入方式支持 append、overwrite、upsert

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);


SingleOutputStreamOperator<RowData> input = env.fromElements("")
    .map(new MapFunction<String, RowData>() {
        @Override
        public RowData map(String s) throws Exception {
            GenericRowData genericRowData = new GenericRowData(2);
            genericRowData.setField(0, 99L);
            genericRowData.setField(1, 99L);

            return genericRowData;
        }
    });

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://hadoop1:8020/warehouse/spark-iceberg/default/a");


FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .append()            // append方式
    //.overwrite(true)   // overwrite方式
    //.upsert(true)       // upsert方式
    ;

env.execute("Test Iceberg DataStream");

(2)寫入選項

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .set("write-format", "orc")
    .set(FlinkWriteOptions.OVERWRITE_MODE, "true");

可配置選項如下:

選項 默認值 說明
write-format Parquet同write.format.default 寫入操作使用的文件格式:Parquet, avro或orc
target-file-size-bytes 536870912(512MB)同write.target-file-size-bytes 控制生成的文件的大小,目標(biāo)大約為這么多字節(jié)
upsert-enabled 同write.upsert.enabled,
overwrite-enabled false 覆蓋表的數(shù)據(jù),不能和UPSERT模式同時開啟
distribution-mode None同 write.distribution-mode 定義寫數(shù)據(jù)的分布方式: none:不打亂行; hash:按分區(qū)鍵散列分布;range:如果表有SortOrder,則通過分區(qū)鍵或排序鍵分配
compression-codec 同 write.(fileformat).compression-codec
compression-level 同 write.(fileformat).compression-level
compression-strategy 同write.orc.compression-strategy

合并小文件

Iceberg現(xiàn)在不支持在flink sql中檢查表,需要使用Iceberg提供的Java API來讀取元數(shù)據(jù)來獲得表信息??梢酝ㄟ^提交Flink批處理作業(yè)將小文件重寫為大文件:

import org.apache.iceberg.flink.actions.Actions;

// 1.獲取 Table對象
// 1.1 創(chuàng)建 catalog對象
Configuration conf = new Configuration();
HadoopCatalog hadoopCatalog = new HadoopCatalog(conf, "hdfs://hadoop1:8020/warehouse/spark-iceberg");

// 1.2 通過 catalog加載 Table對象
Table table = hadoopCatalog.loadTable(TableIdentifier.of("default", "a"));

// 有Table對象,就可以獲取元數(shù)據(jù)、進行維護表的操作
//        System.out.println(table.history());
//        System.out.println(table.expireSnapshots().expireOlderThan());

// 2.通過 Actions 來操作 合并
Actions.forTable(table)
    .rewriteDataFiles()
    .targetSizeInBytes(1024L)
    .execute();

得到Table對象,就可以獲取元數(shù)據(jù)、進行維護表的操作。更多Iceberg提供的API操作,考:https://iceberg.apache.org/docs/latest/api/文章來源地址http://www.zghlxwxcb.cn/news/detail-685960.html

到了這里,關(guān)于數(shù)據(jù)湖Iceberg介紹和使用(集成Hive、SparkSQL、FlinkSQL)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 數(shù)據(jù)湖08:Apache Iceberg原理和功能介紹

    數(shù)據(jù)湖08:Apache Iceberg原理和功能介紹

    ?系列專題:數(shù)據(jù)湖系列文章 ????????在使用不同的引擎進行大數(shù)據(jù)計算時,需要將數(shù)據(jù)根據(jù)計算引擎進行適配。這是一個相當(dāng)棘手的問題,為此出現(xiàn)了一種新的解決方案: 介于上層計算引擎和底層存儲格式之間的一個中間層 。這個中間層不是數(shù)據(jù)存儲的方式,只是定義

    2023年04月09日
    瀏覽(33)
  • Iceberg從入門到精通系列之六:Flink集成Iceberg

    下載Flink: https://www.apache.org/dyn/closer.lua/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz 下載Iceberg flink jar包:iceberg-flink-runtime-1.17-1.3.0.jar https://iceberg.apache.org/releases/ 修改配置文件flink-conf.yaml local模式 修改workers 至此FLink成功集成Iceberg

    2024年02月16日
    瀏覽(25)
  • iceberg對比hive優(yōu)勢

    從事務(wù)性上來說,iceberg具有更高的數(shù)據(jù)質(zhì)量。 因為iceberg本質(zhì)是一種table format,屏蔽了底層的存儲細節(jié),寫入數(shù)據(jù)時候需要嚴格按照schema寫入。而hive可以先寫入底層數(shù)據(jù),然后使用load partition的方式來加載分區(qū)。這樣就可能造成hive的實際存儲數(shù)據(jù)與schema不一致。 另外,hive的

    2024年02月14日
    瀏覽(15)
  • CHD6.2.1集群 Hive開啟Iceberg

    CHD6.2.1集群 Hive開啟Iceberg

    下載jar包 https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-hive-runtime/1.0.0/iceberg-hive-runtime-1.0.0.jar 存放在/opt/cloudera/parcels/CDH/lib/hive/auxlib/? ? ?CDH集群修改hive配置 選擇xml格式 粘貼即可 propertynameiceberg.engine.hive.enabled/namevaluetrue/valuefinaltrue/finaldescription集成iceberg/description/propertyproper

    2024年02月15日
    瀏覽(16)
  • Iceberg從入門到精通系列之三:創(chuàng)建Iceberg表、修改表結(jié)構(gòu)、插入數(shù)據(jù)、刪除表

    Hive語法創(chuàng)建分區(qū)表,不會在元數(shù)據(jù)創(chuàng)建分區(qū),而是將分區(qū)數(shù)據(jù)轉(zhuǎn)換為Iceberg標(biāo)識分區(qū)。 這種情況下不能使用Iceberg的分區(qū)轉(zhuǎn)換,例如:days(timestamp),如果想要使用Iceberg格式表的分區(qū)轉(zhuǎn)換標(biāo)識分區(qū),需要使用Spark或者Flink引擎創(chuàng)建表。 只支持HiveCatalog表修改表屬性,Iceberg表屬性和

    2024年02月11日
    瀏覽(111)
  • Iceberg從入門到精通系列之十:flink sql往Iceberg表插入數(shù)據(jù),Batch模式和Streaming模式查詢數(shù)據(jù)

    僅支持Flink的Batch模式 當(dāng)將數(shù)據(jù)寫入v2表格時,Iceberg支持基于主鍵的UPSERT。有兩種方法可以啟用upsert。 建表時指定 UPSERT模式下,如果對表進行分區(qū),則分區(qū)字段必須是主鍵。 Batch模式: Streaming模式: 從當(dāng)前快照讀取所有記錄,然后從該快照讀取增量數(shù)據(jù) 讀取指定快照id(不包

    2024年02月12日
    瀏覽(26)
  • iceberg的java api使用

    iceberg的java api使用

    【前言】 了解一個組件的最好方式是先使用該組件,今天我們就來聊聊如何通過java api對iceberg進行操作。 為什么是選擇api進行介紹,而不是更通用的flink、spark、hive等。一方面是覺得flink、spark使用iceberg的介紹網(wǎng)上已經(jīng)有很多,官網(wǎng)的介紹也比較清晰,而java api的介紹則相對

    2024年02月10日
    瀏覽(7)
  • 火山引擎 Iceberg 數(shù)據(jù)湖的應(yīng)用與實踐

    火山引擎 Iceberg 數(shù)據(jù)湖的應(yīng)用與實踐

    在云原生計算時代,云存儲使得海量數(shù)據(jù)能以低成本進行存儲,但是這也給如何訪問、管理和使用這些云上的數(shù)據(jù)提出了挑戰(zhàn)。而 Iceberg 作為一種云原生的表格式,可以很好地應(yīng)對這些挑戰(zhàn)。本文將介紹火山引擎在云原生計算產(chǎn)品上使用 Iceberg 的實踐,和大家分享高效查詢、

    2024年02月09日
    瀏覽(20)
  • 【大數(shù)據(jù)】Apache Iceberg 概述和源代碼的構(gòu)建

    【大數(shù)據(jù)】Apache Iceberg 概述和源代碼的構(gòu)建

    我們在使用不同的引擎進行大數(shù)據(jù)計算時,需要將數(shù)據(jù)根據(jù)計算引擎進行適配。這是一個相當(dāng)棘手的問題,為此出現(xiàn)了一種新的解決方案:介于上層計算引擎和底層存儲格式之間的一個中間層。這個中間層不是數(shù)據(jù)存儲的方式,只是定義了數(shù)據(jù)的元數(shù)據(jù)組織方式,并向計算引

    2024年02月09日
    瀏覽(24)
  • Flink + Iceberg打造流批一體的數(shù)據(jù)湖架構(gòu)

    Flink + Iceberg打造流批一體的數(shù)據(jù)湖架構(gòu)

    一、背景 1、數(shù)據(jù)倉庫架構(gòu) ????????從Hive表 出倉 到外部系統(tǒng)(ClickHouse、Presto、ES等)帶來的復(fù)雜性和存儲開發(fā)等額外代價,盡量減少這種場景出倉的必要性。 痛點:傳統(tǒng) T+1 任務(wù) 海量的TB級 T+ 1 任務(wù)延遲導(dǎo)致下游數(shù)據(jù)產(chǎn)出時間不穩(wěn)定。 任務(wù)遇到故障重試恢復(fù)代價昂貴 數(shù)

    2024年02月04日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包