国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)

這篇具有很好參考價(jià)值的文章主要介紹了數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。


數(shù)據(jù)湖Iceberg-簡介(1)
數(shù)據(jù)湖Iceberg-存儲(chǔ)結(jié)構(gòu)(2)
數(shù)據(jù)湖Iceberg-Hive集成Iceberg(3)
數(shù)據(jù)湖Iceberg-SparkSQL集成(4)
數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)
數(shù)據(jù)湖Iceberg-FlinkSQL-kafka類型表數(shù)據(jù)無法成功寫入(6)
數(shù)據(jù)湖Iceberg-Flink DataFrame集成(7)

數(shù)據(jù)湖Iceberg-FlinkSQL集成

環(huán)境準(zhǔn)備

Flink與Iceberg的版本對(duì)應(yīng)關(guān)系如下

Flink 版本 Iceberg 版本
1.11 0.9.0 – 0.12.1
1.12 0.12.0 – 0.13.1
1.13 0.13.0 – 1.0.0
1.14 0.13.0 – 1.1.0
1.15 0.14.0 – 1.1.0
1.16 1.1.0 – 1.1.0

jar包下載地址

https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/

在里面選擇自己的版本即可,這里我使用的是flink 1.14.3 iceberg1.1.0版本

具體下載地址:https://repo1.maven.org/maven2/org/apache/iceberg/iceberg-flink-runtime-1.14/1.1.0/iceberg-flink-runtime-1.14-1.1.0.jar

jar包上傳到Flink lib目錄下

[root@ lib]# pwd
/opt/flink/lib
[root@ lib]# ll
total 252612
-rw-r--r-- 1 flink flink     85584 Jan 11  2022 flink-csv-1.14.3.jar
-rw-r--r-- 1 flink flink 136054094 Jan 11  2022 flink-dist_2.12-1.14.3.jar
-rw-r--r-- 1 flink flink    153145 Jan 11  2022 flink-json-1.14.3.jar
-rw-rw-r-- 1 flink flink  43317025 Jan 13 13:54 flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
-rw-r--r-- 1 flink flink   7709731 Aug 22  2021 flink-shaded-zookeeper-3.4.14.jar
-rw-r--r-- 1 flink flink  39633410 Jan 11  2022 flink-table_2.12-1.14.3.jar
-rw-r--r-- 1 flink flink   29256108 Apr 21 09:21 iceberg-flink-runtime-1.14-1.1.0.jar
-rw-r--r-- 1 flink flink    112758 May  3  2013 javax.ws.rs-api-2.0.jar
-rw-r--r-- 1 flink flink    208006 Jan  9  2022 log4j-1.2-api-2.17.1.jar
-rw-r--r-- 1 flink flink    301872 Jan  9  2022 log4j-api-2.17.1.jar
-rw-r--r-- 1 flink flink   1790452 Jan  9  2022 log4j-core-2.17.1.jar
-rw-r--r-- 1 flink flink     24279 Jan  9  2022 log4j-slf4j-impl-2.17.1.jar

修改flink-conf.yaml配置

修改或添加以下配置

# 禁用 ClassLoader 檢查
classloader.check-leaked-classloader: false

# 每個(gè) TaskManager 上任務(wù)槽數(shù)量,這里為 4
taskmanager.numberOfTaskSlots: 4

# 狀態(tài)后端使用 RocksDB
state.backend: rocksdb

# 每隔 30000 毫秒進(jìn)行一次檢查點(diǎn)
execution.checkpointing.interval: 30000

# 指定檢查點(diǎn)保存的目錄,這里為 HDFS 上的目錄
state.checkpoints.dir: hdfs://hadoop1:8020/ckps

# 啟用增量式檢查點(diǎn),這將在一定程度上提高性能
state.backend.incremental: true

啟動(dòng)flink-sql

注意:

剛剛Flink修改完需要重啟Flink

輸入 ./sql-client.sh embedded或者./sql-client

sql-client.sh embedded 是啟動(dòng) Flink SQL Client 時(shí)指定的一種模式,即嵌入式模式。

在嵌入式模式下,F(xiàn)link SQL Client 會(huì)自動(dòng)啟動(dòng)一個(gè) Flink 集群,無需手動(dòng)啟動(dòng),直接在命令行中交互式地輸入 SQL 命令進(jìn)行查詢和操作。同時(shí),F(xiàn)link SQL Client 會(huì)將所有的數(shù)據(jù)和表定義存儲(chǔ)在本地內(nèi)存中,這意味著不支持持久化數(shù)據(jù)和高可用性。

