flink1.14 sql基礎語法(二) flink sql表定義詳解
一、表的概念和類別
1.1 表的標識結構
每一個表的標識由 3 部分組成:
-
catalog name (常用于標識不同的“源”,比如 hive catalog,inner catalog 等)
-
database name(通常語義中的“庫”)
-
table name (通常語義中的“表”)
package cn.yyds.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class _09_FlinkTableDb {
public static void main(String[] args) {
// 1、混合環(huán)境的創(chuàng)建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 2、建表
TableDescriptor descriptor = TableDescriptor
.forConnector("kafka") // 指定連接器
.schema(
Schema.newBuilder() // 指定表結構
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("gender", DataTypes.STRING())
.build()
)
.format("json")
.option("topic","kfa_person")
.option("properties.bootstrap.servers","centos01:9092")
.option("properties.group.id","g1")
.option("scan.startup.mode","earliest-offset")
.option("json.fail-on-missing-field","false")
.option("json.ignore-parse-errors","true")
.build();
Table table = tableEnv.from(descriptor);
// 注冊在默認的catalog和默認的database中
tableEnv.createTemporaryView("kfa_person",table);
// 注冊在默認的catalog和指定的database中
tableEnv.createTemporaryView("ods.kfa_person",table);
// 注冊在指定的catalog和指定的database中(可以和hive整合,保存到mysql中)
tableEnv.createTemporaryView("hive_catalog.ods.kfa_person",table);
}
}
1個flinksql程序在運行時,tableEnvironment 通過持有一個 map 結構來記錄所注冊的 catalog;
public final class CatalogManager {
private static final Logger LOG = LoggerFactory.getLogger(CatalogManager.class);
private final Map<String, Catalog> catalogs;
private final Map<ObjectIdentifier, CatalogBaseTable> temporaryTables;
......
}
1.2 表和視圖
Flinksql中的表,可以是 virtual的 (view 視圖) 和 regular 的 (table 常規(guī)表)
-
table 描述了
一個物理上的外部數(shù)據(jù)源
,如文件、數(shù)據(jù)庫表、kafka 消息 topic -
view 則基于表創(chuàng)建,代表一個或多個表上的一段計算邏輯(就是對一段查詢計劃的邏輯封裝);
不管是 table 還是 view,在 tableAPI 中得到的都是 Table 對象
1.3 臨時和永久
臨時表(視圖) :
- 創(chuàng)建時帶 temporary 關鍵字 (crate temporary view,createtemporary table)
永久表(視圖) :
- 創(chuàng)建時不帶 temporary 關鍵字 (create view ,create table )
臨時表與永久表的本質區(qū)別: schema 信息是否被持久化存儲
臨時表(視圖)
-
表 schema 只維護在所屬 flink session
運行時內存中
-
當所屬的 flink session 結束后表信息將不復存在,且該表無法在 flink session 間共享。
常規(guī)表(視圖)
-
表 schema 可記錄在外部持久化的元數(shù)據(jù)管理器中(比如 hive 的 metastore)
-
當所屬 flink session 結束后,該表信息不會丟失,且在不同 flink session 中都可訪問到該表的信息
。
// sql 定義方式
tableEnv.executeSql("create view view_1 as select .. from projectedTable");
tableEnv.executeSql("create temporary view_2 as select .. from projectedTable");
tableEnv.executeSql("create table (id int,...) with ( 'connector'= ...)");
tableEnv.executeSql("create temporary table (id int,...) with ('connector'= ...)");
// table api方式
tenv.createTemporaryView("v_1", dataStreamschema);
tenv.createTemporaryView("v_1", table);
tenv.createTable("t_1", tableDescriptor);
tenv.createTemporaryTable("t_1", tableDescriptor);
二、表定義概覽
2.1 Table Api創(chuàng)建
Table 對象獲取方式解析:
-
從已注冊的表
-
從 TableDescriptor (連接器/format/schema/options)
-
從 DataStream
-
從 Table 對象上的查詢 api 生成
-
從測試數(shù)據(jù)
涉及的核心參數(shù):
-
已注冊的表名 (catalog name.database_name.object_name)
-
TableDescriptor (表描述器,核心是 connector 連接器)
-
Datastream(底層流)
-
測試數(shù)據(jù)值
package cn.yyds.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import static org.apache.flink.table.api.Expressions.$;
/**
* 創(chuàng)建table的幾種方式
*
* 1、從已注冊的表
* 2、從 TableDescriptor (連接器/format/schema/options)
* 3、從 DataStream
* 4、從 Table 對象上的查詢 api 生成
* 5、從測試數(shù)據(jù)
*/
public class _04_TableCreate {
public static void main(String[] args) {
// 混合環(huán)境的創(chuàng)建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1、從 TableDescriptor (連接器/format/schema/options)
TableDescriptor descriptor = TableDescriptor
.forConnector("kafka") // 指定連接器
.schema(
Schema.newBuilder() // 指定表結構
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("gender", DataTypes.STRING())
.build()
)
.format("json")
.option("topic","t_kafka_1")
.option("properties.bootstrap.servers","centos01:9092")
.option("properties.group.id","g1")
.option("scan.startup.mode","earliest-offset")
.option("json.fail-on-missing-field","false")
.option("json.ignore-parse-errors","true")
.build();
Table table1 = tableEnv.from(descriptor);
// 2、從已注冊的表
Table table2 = tableEnv.from("t_kafka_1");
// 3、從 DataStream
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
// 設置訂閱的目標主題
.setTopics("tp01")
// 設置消費者組id
.setGroupId("gp01")
// 設置kafka服務器地址
.setBootstrapServers("centos01:9092")
// 起始消費位移的指定:
// OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST) 消費起始位移選擇之前所提交的偏移量(如果沒有,則重置為LATEST)
// OffsetsInitializer.earliest() 消費起始位移直接選擇為 “最早”
// OffsetsInitializer.latest() 消費起始位移直接選擇為 “最新”
// OffsetsInitializer.offsets(Map<TopicPartition,Long>) 消費起始位移選擇為:方法所傳入的每個分區(qū)和對應的起始偏移量
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// 設置value數(shù)據(jù)的反序列化器
.setValueOnlyDeserializer(new SimpleStringSchema())
// 開啟kafka底層消費者的自動位移提交機制
// 它會把最新的消費位移提交到kafka的consumer_offsets中
// 就算把自動位移提交機制開啟,KafkaSource依然不依賴自動位移提交機制
// (宕機重啟時,優(yōu)先從flink自己的狀態(tài)中去獲取偏移量<更可靠>)
.setProperty("auto.offset.commit", "true")
.build();
// env.addSource(); // 接收的是 SourceFunction接口的 實現(xiàn)類
DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");// 接收的是 Source 接口的實現(xiàn)類
Table table3 = tableEnv.fromDataStream(streamSource);
// 4、從 Table 對象上的查詢 api 生成
Table table4 = table1.groupBy($("gender"))
.select($("gender"), $("age").avg().as("avg_age"));
// 5、從測試數(shù)據(jù)
Table table5 = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("id", DataTypes.INT()),
DataTypes.FIELD("name", DataTypes.STRING()),
DataTypes.FIELD("info", DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING())),
DataTypes.FIELD("ts1", DataTypes.TIMESTAMP(3)),
DataTypes.FIELD("ts3", DataTypes.TIMESTAMP_LTZ(3))
),
Row.of(1, "a", null, "2023-02-02 13:00:00.200", 1654236105000L)
);
}
}
2.2 Table Sql創(chuàng)建
注冊 sql表 (視圖)方式
-
從已存在的 datastream 注冊
-
從已存在的 Table 對象注冊
-
從 TableDescriptor (連接器)注冊
-
執(zhí)行 Sql 的 DDL 語句來注冊
package cn.yyds.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableDescriptor;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
/**
* 注冊 sql表 (視圖)方式
* 從已存在的 datastream 注冊
* 從已存在的 Table 對象注冊
* 從 TableDescriptor (連接器)注冊
* 執(zhí)行 Sql 的 DDL 語句來注冊
*/
public class _04_SqlCreate {
public static void main(String[] args) {
// 混合環(huán)境的創(chuàng)建
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 1、從 TableDescriptor (連接器)注冊
TableDescriptor descriptor = TableDescriptor
.forConnector("kafka") // 指定連接器
.schema(
Schema.newBuilder() // 指定表結構
.column("id", DataTypes.INT())
.column("name", DataTypes.STRING())
.column("age", DataTypes.INT())
.column("gender", DataTypes.STRING())
.build()
)
.format("json")
.option("topic","t_kafka_1")
.option("properties.bootstrap.servers","centos01:9092")
.option("properties.group.id","g1")
.option("scan.startup.mode","earliest-offset")
.option("json.fail-on-missing-field","false")
.option("json.ignore-parse-errors","true")
.build();
tableEnv.createTable("kfk_person",descriptor);
// 2、從已存在的 datastream 注冊
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
// 設置訂閱的目標主題
.setTopics("tp01")
// 設置消費者組id
.setGroupId("gp01")
// 設置kafka服務器地址
.setBootstrapServers("centos01:9092")
// 起始消費位移的指定:
.setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
// 設置value數(shù)據(jù)的反序列化器
.setValueOnlyDeserializer(new SimpleStringSchema())
.setProperty("auto.offset.commit", "true")
.build();
DataStreamSource<String> streamSource = env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kfk-source");// 接收的是 Source 接口的實現(xiàn)類
tableEnv.createTemporaryView("kfk_source",streamSource);
// 3、從已存在的 Table 對象注冊
Table table = null;
tableEnv.createTemporaryView("k_table",table);
// 4、執(zhí)行 Sql 的 DDL 語句來注冊
tableEnv.executeSql("create table t_kafka_1(\n" +
" id int,\n" +
" name string,\n" +
" age int,\n" +
" gender string\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 't_kafka_1',\n" +
" 'properties.bootstrap.servers' = 'centos01:9092',\n" +
" 'properties.group.id' = 'g1',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")");
}
}
三、Catalog詳解
3.1 catalog概念
catalog 就是一個元數(shù)據(jù)空間,簡單說就是記錄、獲取元數(shù)據(jù)(表定義信息)的實體。
flink sql 在運行時,可以擁有多個 catalog,它們由 catalogManager 模塊來注冊、管理。
CatalogManager 中可以注冊多個元數(shù)據(jù)空間。
1、環(huán)境創(chuàng)建之初,就會初始化一個默認的元數(shù)據(jù)空間
-
空間名稱: default_catalog
-
空間實現(xiàn)類: GenericInMemoryCatalog(基于內存)
public class GenericInMemoryCatalog extends AbstractCatalog {
public static final String DEFAULT_DB = "default";
// 用于記錄 本catalog空間所有database
private final Map<String, CatalogDatabase> databases;
// 用于記錄 本catalog空間所有table
private final Map<ObjectPath, CatalogBaseTable> tables;
......
}
2、用戶還可以向環(huán)境中注冊更多的 catalog,如下代碼新增注冊了一個 hivecatalog
// 創(chuàng)建hive元數(shù)據(jù)空間的實現(xiàn)對象
HiveCatalog hiveCatalog = new HiveCatalog("hive", "default", "d:/conf/hiveconf");
// 將hive的元數(shù)據(jù)對象注冊到環(huán)境中
tableEnv.registerCatalog("hive_catalog",hiveCatalog);
注意:需要導入jar包,并把hive-site.xml的配置文件放入到hiveconf目錄下
<!--flink-sql使用hive-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-hive-3.1.2_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
3.2 臨時表與永久表的底層差異
結論 1: 如果選擇 hive 元數(shù)據(jù)空間來創(chuàng)建表、視圖,則
-
永久表(視圖)的元信息,都會被寫入 hive 的元數(shù)據(jù)管理器中,從而可以實現(xiàn)永久存
-
在臨時表(視圖)的元信息,并不會寫入 hive 的元數(shù)據(jù)管理其中,而是放在 catalogManager 的一個 temporaryTables 的內存 hashmap 中記錄
-
臨時表空間中的表名(全名) 如果與 hive 空間中的表名相同,則查詢時會優(yōu)先選擇臨時表空間的表
結論 2: 如果選擇 GenericInMemoryCatalog 元數(shù)據(jù)空間來創(chuàng)建表、視圖,則
-
永久表(視圖)的元信息,都會被寫入 GenericInMemoryCatalog 的元數(shù)據(jù)管理器中(
內存中
) -
臨時表(視圖)的元信息,放在 catalogManager 的一個 temporaryTables 的內存 hashmap 中記
3.3 理解Hive catalog
flink sql利用 hive catalog 來建表 (查詢、修改、刪除表),本質上只是利用了 hive 的 metastore 服務
更具體來說,flinksql 只是把 flinksal 的表定義信息,按照 hive 元數(shù)據(jù)的形式,托管到 hive 的 metastore中而已。
當然,hive 中也能看到這些托管的表信息,但是,并不能利用它底層的 mapreduce 或者 spark 引擎來查詢這些表
因為 mapreduce 或者 spark 引擎,并不能理解 flinksql 表定義中的信息,也無法為這些定義信息提供相應的組件去讀取數(shù)據(jù)(比如,mr 或者 spark 就沒有 flinksql 中的各種 connector 組件)
四、表定義詳解
定義表時所需的核心要素
-
表名 (catalog_name.database_name.object_name)
-
TableDescriptor
TableDescriptor 核心要素
-
Schema 表結構(字段)
-
Format 數(shù)據(jù)格式
-
Connector 連接器
-
Option 連接器參數(shù)
4.1 Schema字段定義詳解
4.1.1 physical column(物理字段)
物理字段: 源自于外部存儲
系統(tǒng)本身 schema 中的字段
如 kafka 消息的 key、value (json 格式)中的字段;mysql表中的字段…
-- 一些連接器需要設置主鍵,例如upsert-kafka,因為支持change-log流
-- 單字段主鍵約束語法
id INT PRIMARY KEY NOT ENFORCED ,
name STRING
-- 多字段主鍵約束語法:
id,
name,
PRIMARY KEY(id,name) NOT ENFORCED
4.1.2 computed column(表達式字段)
表達式字段(邏輯字段) : 在物理字段上施加一個 sql 表達式,并將表達式結果定義為一個字段。
4.1.3 metadata column(元數(shù)據(jù)字段)
元數(shù)據(jù)字段: 來源于 connector 從外部存儲系統(tǒng)中獲取到的 外部系統(tǒng)元信息
? 比如,kafka 的消息,通常意義上的數(shù)據(jù)內容是在 record 的 key 和 value 中的,而實質上 (底層角度來看), kafka 中的每一條 record,不光帶了 key 和 value 數(shù)據(jù)內容,還帶了這條record 所屬的 topic,所屬的 partition,所在的 offset,以及 record 的 timetamp 和 timestamp 類型等“元信息”。
? fink 的 connector 可以獲取并暴露這些元信息,并允許用戶將這些信息定義成 flinksal表中的字段
官網(wǎng)中可以查到暴露的元數(shù)據(jù)字段
比如kafka元數(shù)據(jù)字段: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/kafka/
// DDL方式
tableEnv.executeSql("create table t_kafka_person(\n" +
" id int, -- 物理字段\n" +
" name string, -- 物理字段\n" +
" nick string, -- 物理字段\n" +
" age int, -- 物理字段\n" +
" big_age as age + 10, -- 表達式字段\n" +
" my_offset bigint METADATA FROM 'offset', --元數(shù)據(jù)字段,來自kafka\n" +
" ts TIMESTAMP_LTZ(3) METADATA FROM 'timestamp', --元數(shù)據(jù)字段,來自kafka\n" +
" gender string\n" +
") WITH (\n" +
" 'connector' = 'kafka',\n" +
" 'topic' = 't_kafka_2',\n" +
" 'properties.bootstrap.servers' = 'centos01:9092',\n" +
" 'properties.group.id' = 'g1',\n" +
" 'format' = 'json',\n" +
" 'scan.startup.mode' = 'earliest-offset',\n" +
" 'json.fail-on-missing-field' = 'false',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")");
// API 方式
TableDescriptor descriptor = TableDescriptor
.forConnector("kafka") // 指定連接器
.schema(
Schema.newBuilder() // 指定表結構
.column("id", DataTypes.INT()) //column是物理字段
.column("name", DataTypes.STRING()) //column是物理字段
.column("nick", DataTypes.STRING()) //column是物理字段
.column("age", DataTypes.INT()) //column是物理字段
.column("gender", DataTypes.STRING()) //column是物理字段
.columnByExpression("big_age","age + 10") // 聲明表達式字段
.columnByMetadata("my_offset",DataTypes.BIGINT(),"offset") // 聲明元數(shù)據(jù)字段
// 聲明元數(shù)據(jù)字段 isVirtual表示,當這個表被當作sink表時候,該字段是否出現(xiàn)在schema中
.columnByMetadata("ts",DataTypes.TIMESTAMP_LTZ(3),"timestamp",true)
/*.primaryKey("id")*/ // 主鍵約束,upsert-kafka需要填寫主鍵
.build()
)
.format("json")
.option("topic","t_kafka_2")
.option("properties.bootstrap.servers","centos01:9092")
.option("properties.group.id","g1")
.option("scan.startup.mode","earliest-offset")
.option("json.fail-on-missing-field","false")
.option("json.ignore-parse-errors","true")
.build();
4.2 format概述
connector 連接器在對接外部存儲時,根據(jù)外部存儲中的數(shù)據(jù)格式不同,需要用到不同的 format 組件
format 組件的作用就是:告訴連接器,如何解析外部存儲中的數(shù)據(jù)及映射到表 schema
format 組件的使用要點
-
導入 format 組件的 jar 包依賴
-
指定 format 組件的名稱
-
設置 format 組件所需的參數(shù)(不同 format 組件有不同的參數(shù)配置需求)
官網(wǎng):https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/overview/
4.2.1 json format
官網(wǎng):https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/json/
1、需要引入依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>1.14.4</version>
</dependency>
2、常用參數(shù)
參數(shù) | 是否必須 | 默認值 | 類型 | 描述 |
---|---|---|---|---|
format | required | (none) | String | 組件名json |
json.fail-on-missing-field | optional | false | Boolean | 缺失字段是否失敗 |
json.ignore-parse-errors | optional | false | Boolean | 是否忽略json解析錯誤 |
json.timestamp-format.standard | optional | 'SQL' |
String | json中timestamp類型字段格式 |
json.map-null-key.mode | optional | 'FAIL' |
String | 可選值'FAIL' , 'DROP' ,'LITERAL'
|
json.map-null-key.literal | optional | ‘null’ | String | 替換null的字符串 |
3、數(shù)據(jù)類型映射
Flink SQL type | JSON type |
---|---|
CHAR / VARCHAR / STRING |
string |
BOOLEAN |
boolean |
BINARY / VARBINARY |
string with encoding: base64 |
DECIMAL |
number |
TINYINT |
number |
SMALLINT |
number |
INT |
number |
BIGINT |
number |
FLOAT |
number |
DOUBLE |
number |
DATE |
string with format: date |
TIME |
string with format: time |
TIMESTAMP |
string with format: date-time |
TIMESTAMP_WITH_LOCAL_TIME_ZONE |
string with format: date-time (with UTC time zone) |
INTERVAL |
number |
ARRAY |
array |
MAP / MULTISET |
object |
ROW |
object |
4、使用案例(復雜json解析)
package cn.yyds.sql;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
/**
* 文件中有如下的數(shù)據(jù):
*
* {"id":10, "name":"tom", "age":28, "ts":"2023-03-02 00:00:00.000"}
*/
public class _10_JsonFormatTest1 {
public static void main(String[] args) {
// 創(chuàng)建環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("create table t_kafka_p(\n" +
" id int,\n" +
" name string,\n" +
" age int,\n" +
" ts TIMESTAMP(3)\n" +
") WITH (\n" +
" 'connector' = 'filesystem',\n" +
" 'path' = 'file:///D:/works/flink-live/files/sql-data/test1.txt',\n" +
" 'format' = 'json',\n" +
" 'json.ignore-parse-errors' = 'true'\n" +
")");
tableEnv.executeSql("select * from t_kafka_p").print();
}
}
+----+-------------+--------------------------------+-------------+-------------------------+
| op | id | name | age | ts |
+----+-------------+--------------------------------+-------------+-------------------------+
| +I | 10 | tom | 28 | 2023-03-02 00:00:00.000 |
+----+-------------+--------------------------------+-------------+-------------------------+
復雜json類型的解析
{
"id":1238123899121,
"name":"hank",
"date":"2022-10-14",
"obj":{
"time1":"12:12:43Z",
"str":"sfasfafs",
"lg":2324342345
},
"arr":[
{
"f1":"f1str11",
"f2":134
},
{
"f1":"f1str22",
"f2":555
}
],
"time":"12:12:43Z",
"timestamp":"2022-10-14T12:12:43Z",
"map":{
"flink":123
},
"mapinmap":{
"inner_map":{
"key":234
}
}
}
-- 復雜json解析的表定義
CREATE TABLE json_source (
id BIGINT,
name STRING,
`date` DATE,
obj ROW<time1 TIME,str STRING,lg BIGINT>,
arr ARRAY<ROW<f1 STRING,f2 INT>>,
`time` TIME,
`timestamp` TIMESTAMP(3),
`map` MAP<STRING,BIGINT>,
mapinmap MAP<STRING,MAP<STRING,INT>>,
proctime as PROCTIME()
) WITH (
'connector' = 'filesystem',
'path' = 'file:///D:\doit\works\flink-live\files\sql-data\test3.txt',
'format' = 'json',
'json.ignore-parse-errors' = 'true'
);
-- 從表中獲取數(shù)據(jù)
-- 注意數(shù)組index從1開始
select id, name,`date`,obj.str,arr[1].f1,`map`['flink'],mapinmap['inner_map']['key'] from json_source;
4.2.2 csv format
官網(wǎng): https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/formats/csv/
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.14.4</version>
</dependency>
參數(shù)解釋
參數(shù) | 是否必須 | 默認值 | 類型 | 描述 |
---|---|---|---|---|
format | required | (none) | String | csv |
csv.field-delimiter | optional | , |
String | 分割符 |
csv.allow-comments | optional | false | Boolean | 是否允許注釋'默認#開頭注釋'
|
csv.ignore-parse-errors | optional | false | Boolean | 是否忽略解析錯誤 |
csv.array-element-delimiter | optional | ; |
String | 數(shù)組元素之間分隔符 |
csv.escape-character | optional | (none) | String | 轉義字符 |
csv.null-literal | optional | (none) | String | null的字面量字符串 |
4.3 watermark和時間屬性
時間屬性定義,主要是用于各類基于時間的運算操作(如基于時間窗口的查詢計算)。
4.3.1 eventTime和watermark定義
核心要點:
-
需要一個 timestamp(3)類型字段(可以是物理字段,也可以是表達式字段,也可以是元數(shù)據(jù)字段)
-
需要用一個 watermarkExpression 來指定 watermark 策略
package cn.yyds.sql;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
public class _11_SqlWatermark {
public static void main(String[] args) {
// 創(chuàng)建環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// guid,uuid,eventId,pageId,ts
DataStreamSource<String> sourceStream = env.socketTextStream("centos04", 9999);
SingleOutputStreamOperator<EventBean> mapStream = sourceStream.map(line -> {
String[] arr = line.split(",");
return new EventBean(Integer.parseInt(arr[0]), arr[1], arr[2], arr[3], Long.parseLong(arr[4]));
});
// 分配wm
SingleOutputStreamOperator<EventBean> wmStream = mapStream.assignTimestampsAndWatermarks(
WatermarkStrategy.<EventBean>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<EventBean>() {
@Override
public long extractTimestamp(EventBean eventBean, long l) {
return eventBean.getTs();
}
})
);
// 轉換為table
Table table = tableEnv.fromDataStream(wmStream,
Schema.newBuilder()
// 聲明表達式字段,并聲明為 processing time 屬性字段
// .columnByExpression("pt","proctime()")
// 聲明表達式字段
.columnByExpression("rt","to_timestamp_ltz(ts, 3)")
// 將 rt 字段指定為 event time 屬性字段,并基于它指定 watermark 策略: = rt
.watermark("rt","rt")
// 將 rt 字段指定為 event time 屬性字段,并基于它指定 watermark 策略: = rt-8s
.watermark("rt","rt - interval '8' second")
// 將 rt 字段指定為 event time 屬性字段,并沿用“源頭流”的 watermark
.watermark("rt","source_watermark()")
.build()
);
table.printSchema();
}
}
-- DDL方式定義水位線
-- {"id":1,"eventId":"e1","ts":1679754806020,"pageId":"p01"}
--加上水位線及處理時間
create table t_kafka_wm(
id int,
eventId string,
ts bigint,
pageId string, -- 物理字段
pt as PROCTIME(), -- 聲明處理時間
wc_time as TO_TIMESTAMP_LTZ(ts, 3), -- 表達式字段,將long轉換為TIMESTAMP_LTZ
WATERMARK FOR wc_time AS wc_time - INTERVAL '5' SECOND -- 水位線
) WITH (
'connector' = 'kafka',
'topic' = 't_kafka_3',
'properties.bootstrap.servers' = 'centos01:9092',
'properties.group.id' = 'g1',
'format' = 'json',
'scan.startup.mode' = 'earliest-offset',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true'
)
4.3.2 processing time
定義一個表達式字段,并用表達式 proctime()
將其聲明為 processing time 即可;
// 轉換為table
Table table = tableEnv.fromDataStream(wmStream,
Schema.newBuilder()
// 聲明表達式字段,并聲明為 processing time 屬性字段
.columnByExpression("pt","proctime()")
.build()
)
4.3.3 表和流之間水位線的傳遞
4.3.3.1 流轉表的時候
流轉表的過程中,無論“源流”是否存在 watermark,都不會自動傳遞 watermark
如需時間運算(如時間窗口等),需要在轉換定義中顯式聲明 watermark 策略
- 先設法定義一個
timestamp(3)
或者timestamp_ltz(3)
類型的字段 (可以來自于數(shù)據(jù)字段,也可以來自于一個元數(shù)據(jù): rowtime)
rt as to_timestamp_ltz(ts,3) -- 從一個bigint中得到timestamp(3)類型的字段
rt timestamp(3) metadata from 'rowtime'
- 然后基于該字段,用 watermarkExpression 聲明 watermark 策略
watermark for rt AS rt - interval '1' second
watermark for rt AS source_watermark() -- 代表使用底層流的 watermark 策略
4.3.3.2 表轉流的時候
源表定義了 wartermark 策略,則將表轉成流時,將會自動傳遞源表的 watermark
。
/**
* 前提:table是一個存在watermark的表對象
*/
tableEnv.toDataStream(table)
.process(new ProcessFunction<Row, String>() {
@Override
public void processElement(Row value, Context ctx, Collector<String> out) throws Exception {
long watermark = ctx.timerService().currentWatermark();
System.out.println(watermark + "=>" + value);
}
}).print();
4.4 connector詳解
-
connector 通常是用于對接外部存儲建表(源表或目標表)時的映射器、橋接器
-
connector 本質上是對 flink 的 table source /table sink 算子的封裝
連接器使用的核心要素
-
1、導入連接器jar 包依賴
-
2、指定連接器類型名
-
3、指定連接器所需的參數(shù) (不同連接器有不同的參數(shù)配置需求)
-
4、獲取連接器所提供的元數(shù)據(jù)
flink1.14支持的連接器
4.4.1 kafka連接器
產生的數(shù)據(jù)以及能接受的數(shù)據(jù)流,是 append-only 流
(只有 +I 這種 changemode)
所需依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>1.14.4</version>
</dependency>
入門案例
CREATE TABLE KafkaTable (
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING,
`ts` TIMESTAMP(3) METADATA FROM 'timestamp'
) WITH (
'connector' = 'kafka',
'topic' = 'user_behavior',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'scan.startup.mode' = 'earliest-offset',
'format' = 'csv'
)
復雜案例
解析kafka生產者產生具有key以及headers的數(shù)據(jù)
package cn.yyds.sql;
import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.*;
/**
* 生產者生產數(shù)據(jù)
*/
public class _12_KafkaProducer {
public static void main(String[] args) throws InterruptedException {
// 泛型 K: 要發(fā)送的數(shù)據(jù)中的key
// 泛型 V: 要發(fā)送的數(shù)據(jù)中的value
// 隱含之意: kafka中的 message,是 Key-value結構的 (可以沒有key)
Properties props = new Properties();
// 因為kafka底層的存儲是沒有類型維護機制的,用戶所發(fā)的所有數(shù)據(jù)類型,都必須變成 序列化后的byte[]
// 所以,kafka的producer需要一個針對用戶要發(fā)送的數(shù)據(jù)類型的序列化工具類
// 且這個序列化工具類,需要實現(xiàn)kafka所提供的序列工具接口: org.apache.kafka.common.serialization.Serializer
props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "centos01:9092,centos02:9092,centos03:9092");
props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); // 消息發(fā)送應答級別
// 構造一個生產者客戶端
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// 檢查是否發(fā)送成功的消費者命令:
// kafka-console-consumer.sh --bootstrap-server centos01:9092 --topic abcd
for(int i = 0; i < 10; i++){
// 將業(yè)務數(shù)據(jù)封裝成客戶端所能發(fā)送的封裝格式
// 0->abc0
// 1->abc1
List<Header> headers = new ArrayList<>();
headers.add(new RecordHeader("k1", "v1".getBytes()));
JSONObject jsonObject = new JSONObject();
jsonObject.put("guid",i);
jsonObject.put("pageId","page" + i);
jsonObject.put("eventId","e" + i);
jsonObject.put("eventTime",System.currentTimeMillis());
ProducerRecord<String, String> message = new ProducerRecord<>("abcd", 0, "key_" + (i % 3), jsonObject.toJSONString(),headers);
// 調用客戶端去發(fā)送
// 數(shù)據(jù)的發(fā)送動作在producer的底層是異步線程去異步發(fā)送的
producer.send(message);
Thread.sleep(100);
}
// 關閉客戶端
producer.close();
}
}
{"eventId":"e0","eventTime":1680615780889,"guid":0,"pageId":"page0"}
{"eventId":"e1","eventTime":1680615781420,"guid":1,"pageId":"page1"}
{"eventId":"e2","eventTime":1680615781521,"guid":2,"pageId":"page2"}
{"eventId":"e3","eventTime":1680615781622,"guid":3,"pageId":"page3"}
{"eventId":"e4","eventTime":1680615781724,"guid":4,"pageId":"page4"}
{"eventId":"e5","eventTime":1680615781825,"guid":5,"pageId":"page5"}
{"eventId":"e6","eventTime":1680615781925,"guid":6,"pageId":"page6"}
{"eventId":"e7","eventTime":1680615782027,"guid":7,"pageId":"page7"}
{"eventId":"e8","eventTime":1680615782129,"guid":8,"pageId":"page8"}
{"eventId":"e9","eventTime":1680615782229,"guid":9,"pageId":"page9"}
·
-- 解析kafka
create table t_kafka_w(
guid int,
pageId string,
eventId string,
eventTime bigint,
msgkey string,
`partition` bigint METADATA VIRTUAL,
`offset` bigint METADATA VIRTUAL,
`headers` MAP<string,bytes> METADATA FROM 'headers'
) WITH (
'connector' = 'kafka',
'topic' = 'abcd',
'properties.bootstrap.servers' = 'centos01:9092',
'properties.group.id' = 'g1',
--'format' = 'json',
'key.format' = 'raw', -- 解析key用raw
'key.fields' = 'msgkey',
'value.format' = 'json', -- 解析value用json
-- 解析key的值是,要加上 'value.fields-include' = 'EXCEPT_KEY' 參數(shù)
-- 不然這個 key_field列也會被當成 value 的一部分參與 value 的解析,從而導致解析不出來數(shù)據(jù)
'value.fields-include' = 'EXCEPT_KEY',
'scan.startup.mode' = 'earliest-offset'
)
-- 查找數(shù)據(jù)
select guid,pageId,eventId,eventTime,msgkey,`partition`,`offset`,cast(headers['k1'] as string) as headers_value from t_kafka_w
+----+-------------+--------------------------------+--------------------------------+----------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| op | guid | pageId | eventId | eventTime | msgkey | partition | offset | headers_value |
+----+-------------+--------------------------------+--------------------------------+----------------------+--------------------------------+----------------------+----------------------+--------------------------------+
| +I | 0 | page0 | e0 | 1680615780889 | key_0 | 0 | 0 | v1 |
| +I | 1 | page1 | e1 | 1680615781420 | key_1 | 0 | 1 | v1 |
| +I | 2 | page2 | e2 | 1680615781521 | key_2 | 0 | 2 | v1 |
| +I | 3 | page3 | e3 | 1680615781622 | key_0 | 0 | 3 | v1 |
| +I | 4 | page4 | e4 | 1680615781724 | key_1 | 0 | 4 | v1 |
| +I | 5 | page5 | e5 | 1680615781825 | key_2 | 0 | 5 | v1 |
| +I | 6 | page6 | e6 | 1680615781925 | key_0 | 0 | 6 | v1 |
| +I | 7 | page7 | e7 | 1680615782027 | key_1 | 0 | 7 | v1 |
| +I | 8 | page8 | e8 | 1680615782129 | key_2 | 0 | 8 | v1 |
| +I | 9 | page9 | e9 | 1680615782229 | key_0 | 0 | 9 | v1 |
4.4.2 upsert kafka連接器
所需依賴和kafka相同。
作為source
根據(jù)所定義的主鍵
,將讀取到的數(shù)據(jù)轉換為 +I/-U/+U 記錄,如果讀到 null,則轉換為-D 記錄。
-- kafka 中假設有如下數(shù)據(jù)
1,zs,18
1,zs,28
-- kafka-connector產生出 appendonly 流
+I[1,zs,18]
+I[1,zs,28]
-- upsert-kafka-connector 產生出 upsert 模式的 changelog 流
+I [1,zs,18]
-U [1,zs,18]
+U [1,zs,28]
作為sink
-
對于 -U/+U/+I 記錄,都以正常的 append 消息寫入 kafka
-
對于-D 記錄,則寫入一個 null 到 kafka 來表示 delete 操作:
案例
package cn.yyds.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class _13_UpsertKafka {
public static void main(String[] args) {
// 創(chuàng)建環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 創(chuàng)建測試數(shù)據(jù)
Table table = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("province", DataTypes.STRING()),
DataTypes.FIELD("user_id", DataTypes.STRING())
),
Row.of("sh","u001"),
Row.of("sh","u002"),
Row.of("sh","u003")
);
tableEnv.createTemporaryView("s_source",table);
// 創(chuàng)建upsert-kafka sink表
tableEnv.executeSql("create table t_upsert_kafka_w(\n" +
" province string,\n" +
" pv bigint, \n" +
" primary key(province) not enforced -- 需要設置主鍵字段 \n" +
") WITH (\n" +
" 'connector' = 'upsert-kafka',\n" +
" 'topic' = 't_upsert_kafka',\n" +
" 'properties.bootstrap.servers' = 'centos01:9092',\n" +
" 'key.format' = 'csv',\n" +
" 'value.format' = 'csv'\n" +
")");
tableEnv.executeSql("insert into t_upsert_kafka_w select province,count(distinct user_id) as uv from s_source group by province");
/**
*+----+--------------------------------+----------------------+
* | op | province | uv |
* +----+--------------------------------+----------------------+
* | +I | sh | 1 |
* | -U | sh | 1 |
* | +U | sh | 2 |
* | -U | sh | 2 |
* | +U | sh | 3 |
* +----+--------------------------------+----------------------+
*/
// 從kafka讀取結果
tableEnv.executeSql("select * from t_upsert_kafka_w").print();
}
}
4.4.3 jdbc連接器
jdbc connector作為source有如下特性
-
可作為scan source,底層產生bounded stream
-
可作為 lookup source,底層是“事件驅動"式查詢??梢詫dbc連接器作為一個維表進行時態(tài)關聯(lián)。
具體可參考:flink1.14 sql基礎語法(一) flink sql表查詢詳解
jdbc connector作為sink有如下特性
-
可作為 Batch 模式的sink
-
可作為Stream模式下的append sink和upsert sink
所需依賴
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>1.14.4</version>
</dependency>
根據(jù)所連接的數(shù)據(jù)庫不同,還需要相應的 jdbc 驅動,比如連接 mysql
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
</dependency>
冪等寫出
-
jdbc connector 可以利用目標數(shù)據(jù)庫的特性,實現(xiàn)冪等寫出
-
冪等寫出可以避免在 failover 發(fā)生后的可能產生的數(shù)據(jù)重復
實現(xiàn)冪等寫出,本身并不需要對jdbc connector 做額外的配置,只需要指定主鍵字段
,jdbc connector 就會利用目標數(shù)據(jù)庫的 upsert 語法,來實現(xiàn)冪等寫出。
package cn.yyds.sql;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
public class _14_UpsertJdbcSink {
public static void main(String[] args) {
// 創(chuàng)建環(huán)境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
Table table = tableEnv.fromValues(
DataTypes.ROW(
DataTypes.FIELD("province", DataTypes.STRING()),
DataTypes.FIELD("user_id", DataTypes.STRING())
),
Row.of("sh","u001"),
Row.of("sh","u002"),
Row.of("sh","u003")
);
tableEnv.createTemporaryView("s_source",table);
// 創(chuàng)建jdbc sink表
tableEnv.executeSql("create table t_province_uv(\n" +
" province string,\n" +
" uv bigint, \n" +
" primary key(province) not enforced \n" +
") with(\n" +
" 'connector' = 'jdbc',\n" +
" 'url' = 'jdbc:mysql://localhost:3306/test?serverTimezone=UTC',\n" +
" 'table-name' = 't_province_uv_res',\n" +
" 'driver' = 'com.mysql.jdbc.Driver',\n" +
" 'username' = 'root',\n" +
" 'password' = 'root'\n" +
")");
tableEnv.executeSql("insert into t_province_uv select province,count(distinct user_id) as uv from s_source group by province");
/**
* +----+--------------------------------+----------------------+
* | op | province | uv |
* +----+--------------------------------+----------------------+
* | +I | sh | 3 |
* +----+--------------------------------+----------------------+
*/
// 從kafka讀取結果
tableEnv.executeSql("select * from t_province_uv").print();
}
}
分區(qū)并行讀取 (partitioned scan)
jdbc connector 持有一個多并行度的 source task,因而可以多并行度加快表數(shù)據(jù)的讀取
通過設置如下參數(shù)即可實現(xiàn)多并行讀取
-
scan.partition.column: 劃分并行任務的參照列
-
scan.partition.num: 任務并行數(shù)
-
scan.partition.lower-bound: 首分區(qū)的參照字段最小值
-
scan.partition.upper-bound: 末分區(qū)的參照字段最大值
分區(qū)參照字段必須是: numeric, date,或 timestamp 類型
4.4.4 filesystem連接器
filesystem connector 表特性
-
可讀可寫
-
作為 source 表時,支持持續(xù)監(jiān)視讀取目錄下新文件,且每個新文件只會被讀取一次
-
作為 sink 表時,支持 多種文件格式、分區(qū)、文件滾動、壓縮設置等功能
CREATE TABLE MyUserTable (
column_name1 INT,
column_name2 STRING,
...
part_name1 INT,
part_name2 STRING
)
PARTITIONED BY (part_name1, part_name2)
WITH (
'connector' = 'filesystem', -- 必填: 指定連接器名稱
'path' = 'file:///path/to/whatever', -- 必填: 目錄路徑
'format' = '...', -- 必填: 文件系統(tǒng)連接器要求指定一個format格式化
'partition.default-name' = '...', -- 可選: 如果動態(tài)分區(qū)字段值為null/空字符串,則使用指定的默認分區(qū)名稱
'sink.shuffle-by-partition.enable' = '...', --可選:在sink階段開啟對動態(tài)分區(qū)文件數(shù)據(jù)的shuffle,開啟之后可以減少寫出文件的數(shù)量,但是有可能造成數(shù)據(jù)傾斜。默認為false。
...
);
1、分區(qū)文件
文件系統(tǒng)分區(qū)支持使用標準的hive format格式,而且,它不要求分區(qū)被預注冊在表的catalog中。分區(qū)通過目錄結構來進行發(fā)現(xiàn)和推斷。比如,下面基于目錄的表分區(qū)將會被推斷為包含日期和小時分區(qū)。
path
└── datetime=2019-08-25
└── hour=11
├── part-0.parquet
├── part-1.parquet
└── hour=12
├── part-0.parquet
└── datetime=2019-08-26
└── hour=6
├── part-0.parquet
12345678910
使用insert overwrite
覆蓋一個分區(qū)表時,只有相關聯(lián)的分區(qū)被覆蓋,而不是整張表。
2、文件format
文件系統(tǒng)連接器支持多種format格式:
- CSV: RFC-4180. 未壓縮
- JSON: 注意,文件系統(tǒng)的JSON格式并不是標準的JSON文件,而是未壓縮的newline delimited JSON。
- Avro: Apache Avro. 支持通過配置avro.codec來支持壓縮。
- Parquet: Apache Parquet. 兼容Hive.
- Orc: Apache Orc. 兼容Hive.
- Debezium-JSON: debezium-json.
- Canal-JSON: canal-json.
- Raw: raw.
3、Source
file system 連接器在單個表中可以被用于讀取單個文件,或者是整個目錄。
當使用目錄作為 source 路徑時,目錄中的文件并沒有定義好的讀取順序。
目錄監(jiān)控
默認情況下,file system 連接器是有界的,該連接器只會讀取一次配置的目錄,然后關閉它。
你可以通過配置 option source.monitor-interval 選項配置持續(xù)的目錄監(jiān)控:
Key | 默認值 | 類型 | 描述 |
---|---|---|---|
source.monitor-interval | (none) | Duration | source 檢查新文件的時間間隔,該數(shù)值必須大于0。每個文件都會使用他們自己的路徑作為唯一標識符,并且在被發(fā)現(xiàn)后處理一次。已經被處理過的文件集合會在整個 source 的生命周期內被保存到 state 中,因此他們和 source state 一起被持久化到 checkpoint 和 savepoint 中。 更小的時間間隔意味著文件會更快被發(fā)現(xiàn),但是會對文件系統(tǒng)或對象存儲進行更頻繁的文件列出或目錄遍歷。如果沒有配置該選項,則提供的路徑將只會被掃描一次,此時該 source 將會是有界的。 |
可用元數(shù)據(jù)
下面的連接器元數(shù)據(jù)可以通過被定義為表的元數(shù)據(jù)字段來訪問,所有的元數(shù)據(jù)都是只讀的。
Key | 數(shù)據(jù)類型 | 描述 |
---|---|---|
file.path | STRING NOT NULL | 輸入文件的路徑 |
file.name | STRING NOT NULL | 文件名稱,他是距離文件路徑根目錄最遠的元素。 |
file.size | BIGINT NOT NULL | 文件的字節(jié)數(shù)。 |
file.modification-time | TIMESTAMP_LTZ(3) NOT NULL | 文件的修改時間。 |
下面的代碼片段展示了 CREATE TABLE
案例如何訪問元數(shù)據(jù)屬性:
CREATE TABLE MyUserTableWithFilepath (
column_name1 INT,
column_name2 STRING,
`file.path` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'file:///path/to/whatever',
'format' = 'json'
)
4、Streaming Sink
文件系統(tǒng)連接器基于Streaming File Sink 寫入記錄到文件以支持文件系統(tǒng)連接器流式寫入。行編碼格式支持csv和json。塊編碼格式支持parquet、orc和avro。
可以通過sql直接寫入,插入流數(shù)據(jù)到不分區(qū)的表中。如果是分區(qū)表,可以配置分區(qū)關聯(lián)操作。
滾動策略
數(shù)據(jù)通過分區(qū)目錄會被切分為多個文件。每個分區(qū)將包含其對應sink子任務接收到數(shù)據(jù)之后寫入的至少一個文件,正在處理的文件將會根據(jù)配置的滾動策略來關閉并成為分區(qū)中的一個文件。文件的滾動策略基于大小、文件可以被打開的最大超時時間間隔來配置。
Key | 要求 | 是否可被傳遞 | 默認值 | 類型 | 描述 |
---|---|---|---|---|---|
sink.rolling-policy.file-size | 可選 | 是 | 128MB | MemorySize | 滾動之前文件的最大大小。 |
sink.rolling-policy.rollover-interval | 可選 | 是 | 30 min | Duration | 被滾動之前,一個文件可以保持打開的最大時間間隔(默認為30分鐘,以避免產生很多小文件)。通過 sink.rolling-policy.check-interval 選項來控制檢查的頻率。 |
sink.rolling-policy.check-interval | 可選 | 是 | 1 min | Duration | 滾動策略的檢查時間間隔。該選項基于 sink.rolling-policy.rollover-interval 選項來控制檢查文件是否可以被滾動。 |
注:對于塊格式(parquet、orc、avro),滾動策略將會根據(jù)checkpoint間隔來控制大小和他們的數(shù)量,checkpoint決定文件的寫入完成。
注:對于行格式(csv、json),如果想查看文件是否在文件系統(tǒng)中存在,并且不想等待過長的時間,則可以在連接器配置 sink.rolling-policy.file-size 和 sink.rolling-policy.rollover-interval ,并且在flink-conf.yaml中設置 execution.checkpointing.interval 參數(shù)。
對于其他的格式(avro、orc),可以只在flink-conf.yaml中配置execution.checkpointing.interval參數(shù)。
文件壓縮
文件系統(tǒng)sink支持文件壓縮,該特性允許應用程序設置更小的checkpoint間隔,而不會產生很多的文件。
Key | 要求 | 是否可被傳遞 | 默認值 | 類型 | 描述 |
---|---|---|---|---|---|
auto-compaction | 可選 | 否 | false | Boolean | 是否在流slink中開啟自動壓縮。數(shù)據(jù)將會被寫入臨時文件。checkpoint完成之后,通過checkpoint生成的臨時文件將會被壓縮。臨時文件在被壓縮之前是不可見的。 |
compaction.file-size | 可選 | 是 | (none) | Boolean | 壓縮的目標文件大小,默認值為滾動文件大小。 |
如果開啟,文件壓縮將會基于目標文件大小合并多個小文件為大文件。在生產生運行文件壓縮時,需要注意以下問題:
- 只有單個checkpoint中的文件可以被合并,因此,至少有和checkpoint次數(shù)相同的文件被生成。
- 文件在被合并之前是不可見的,因此文件可見時間為:checkpoint間隔+壓縮時間。
- 如果壓縮運行時間過長,則將會造成任務的反壓,并且增加checkpoint的時間。
5、分區(qū)提交
通常來說,寫入分區(qū)之后通知下游應用程序是非常必要的。比如:增加分區(qū)信息到hive的元數(shù)據(jù),或者是在分區(qū)目錄中寫入一個 _SUCCESS 文件。文件系統(tǒng)sink連接器提供了分區(qū)提交特性,以允許配置自定義策略。提交行為基于合并的觸發(fā)器和策略。
Trigger觸發(fā)器:分區(qū)提交的時間可以通過水印或處理時間來確定。
Policy策略:如何提交一個分區(qū),支持通過success文件和元數(shù)據(jù)提交,也可以自定義實現(xiàn)策略。比如觸發(fā)hive的指標分區(qū),或者是和并小文件等等。
注:分區(qū)提交只在動態(tài)分區(qū)插入時起作用。
分區(qū)提交觸發(fā)器
定義何時提交分區(qū),提供分區(qū)提交觸發(fā)器:
Key | 要求 | 是否可被傳遞 | 默認值 | 類型 | 描述 |
---|---|---|---|---|---|
sink.partition-commit.trigger | 可選 | 是 | process-time | String | 分區(qū)提交觸發(fā)的類型: process-time:基于機器時間,既不需要分區(qū)時間提取,也不需要水印生成。一旦當前系統(tǒng)時間超過了分區(qū)創(chuàng)建時的系統(tǒng)時間加上指定的delay延遲就會提交分區(qū)。 partition-time:基于分區(qū)字段值提取的時間,要求生成水印。當水印超過了分區(qū)值提取的時間加上delay延遲時提交水印。 |
sink.partition-commit.delay | 可選 | 是 | 0 s | Duration | 分區(qū)在延遲時間到達之前不會提交。如果是按天分區(qū),則應該是1 d,如果是按小時分區(qū),則應該是1 h。 |
sink.partition-commit.watermark-time-zone | 可選 | 是 | UTC | String | 轉換long 類型的水印值為TIMESTAMP 類型是使用的時區(qū),轉換之后的水印時間戳將被用于和分區(qū)時間計算,以決定分區(qū)是否應該被提交。 該選項只有在 sink.partition-commit.trigger 選項設置為 partition-time 時起作用。如果該選項沒有被正確配置,比如source的rowtime被定義為TIMESTAMP_LTZ 字段,但是該選項沒有配置,則用戶將會延遲幾小時之后看到提交的分區(qū)。 默認值為UTC,這意味著水印需要被定義為TIMESTAMP 字段,或者是不被定義。如果水印被定義為TIMESTAMP_LTZ 字段,則水印時區(qū)為會話時區(qū)。該選項值可以是完全名稱,比如America/Los_Angeles,或者是自定義的時區(qū)id,比如GMT+08:00。 |
有兩種觸發(fā)器類型:
- 第一個是分區(qū)的處理時間,既不要求分區(qū)時間提取,也不要求水印生成。該觸發(fā)器根據(jù)分區(qū)的創(chuàng)建時間和當前系統(tǒng)時間觸發(fā)分區(qū)提交。該觸發(fā)器更常用,但不是很精確。比如,數(shù)據(jù)延遲或失敗,將會導致不成熟的分區(qū)提交。
- 第二個是根據(jù)水印和從分區(qū)中提取的時間來觸發(fā)分區(qū)提交。該觸發(fā)器要求任務有水印生成,并且分區(qū)根據(jù)時間來劃分,比如按小時或按天分區(qū)。
如果想要下游盡快看到新分區(qū),而不管數(shù)據(jù)寫入是否完成:
- ‘sink.partition-commit.trigger’=‘process-time’ (默認值)
- ‘sink.partition-commit.delay’=‘0s’ (默認值),分區(qū)一旦寫入數(shù)據(jù),將會立即提交。注:分區(qū)可能會被提交多次。
如果想要下游在數(shù)據(jù)寫入完成之后看到分區(qū),并且job任務有水印生成,則可以通過分區(qū)值來提取時間:
- ‘sink.partition-commit.trigger’=‘partition-time’
- ‘sink.partition-commit.delay’=‘1h’ (如果分區(qū)為小時分區(qū),則使用 1h,取決于分區(qū)時間類型)這是提交分區(qū)更準確的方式。它將嘗試在數(shù)據(jù)寫入完成之后再提交分區(qū)。
如果想要下游在數(shù)據(jù)寫入完成之后看到分區(qū),但是沒有水印,或者是無法從分區(qū)值提取時間:
- ‘ink.partition-commit.trigger’=‘process-time’ (默認值)
- ‘sink.partition-commit.delay’=‘1h’ (如果分區(qū)為小時分區(qū),則使用 1h,取決于分區(qū)時間類型)嘗試準確的提交分區(qū),但是遲到的數(shù)據(jù)或者是失敗將會導致不成熟的分區(qū)提交。
遲到數(shù)據(jù)處理:支持寫入分區(qū)的記錄將會被寫入已經提交的分區(qū),并且該分區(qū)提交將會被再次觸發(fā)。
默認提取器基于分區(qū)屬性和時間戳默認組成。也可以通過實現(xiàn) PartitionTimeExtractor
接口來完全自定義分區(qū)提取器。
public class HourPartTimeExtractor implements PartitionTimeExtractor {
@Override
public LocalDateTime extract(List<String> keys, List<String> values) {
String dt = values.get(0);
String hour = values.get(1);
return Timestamp.valueOf(dt + " " + hour + ":00:00").toLocalDateTime();
}
}
分區(qū)提交策略
分區(qū)提交策略定義分區(qū)提交時執(zhí)行哪些操作
- 第一個是元數(shù)據(jù),只有hive表支持元數(shù)據(jù)策略,文件系統(tǒng)通過目錄結構管理分區(qū)。
- 第二個是success文件,在分區(qū)對一個的目錄下寫一個空文件。
Key | 要求 | 是否可被傳遞 | 默認值 | 類型 | 描述 |
---|---|---|---|---|---|
sink.partition-commit.policy.kind | 可選 | 是 | (none) | String | 指定提交分區(qū)并通知下游應用程序,該分區(qū)已經完成寫入并可進行讀取的策略。 metastore:將分區(qū)寫入元數(shù)據(jù)。只有hive表支持元數(shù)據(jù)策略,文件系統(tǒng)通過目錄結構來管理分區(qū)。 success-file:在目錄中增加 _success 文件。這兩個方式可以同時配置: metastore,success-file custom:使用策略類創(chuàng)建一個提交策略。 支持配置多個策略:metastore,success-file。 |
sink.partition-commit.policy.class | 可選 | 是 | (none) | String | 實現(xiàn)了PartitionCommitPolicy 接口的分區(qū)提交策略實現(xiàn)類。只在自定義custom提交策略中起作用。 |
sink.partition-commit.success-file.name | 可選 | 是 | _SUCCESS | String |
success-file 分區(qū)提交的文件名稱,默認為: _SUCCESS 。 |
6、sink并行度
寫入文件到外部文件系統(tǒng)的并行度(包括hive),可以通過表的option
選項來配置,流模式和批模式都支持這么做。
默認情況下,slink的并行度和上游鏈在一起的算子并行度一致。如果配置了和上游算子不同的并行度,則寫入文件算子的并行度將使用配置的并行度。
Key | 要求 | 是否可被傳遞 | 默認值 | 類型 | 描述 |
---|---|---|---|---|---|
sink.parallelism | 可選 | 否 | (none) | Integer | 將文件寫入外部文件系統(tǒng)的并行度。數(shù)值應該大于0,否則將拋出異常。 |
注:目前,配置sink并行度只支持上游算子為僅插入INERT-ONLY類型的變更日志模式,否則將拋出異常。
7、完整案例
下面的例子展示文件系統(tǒng)連接器如何通過流查詢從kafka讀取數(shù)據(jù),然后寫入文件系統(tǒng),并且通過批查詢從文件系統(tǒng)中讀取寫入的數(shù)據(jù)。
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
log_ts TIMESTAMP(3),
WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='success-file'
);
-- streaming sql, insert into file system table
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(log_ts, 'yyyy-MM-dd'),
DATE_FORMAT(log_ts, 'HH')
FROM kafka_table;
-- 批式sql,查詢指定分區(qū)下的數(shù)據(jù)
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
如果水印定義在TIMESTAMP_LTZ
類型的字段上,并且被用于分區(qū)提交時間,則sink.partition-commit.watermark-time-zone
配置必須設置為會話時間分區(qū),否則分區(qū)提交將會晚幾個小時。
CREATE TABLE kafka_table (
user_id STRING,
order_amount DOUBLE,
ts BIGINT, -- 毫秒值
ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- 在TIMESTAMP_LTZ字段上定義水印
) WITH (...);
CREATE TABLE fs_table (
user_id STRING,
order_amount DOUBLE,
dt STRING,
`hour` STRING
) PARTITIONED BY (dt, `hour`) WITH (
'connector'='filesystem',
'path'='...',
'format'='parquet',
'partition.time-extractor.timestamp-pattern'='$dt $hour:00:00',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- 表名用戶配置的時區(qū)為:'Asia/Shanghai'
'sink.partition-commit.policy.kind'='success-file'
);
-- 流式sql,插入數(shù)據(jù)到文件系統(tǒng)
INSERT INTO fs_table
SELECT
user_id,
order_amount,
DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'),
DATE_FORMAT(ts_ltz, 'HH')
FROM kafka_table;
-- 批式sql,查詢指定分區(qū)下的數(shù)據(jù)
SELECT * FROM fs_table WHERE dt='2020-05-20' and `hour`='12';
4.4.5 第三方連接器
例如:flink-doris-connector文章來源:http://www.zghlxwxcb.cn/news/detail-626695.html
create table cdc_mysql_source(
id int,
name varchar,
primary key(id) not enforced
)with(
'connector' = 'mysql-cdc',
'hostname' = 'centos01',
'port' = '3306',
'username' = 'root',
'password' = 'root',
'database-name' = 'test',
'table-name' = 't_test'
)
-- 支持刪除事件同步(sink.enable-delete='true'),需要 Doris 表開啟批量刪除功能
CREATE TABLE doris_sink (
id INT,
name STRING
) WITH (
'connector' = 'doris',
'fenodes' ='centos01:8030',
'table.identifier' = 'test.t_test',
'username' = 'root',
'password' = 'root',
'sink.properties.format' = 'json',
'sink.properties.strip_outer_array' = 'true',
'sink.enable-delete' = 'true'
)
insert into doris_sink select id,name from cdc_mysql_source;
flink-hudi-connector文章來源地址http://www.zghlxwxcb.cn/news/detail-626695.html
-- 1、創(chuàng)建測試表
CREATE TABLE sourceT (
uuid varchar(20),
name varchar(10),
age int,
ts timestamp(3),
`partition` varchar(20)
) WITH (
'connector' = 'datagen',
'rows-per-second' = '1'
);
create table t2(
uuid varchar(20),
name varchar(10),
age int,
ts timestamp(3),
`partition` varchar(20)
)
with (
'connector' = 'hudi',
'path' = '/tmp/hudi_flink/t2',
'table.type' = 'MERGE_ON_READ'
);
-- 2、執(zhí)行插入
insert into t2 select * from sourceT;
到了這里,關于flink1.14 sql基礎語法(二) flink sql表定義詳解的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!