相反,如果您使用的是獨(dú)立模式,F(xiàn)link SQL Client 會(huì)連接到一個(gè)已經(jīng)運(yùn)行的 Flink 集群。在這種模式下,需要首先手動(dòng)啟動(dòng) Flink 集群,并將 Flink SQL Client 配置為連接到該集群。獨(dú)立模式相對(duì)嵌入式模式更加靈活和可擴(kuò)展,但啟動(dòng)和配置過程可能需要更多的時(shí)間和精力。

總之,嵌入式模式適用于快速原型設(shè)計(jì)和小規(guī)模數(shù)據(jù)探索,而獨(dú)立模式適用于生產(chǎn)環(huán)境和大規(guī)模數(shù)據(jù)處理。

數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)

創(chuàng)建和使用Catalog

創(chuàng)建語法說明

CREATE CATALOG <catalog_name> WITH (
  'type'='iceberg',
  `<config_key>`=`<config_value>`
); 
  • type: 必須是iceberg。(必須)
  • catalog-type: 內(nèi)置了hive和hadoop兩種catalog,也可以使用catalog-impl來自定義catalog。(可選)
  • catalog-impl: 自定義catalog實(shí)現(xiàn)的全限定類名。如果未設(shè)置catalog-type,則必須設(shè)置。(可選)
  • property-version: 描述屬性版本的版本號(hào)。此屬性可用于向后兼容,以防屬性格式更改。當(dāng)前屬性版本為1。(可選)
  • cache-enabled: 是否啟用目錄緩存,默認(rèn)值為true。(可選)
  • cache.expiration-interval-ms: 本地緩存catalog條目的時(shí)間(以毫秒為單位);負(fù)值,如-1表示沒有時(shí)間限制,不允許設(shè)為0。默認(rèn)值為-1。(可選)

Hive Catalog

  • 上傳hive connector到flink的lib中

下載地址:https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-hive-3.1.2_2.12/1.14.3/flink-sql-connector-hive-3.1.2_2.12-1.14.3.jar

  • 重啟Flink集群,進(jìn)入sql-client

如果不上傳jar包重啟服務(wù),后續(xù)可能會(huì)遇到這種錯(cuò)誤

[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.metastore.api.MetaException: java.lang.reflect.UndeclaredThrowableException
  • 創(chuàng)建Hive Catalog
CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://bigdata-24-195:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hive'
);

注意:這里指定的warehouse目錄hive用戶需要有權(quán)限訪問,否則后續(xù)創(chuàng)建庫或者表會(huì)失敗

Flink SQL> CREATE DATABASE iceberg_db;
[ERROR] Could not execute SQL statement. Reason:
org.apache.hadoop.hive.metastore.api.MetaException: java.lang.reflect.UndeclaredThrowableException

詳細(xì)異常日志可以在hive日志中查看

  • uri: Hive metastore的thrift uri。(必選)

  • clients:Hive metastore客戶端池大小,默認(rèn)為2。(可選)

  • warehouse: 數(shù)倉目錄。

  • hive-conf-dir:包含hive-site.xml配置文件的目錄路徑,hive-site.xml中hive.metastore.warehouse.dir 的值會(huì)被warehouse覆蓋。

  • hadoop-conf-dir:包含core-site.xml和hdfs-site.xml配置文件的目錄路徑。

  • 查看catalogs

Flink SQL> show CATALOGS ;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
|    hive_catalog |
+-----------------+
2 rows in set
  • 進(jìn)入catalogs
use catalog hive_catalog;

Hadoop Catalog

Iceberg還支持HDFS中基于目錄的catalog,可以使用’catalog-type’='hadoop’配置。

  • 創(chuàng)建catalog
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hadoop',
  'property-version'='1'
);

use catalog hadoop_catalog;
create database iceberg_db;

在目錄中可以看到我們創(chuàng)建的庫和catalog

數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)

配置sql-client初始化文件

配置初始化文件后,后續(xù)啟動(dòng),不用每次重新啟動(dòng)都創(chuàng)建catalog

$FLINK_HOME/conf目錄下創(chuàng)建sql-client-init.sql文件

CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hadoop',
  'property-version'='1'
);

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://bigdata-24-195:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://bigdata-24-194:8020/iceberg/iceberg-hive'
);

USE CATALOG hive_catalog;

后續(xù)啟動(dòng)sql-client時(shí),加上 -i sql文件路徑 即可完成catalog的初始化,并進(jìn)入hive_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql

DDL語句

創(chuàng)建數(shù)據(jù)庫

CREATE DATABASE iceberg_db;
USE iceberg_db;

創(chuàng)建表

CREATE TABLE `hive_catalog`.`iceberg_db`.`sample` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

建表命令現(xiàn)在支持最常用的flink建表語法,包括:

  • PARTITION BY (column1, column2, …):配置分區(qū),apache flink還不支持隱藏分區(qū)。
  • COMMENT ‘table document’:指定表的備注
  • WITH (‘key’=‘value’, …):設(shè)置表屬性
    目前,不支持計(jì)算列、watermark(支持主鍵)。
創(chuàng)建分區(qū)表
CREATE TABLE `hive_catalog`.`iceberg_db`.`sample1` (
    id BIGINT COMMENT 'unique id',
    data STRING
) PARTITIONED BY (data);

Apache Iceberg支持隱藏分區(qū),但Apache flink不支持在列上通過函數(shù)進(jìn)行分區(qū),現(xiàn)在無法在flink DDL中支持隱藏分區(qū)。

使用LIKE語法建表

LIKE語法用于創(chuàng)建一個(gè)與另一個(gè)表具有相同schema、分區(qū)和屬性的表。

CREATE TABLE `hive_catalog`.`iceberg_db`.`sample2` (
    id BIGINT COMMENT 'unique id',
    data STRING
);

CREATE TABLE  `hive_catalog`.`iceberg_db`.`sample_like` LIKE `hive_catalog`.`iceberg_db`.`sample2`;
查看表結(jié)構(gòu)

默認(rèn)在FlinkSQL中無法查看到表結(jié)構(gòu)

Flink SQL> desc formatted sample_like;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "sample_like" at line 1, column 16.
Was expecting one of:
    <EOF> 
    "." ...
    

我們可以在hive中查看iceberg的表結(jié)構(gòu)

0: jdbc:hive2://bigdata-24-194:2181,bigdata-2> desc formatted sample_like;
INFO  : Compiling command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da): desc formatted sample_like
INFO  : Semantic Analysis Completed (retrial = false)
INFO  : Returning Hive schema: Schema(fieldSchemas:[FieldSchema(name:col_name, type:string, comment:from deserializer), FieldSchema(name:data_type, type:string, comment:from deserializer), FieldSchema(name:comment, type:string, comment:from deserializer)], properties:null)
INFO  : Completed compiling command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da); Time taken: 0.077 seconds
INFO  : Executing command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da): desc formatted sample_like
INFO  : Starting task [Stage-0:DDL] in serial mode
INFO  : Completed executing command(queryId=hive_20230421141659_e372d739-42e8-4cac-97a2-64c10d12c5da); Time taken: 0.108 seconds
INFO  : OK
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
|           col_name            |                     data_type                      |                      comment                       |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
| # col_name                    | data_type                                          | comment                                            |
| id                            | bigint                                             |                                                    |
| data                          | string                                             |                                                    |
|                               | NULL                                               | NULL                                               |
| # Detailed Table Information  | NULL                                               | NULL                                               |
| Database:                     | iceberg_db                                         | NULL                                               |
| OwnerType:                    | USER                                               | NULL                                               |
| Owner:                        | hdfs                                               | NULL                                               |
| CreateTime:                   | Fri Apr 22 14:16:17 CST 2023                       | NULL                                               |
| LastAccessTime:               | Sun Dec 14 04:03:18 CST 1969                       | NULL                                               |
| Retention:                    | 2147483647                                         | NULL                                               |
| Location:                     | hdfs://bigdata-24-194:8020/iceberg/iceberg-hive/iceberg_db.db/sample_like | NULL                                               |
| Table Type:                   | EXTERNAL_TABLE                                     | NULL                                               |
| Table Parameters:             | NULL                                               | NULL                                               |
|                               | EXTERNAL                                           | TRUE                                               |
|                               | current-schema                                     | {\"type\":\"struct\",\"schema-id\":0,\"fields\":[{\"id\":1,\"name\":\"id\",\"required\":false,\"type\":\"long\"},{\"id\":2,\"name\":\"data\",\"required\":false,\"type\":\"string\"}]} |
|                               | metadata_location                                  | hdfs://bigdata-24-194:8020/iceberg/iceberg-hive/iceberg_db.db/sample_like/metadata/00000-0750a26c-8b26-4417-9f27-7786a5775026.metadata.json |
|                               | numFiles                                           | 1                                                  |
|                               | snapshot-count                                     | 0                                                  |
|                               | table_type                                         | ICEBERG                                            |
|                               | totalSize                                          | 1225                                               |
|                               | transient_lastDdlTime                              | 1682057777                                         |
|                               | uuid                                               | 4e0be931-e962-4a94-a176-3969012647c1               |
|                               | NULL                                               | NULL                                               |
| # Storage Information         | NULL                                               | NULL                                               |
| SerDe Library:                | org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe | NULL                                               |
| InputFormat:                  | org.apache.hadoop.mapred.FileInputFormat           | NULL                                               |
| OutputFormat:                 | org.apache.hadoop.mapred.FileOutputFormat          | NULL                                               |
| Compressed:                   | No                                                 | NULL                                               |
| Num Buckets:                  | 0                                                  | NULL                                               |
| Bucket Columns:               | []                                                 | NULL                                               |
| Sort Columns:                 | []                                                 | NULL                                               |
+-------------------------------+----------------------------------------------------+----------------------------------------------------+
32 rows selected (0.289 seconds)

修改表

Flink SQL 目前只支持修改表屬性和表名,其他的暫不支持,需要使用API才可以修改

修改表屬性
ALTER TABLE `hive_catalog`.`iceberg_db`.`sample` SET ('write.format.default'='avro');
修改表名
ALTER TABLE `hive_catalog`.`iceberg_db`.`sample` RENAME TO `hive_catalog`.`iceberg_db`.`new_sample`;

刪除表

DROP TABLE `hive_catalog`.`iceberg_db`.`sample`;

不會(huì)刪除具體數(shù)據(jù),HDFS中文件還繼續(xù)存在

查詢數(shù)據(jù)

Flink On Yarn的問題

這里遇到個(gè)問題,先在這直接說了,剛剛只是操作元數(shù)據(jù)所以沒遇到這個(gè)問題,在插入和查詢都會(huì)遇到這個(gè)問題

我使用的是Ambari集成的Flink,運(yùn)行模式為Flink on Yarn,直接用查詢、插入語句去操作q數(shù)據(jù)會(huì)提示連接拒絕。

這里幾個(gè)問題點(diǎn)需要注意:

  • 1.當(dāng)我們使用Flink on Yarn模式提交時(shí)需要指定-s yarn-session去運(yùn)行,如下
bin/sql-client.sh -s yarn-session  -i conf/sql-client-init.sql 

指定 -s yarn-session 后,F(xiàn)link會(huì)在當(dāng)前服務(wù)器/tmp/.yarn-properties-flink文件找到運(yùn)行的yarn-session任務(wù),去提交。

內(nèi)容如下:

# cat /tmp/.yarn-properties-flink   
#Generated YARN properties file
#Fri Apr 21 11:11:44 CST 2023
dynamicPropertiesString=
applicationID=application_1675237371712_0532
  • 2.Ambari 默認(rèn)Flink on Yarn 提交是使用的Flink用戶,我們提交任務(wù)是使用HDFS用戶,還是會(huì)導(dǎo)致提交失敗

解決方法:kill之前使用flink用戶提交的任務(wù),使用HDFS啟動(dòng)Flink on Yarn任務(wù)

這里偷懶了,直接手動(dòng)kill,手動(dòng)啟動(dòng)了,有時(shí)間可以改下Ambari啟動(dòng)Flink方法,這樣才一勞永逸

Flink on yarn 啟動(dòng)命令如下

export HADOOP_CONF_DIR=/etc/hadoop/conf; export HADOOP_CLASSPATH=/usr/hdp/3.1.5.0-152/hadoop/conf:/usr/hdp/3.1.5.0-152/hadoop/lib/*:/usr/hdp/3.1.5.0-152/hadoop/.//*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/./:/usr/hdp/3.1.5.0-152/hadoop-hdfs/lib/*:/usr/hdp/3.1.5.0-152/hadoop-hdfs/.//*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/lib/*:/usr/hdp/3.1.5.0-152/hadoop-mapreduce/.//*:/usr/hdp/3.1.5.0-152/hadoop-yarn/./:/usr/hdp/3.1.5.0-152/hadoop-yarn/lib/*:/usr/hdp/3.1.5.0-152/hadoop-yarn/.//*:/usr/hdp/3.1.5.0-152/tez/*:/usr/hdp/3.1.5.0-152/tez/lib/*:/usr/hdp/3.1.5.0-152/tez/conf:/usr/hdp/3.1.5.0-152/tez/conf_llap:/usr/hdp/3.1.5.0-152/tez/doc:/usr/hdp/3.1.5.0-152/tez/hadoop-shim-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/hadoop-shim-2.8-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib:/usr/hdp/3.1.5.0-152/tez/man:/usr/hdp/3.1.5.0-152/tez/tez-api-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-common-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-dag-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-examples-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-history-parser-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-javadoc-tools-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-job-analyzer-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-mapreduce-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-protobuf-history-plugin-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-runtime-internals-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-runtime-library-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-tests-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-cache-plugin-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-with-acls-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/tez-yarn-timeline-history-with-fs-0.9.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/ui:/usr/hdp/3.1.5.0-152/tez/lib/async-http-client-1.9.40.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-cli-1.2.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-codec-1.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-collections-3.2.2.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-collections4-4.1.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-io-2.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-lang-2.6.jar:/usr/hdp/3.1.5.0-152/tez/lib/commons-math3-3.1.1.jar:/usr/hdp/3.1.5.0-152/tez/lib/gcs-connector-hadoop3-1.9.17.3.1.5.0-152-shaded.jar:/usr/hdp/3.1.5.0-152/tez/lib/guava-28.0-jre.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-aws-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-azure-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-azure-datalake-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-hdfs-client-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-mapreduce-client-common-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-mapreduce-client-core-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/hadoop-yarn-server-timeline-pluginstorage-3.1.1.3.1.5.0-152.jar:/usr/hdp/3.1.5.0-152/tez/lib/jersey-client-1.19.jar:/usr/hdp/3.1.5.0-152/tez/lib/jersey-json-1.19.jar:/usr/hdp/3.1.5.0-152/tez/lib/jettison-1.3.4.jar:/usr/hdp/3.1.5.0-152/tez/lib/jetty-server-9.3.24.v20180605.jar:/usr/hdp/3.1.5.0-152/tez/lib/jetty-util-9.3.24.v20180605.jar:/usr/hdp/3.1.5.0-152/tez/lib/jsr305-3.0.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/metrics-core-3.1.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/protobuf-java-2.5.0.jar:/usr/hdp/3.1.5.0-152/tez/lib/RoaringBitmap-0.4.9.jar:/usr/hdp/3.1.5.0-152/tez/lib/servlet-api-2.5.jar:/usr/hdp/3.1.5.0-152/tez/lib/slf4j-api-1.7.10.jar:/usr/hdp/3.1.5.0-152/tez/lib/tez.tar.gz; 

/opt/flink/bin/yarn-session.sh -d -nm flinkapp-from-ambari -n 1 -s 1 -jm 768 -tm 1024 -qu default >> /var/log/flink/flink-setup.log &

Batch模式
SET execution.runtime-mode = batch;
select * from sample;
Streaming模式
-- 設(shè)置 Flink 的運(yùn)行模式為流式計(jì)算
SET execution.runtime-mode = streaming;

-- 啟用動(dòng)態(tài)表選項(xiàng)
SET table.dynamic-table-options.enabled=true;

-- 設(shè)置 Flink SQL 執(zhí)行結(jié)果的輸出格式為 Tableau
SET sql-client.execution.result-mode=tableau;

-- 查詢 Hive Catalog 中的 Iceberg 表 sample 中的所有數(shù)據(jù)
select * from hive_catalog.iceberg_db.sample;

SET table.dynamic-table-options.enabled=true;

從 1.11 開始,用戶可以通過動(dòng)態(tài)參數(shù)的形式靈活地設(shè)置表的屬性參數(shù),覆蓋或者追加原表的 WITH (…) 語句內(nèi)定義的 table options。

基本語法為:

table_path /*+ OPTIONS(key=val [, key=val]*) */
動(dòng)態(tài)參數(shù)的使用沒有語境限制,只要是引用表的地方都可以追加定義。在指定的表后面追加的動(dòng)態(tài)參數(shù)會(huì)自動(dòng)追加到原表定義中

從當(dāng)前快照讀取所有記錄,然后從該快照讀取增量數(shù)據(jù)
SELECT * FROM sample5 /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;

返回格式(會(huì)根據(jù)新增數(shù)據(jù)持續(xù)滾動(dòng))

Flink SQL> SELECT * FROM hive_catalog.iceberg_db.sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
2023-04-21 15:43:33,819 INFO  org.apache.hadoop.yarn.client.RMProxy                        [] - Connecting to ResourceManager at bigdata-24-194/172.16.24.194:8050
2023-04-21 15:43:33,821 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-04-21 15:43:33,822 WARN  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.
2023-04-21 15:43:33,830 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface bigdata-24-196:6588 of application 'application_1675237371712_0532'.
+----+----------------------+--------------------------------+
| op |                   id |                           data |
+----+----------------------+--------------------------------+
| +I |                   34 |                          aeefb |
| +I |                   34 |                             ae |
| +I |                    1 |                              a |
| +I |                    1 |                              a |
| +I |                   34 |                            aee |
| +I |                   34 |                    aeeefdsfafb |

讀取指定快照id(不包含)后的增量數(shù)據(jù)
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
  • monitor-interval: 連續(xù)監(jiān)控新提交數(shù)據(jù)文件的時(shí)間間隔(默認(rèn)為10s)。

  • start-snapshot-id: 流作業(yè)開始的快照id。

注意:如果是無界數(shù)據(jù)流式upsert進(jìn)iceberg表(讀kafka,upsert進(jìn)iceberg表),那么再去流讀iceberg表會(huì)存在讀不出數(shù)據(jù)的問題。如果無界數(shù)據(jù)流式append進(jìn)iceberg表(讀kafka,append進(jìn)iceberg表),那么流讀該iceberg表可以正??吹浇Y(jié)果。

插入數(shù)據(jù)

INSERT INTO
INSERT INTO `hive_catalog`.`iceberg_db`.`sample` VALUES (1, 'a');
INSERT INTO `hive_catalog`.`iceberg_db`.`sample` SELECT id, data from sample2;
INSERT OVERWRITE

僅支持Flink的Batch模式

SET execution.runtime-mode = batch;

INSERT OVERWRITE sample VALUES (1, 'a');

INSERT OVERWRITE `hive_catalog`.`default`.`sample` PARTITION(data='a') SELECT 6;
UPSERT

當(dāng)將數(shù)據(jù)寫入v2表格式時(shí),Iceberg支持基于主鍵的UPSERT。有兩種方法可以啟用upsert。

建表時(shí)指定

CREATE TABLE `hive_catalog`.`iceberg_db`.`sample5` (
  `id`  INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
 PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2', 
'write.upsert.enabled'='true'
);

插入時(shí)指定

INSERT INTO `hive_catalog`.`iceberg_db`.`sample5` /*+ OPTIONS('upsert-enabled'='true') */ values(1,'a'),(2,'b'),(3,'c');

結(jié)果:

+----+-------------+--------------------------------+
| op |          id |                           data |
+----+-------------+--------------------------------+
| +I |           1 |                              a |
| +I |           2 |                              b |
| +I |           3 |                              c |
+----+-------------+--------------------------------+

插入

INSERT INTO `hive_catalog`.`iceberg_db`.`sample5` /*+ OPTIONS('upsert-enabled'='true') */ values(1,'abc')); 

結(jié)果

+----+-------------+--------------------------------+
| op |          id |                           data |
+----+-------------+--------------------------------+
| +I |           1 |                            abc |
| +I |           2 |                              b |
| +I |           3 |                              c |
+----+-------------+--------------------------------+

插入的表,format-version需要為2。

OVERWRITE和UPSERT不能同時(shí)設(shè)置。在UPSERT模式下,如果對(duì)表進(jìn)行分區(qū),則分區(qū)字段必須也是主鍵。

讀取Kafka流,upsert插入到iceberg表中

下載:https://repo1.maven.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/1.14.3/flink-sql-connector-kafka_2.12-1.14.3.jar

將jar包放到$FLINK_HOME/lib目錄下,重啟Flink On Yarn

這里先說一個(gè)大坑,Iceberg現(xiàn)階段的一個(gè)Bug

Kafka表必須要在default_catalog.default_database下,即catalog名為default_catalog,數(shù)據(jù)庫(命名空間)為default_database下,否則kafka類型的表讀取不到數(shù)據(jù)。

如果都在我們自己創(chuàng)建的catalog下創(chuàng)建,則執(zhí)行INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;后,在Flink任務(wù)中看不到一個(gè)持續(xù)執(zhí)行的Flink Job,而正常執(zhí)行該命令Flink會(huì)執(zhí)行一個(gè)持續(xù)執(zhí)行的任務(wù),去消費(fèi)kafka數(shù)據(jù)寫入Iceberg,正常情況如下圖:

數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)

所以這里我們kafka表在default_catalog.default_database下,寫入數(shù)據(jù)的表在我們自己創(chuàng)建的hadoop_catalog.iceberg_db

create table default_catalog.default_database.kafka1(
  id int,
  data string
) with (
  'connector' = 'kafka'
  ,'topic' = 'ttt'
  ,'properties.zookeeper.connect' = '172.16.24.194:2181'
  ,'properties.bootstrap.servers' = '172.16.24.194:9092'
  ,'format' = 'json'
  ,'properties.group.id'='iceberg1'
  ,'scan.startup.mode'='earliest-offset'
);

CREATE TABLE `hadoop_catalog`.`iceberg_db`.`sample6` (
  `id`  INT UNIQUE COMMENT 'unique id',
  `data` STRING NOT NULL,
 PRIMARY KEY(`id`) NOT ENFORCED
) with (
'format-version'='2', 
'write.upsert.enabled'='true'
);


INSERT INTO hadoop_catalog.iceberg_db.sample6 SELECT * FROM default_catalog.default_database.kafka1;

此時(shí)我們往Kafka發(fā)送數(shù)據(jù):

{"id":123,"data":"llalalala"}
{"id":1123,"data":"asdfasfds"}

查看表中數(shù)據(jù)可以看到寫入成功

select * from hadoop_catalog.iceberg_db.sample6;

數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)

再次發(fā)送數(shù)據(jù)

{"id":123,"data":"JastData"}

查看表中數(shù)據(jù),發(fā)現(xiàn)修改成功

數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)文章來源地址http://www.zghlxwxcb.cn/news/detail-425526.html

與Flink集成的不足

支持的特性 Flink 備注
SQL create catalog
SQL create database
SQL create table
SQL create table like
SQL alter table 只支持修改表屬性,不支持更改列和分區(qū)
SQL drop_table
SQL select 支持流式和批處理模式
SQL insert into 支持流式和批處理模式
SQL insert overwrite
DataStream read
DataStream append
DataStream overwrite
Metadata tables 支持Java API,不支持Flink SQL
Rewrite files action
  • 不支持創(chuàng)建隱藏分區(qū)的Iceberg表。
  • 不支持創(chuàng)建帶有計(jì)算列的Iceberg表。
  • 不支持創(chuàng)建帶watermark的Iceberg表。
  • 不支持添加列,刪除列,重命名列,更改列。
  • Iceberg目前不支持Flink SQL 查詢表的元數(shù)據(jù)信息,需要使用Java API 實(shí)現(xiàn)。

到了這里,關(guān)于數(shù)據(jù)湖Iceberg-FlinkSQL集成(5)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 第二章 Flink集成Iceberg的集成方式及基本SQL使用

    第二章 Flink集成Iceberg的集成方式及基本SQL使用

    注意事項(xiàng):一般都是用基于Flink的Hive Catalog,使用HMS存儲(chǔ)表模型數(shù)據(jù) 1、集成方式 (1)下載jar包 下載地址 (2)啟動(dòng)FlinkSQL ①StandLone模式啟動(dòng) ②Flink On Yarn模式啟動(dòng) 2、基本使用 2.1、創(chuàng)建catalog 核心:可創(chuàng)建hive、hadoop、自定義等目錄,創(chuàng)建模板如下 type : 必須的 iceberg 。(必需

    2024年02月08日
    瀏覽(29)
  • ETL簡介:數(shù)據(jù)集成與應(yīng)用

    在當(dāng)今大數(shù)據(jù)時(shí)代,組織和企業(yè)需要處理和分析龐大的數(shù)據(jù)量。ETL(Extract, Transform, Load)是一種重要的數(shù)據(jù)集成和處理方法,它在數(shù)據(jù)管理和決策支持中起著關(guān)鍵作用。本文將介紹ETL的基本概念、作用和關(guān)鍵組成部分,以幫助讀者了解ETL的重要性和應(yīng)用領(lǐng)域。 ETL是指數(shù)據(jù)提取

    2024年02月12日
    瀏覽(20)
  • Iceberg從入門到精通系列之三:創(chuàng)建Iceberg表、修改表結(jié)構(gòu)、插入數(shù)據(jù)、刪除表

    Hive語法創(chuàng)建分區(qū)表,不會(huì)在元數(shù)據(jù)創(chuàng)建分區(qū),而是將分區(qū)數(shù)據(jù)轉(zhuǎn)換為Iceberg標(biāo)識(shí)分區(qū)。 這種情況下不能使用Iceberg的分區(qū)轉(zhuǎn)換,例如:days(timestamp),如果想要使用Iceberg格式表的分區(qū)轉(zhuǎn)換標(biāo)識(shí)分區(qū),需要使用Spark或者Flink引擎創(chuàng)建表。 只支持HiveCatalog表修改表屬性,Iceberg表屬性和

    2024年02月11日
    瀏覽(111)
  • Iceberg從入門到精通系列之十:flink sql往Iceberg表插入數(shù)據(jù),Batch模式和Streaming模式查詢數(shù)據(jù)

    僅支持Flink的Batch模式 當(dāng)將數(shù)據(jù)寫入v2表格時(shí),Iceberg支持基于主鍵的UPSERT。有兩種方法可以啟用upsert。 建表時(shí)指定 UPSERT模式下,如果對(duì)表進(jìn)行分區(qū),則分區(qū)字段必須是主鍵。 Batch模式: Streaming模式: 從當(dāng)前快照讀取所有記錄,然后從該快照讀取增量數(shù)據(jù) 讀取指定快照id(不包

    2024年02月12日
    瀏覽(26)
  • FlinkSql 如何實(shí)現(xiàn)數(shù)據(jù)去重?

    很多時(shí)候flink消費(fèi)上游kafka的數(shù)據(jù)是有重復(fù)的,因此有時(shí)候我們想數(shù)據(jù)在落盤之前進(jìn)行去重,這在實(shí)際開發(fā)中具有廣泛的應(yīng)用場(chǎng)景,此處不說詳細(xì)代碼,只粘貼相應(yīng)的flinksql

    2024年02月10日
    瀏覽(16)
  • 數(shù)據(jù)湖08:Apache Iceberg原理和功能介紹

    數(shù)據(jù)湖08:Apache Iceberg原理和功能介紹

    ?系列專題:數(shù)據(jù)湖系列文章 ????????在使用不同的引擎進(jìn)行大數(shù)據(jù)計(jì)算時(shí),需要將數(shù)據(jù)根據(jù)計(jì)算引擎進(jìn)行適配。這是一個(gè)相當(dāng)棘手的問題,為此出現(xiàn)了一種新的解決方案: 介于上層計(jì)算引擎和底層存儲(chǔ)格式之間的一個(gè)中間層 。這個(gè)中間層不是數(shù)據(jù)存儲(chǔ)的方式,只是定義

    2023年04月09日
    瀏覽(33)
  • 火山引擎 Iceberg 數(shù)據(jù)湖的應(yīng)用與實(shí)踐

    火山引擎 Iceberg 數(shù)據(jù)湖的應(yīng)用與實(shí)踐

    在云原生計(jì)算時(shí)代,云存儲(chǔ)使得海量數(shù)據(jù)能以低成本進(jìn)行存儲(chǔ),但是這也給如何訪問、管理和使用這些云上的數(shù)據(jù)提出了挑戰(zhàn)。而 Iceberg 作為一種云原生的表格式,可以很好地應(yīng)對(duì)這些挑戰(zhàn)。本文將介紹火山引擎在云原生計(jì)算產(chǎn)品上使用 Iceberg 的實(shí)踐,和大家分享高效查詢、

    2024年02月09日
    瀏覽(20)
  • [實(shí)戰(zhàn)-10]FlinkSql 如何實(shí)現(xiàn)數(shù)據(jù)去重?

    很多時(shí)候flink消費(fèi)上游kafka的數(shù)據(jù)是有重復(fù)的,因此有時(shí)候我們想數(shù)據(jù)在落盤之前進(jìn)行去重,這在實(shí)際開發(fā)中具有廣泛的應(yīng)用場(chǎng)景,此處不說詳細(xì)代碼,只粘貼相應(yīng)的flinksql

    2024年02月03日
    瀏覽(21)
  • 【大數(shù)據(jù)】Apache Iceberg 概述和源代碼的構(gòu)建

    【大數(shù)據(jù)】Apache Iceberg 概述和源代碼的構(gòu)建

    我們?cè)谑褂貌煌囊孢M(jìn)行大數(shù)據(jù)計(jì)算時(shí),需要將數(shù)據(jù)根據(jù)計(jì)算引擎進(jìn)行適配。這是一個(gè)相當(dāng)棘手的問題,為此出現(xiàn)了一種新的解決方案:介于上層計(jì)算引擎和底層存儲(chǔ)格式之間的一個(gè)中間層。這個(gè)中間層不是數(shù)據(jù)存儲(chǔ)的方式,只是定義了數(shù)據(jù)的元數(shù)據(jù)組織方式,并向計(jì)算引

    2024年02月09日
    瀏覽(24)
  • Flink + Iceberg打造流批一體的數(shù)據(jù)湖架構(gòu)

    Flink + Iceberg打造流批一體的數(shù)據(jù)湖架構(gòu)

    一、背景 1、數(shù)據(jù)倉庫架構(gòu) ????????從Hive表 出倉 到外部系統(tǒng)(ClickHouse、Presto、ES等)帶來的復(fù)雜性和存儲(chǔ)開發(fā)等額外代價(jià),盡量減少這種場(chǎng)景出倉的必要性。 痛點(diǎn):傳統(tǒng) T+1 任務(wù) 海量的TB級(jí) T+ 1 任務(wù)延遲導(dǎo)致下游數(shù)據(jù)產(chǎn)出時(shí)間不穩(wěn)定。 任務(wù)遇到故障重試恢復(fù)代價(jià)昂貴 數(shù)

    2024年02月04日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包