目錄
1、文件系統(tǒng) SQL 連接器
2、如何指定文件系統(tǒng)類型
3、如何指定文件格式
4、讀取文件系統(tǒng)
4.1 開啟?目錄監(jiān)控?
4.2?可用的 Metadata
5、寫出文件系統(tǒng)
5.1 創(chuàng)建分區(qū)表
5.2 滾動(dòng)策略、文件合并、分區(qū)提交
5.3 指定 Sink Parallelism
6、示例_通過FlinkSQL讀取kafka在寫入hive表
6.1、創(chuàng)建 kafka source表用于讀取kafka
6.2、創(chuàng)建 hdfs sink表用于寫出到hdfs
6.3、insert into 寫入到?hdfs_sink_table
6.4、查詢?hdfs_sink_table
6.5、創(chuàng)建hive表,指定local
1、文件系統(tǒng) SQL 連接器
文件系統(tǒng)連接器允許從本地或分布式文件系統(tǒng)進(jìn)行讀寫數(shù)據(jù)
官網(wǎng)鏈接:文件系統(tǒng) SQL 連接器
2、如何指定文件系統(tǒng)類型
創(chuàng)建表時(shí)通過?'path' = '協(xié)議名稱:///path' 來指定 文件系統(tǒng)類型
參考官網(wǎng):文件系統(tǒng)類型
CREATE TABLE filesystem_table (
id INT,
name STRING,
ds STRING
) partitioned by (ds) WITH (
'connector' = 'filesystem',
-- 本地文件系統(tǒng)
'path' = 'file:///URI',
-- HDFS文件系統(tǒng)
'path' = 'hdfs://URI',
-- 阿里云對(duì)象存儲(chǔ)
'path' = 'oss://URI',
'format' = 'json'
);
3、如何指定文件格式
FlinkSQL 文件系統(tǒng)連接器支持多種format,來讀取和寫入文件
比如當(dāng)讀取的source格式為 csv、json、Parquet... 可以在建表是指定相應(yīng)的格式類型
來對(duì)數(shù)據(jù)進(jìn)行解析后映射到表中的字段中
CREATE TABLE filesystem_table_file_format (
id INT,
name STRING,
ds STRING
) partitioned by (ds) WITH (
'connector' = 'filesystem',
-- 指定文件格式類型
'format' = 'json|csv|orc|raw'
);
4、讀取文件系統(tǒng)
FlinkSQL可以將單個(gè)文件或整個(gè)目錄的數(shù)據(jù)讀取到單個(gè)表中
注意:
? ? ? ? 1、當(dāng)讀取目錄時(shí),對(duì)目錄中的文件進(jìn)行?無序的讀取
? ? ? ? 2、默認(rèn)情況下,讀取文件時(shí)為批處理模式,只會(huì)掃描配置路徑一遍后就會(huì)停止
? ? ? ? ? ? ?當(dāng)開啟目錄監(jiān)控(source.monitor-interval)時(shí),才是流處理模式
4.1 開啟?目錄監(jiān)控?
通過設(shè)置?source.monitor-interval
?屬性來開啟目錄監(jiān)控,以便在新文件出現(xiàn)時(shí)繼續(xù)掃描
注意:
? ? ? ? 只會(huì)對(duì)指定目錄內(nèi)新增文件進(jìn)行讀取,不會(huì)讀取更新后的舊文件
-- 目錄監(jiān)控
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (
id INT,
name STRING,
`file.name` STRING NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
'format' = 'json',
'source.monitor-interval' = '3' -- 開啟目錄監(jiān)控,設(shè)置監(jiān)控時(shí)間間隔
);
-- 持續(xù)讀取
select * from filesystem_source_table;
4.2?可用的 Metadata
使用FLinkSQL讀取文件系統(tǒng)中的數(shù)據(jù)時(shí),支持對(duì)?metadata 進(jìn)行讀取
注意:?所有 metadata 都是只讀的
-- 可用的Metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (
id INT,
name STRING,
`file.path` STRING NOT NULL METADATA,
`file.name` STRING NOT NULL METADATA,
`file.size` BIGINT NOT NULL METADATA,
`file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
'connector' = 'filesystem',
'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
'format' = 'json'
);
select * from filesystem_source_table_read_metadata;
運(yùn)行結(jié)果:
5、寫出文件系統(tǒng)
5.1 創(chuàng)建分區(qū)表
FlinkSQL支持創(chuàng)建分區(qū)表,并且通過 insert into(追加) 和 insert overwrite(覆蓋) 寫入數(shù)據(jù)
-- 創(chuàng)建分區(qū)表
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (
id INT,
name STRING,
ds STRING
) partitioned by (ds) WITH (
'connector' = 'filesystem',
'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
'partition.default-name' = 'default_partition',
'format' = 'json'
);
-- 動(dòng)態(tài)分區(qū)寫入
insert into filesystem_source_table_partition
SELECT * FROM (VALUES
(1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);
-- 靜態(tài)分區(qū)寫入
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES
(1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);
-- 查詢分區(qū)表數(shù)據(jù)
select * from filesystem_source_table_partition where ds = '20231010';
5.2 滾動(dòng)策略、文件合并、分區(qū)提交
可以看之前的博客:flink寫入文件時(shí)分桶策略
官網(wǎng)鏈接:官網(wǎng)分桶策略
5.3 指定 Sink Parallelism
當(dāng)使用FlinkSQL寫出到文件系統(tǒng)時(shí),可以通過?sink.parallelism 設(shè)置sink算子的并行度
注意:當(dāng)且僅當(dāng)上游的 changelog 模式為?INSERT-ONLY?時(shí),才支持配置 sink parallelism。否則,程序?qū)?huì)拋出異常
CREATE TABLE hdfs_sink_table (
`log` STRING,
`dt` STRING, -- 分區(qū)字段,天
`hour` STRING -- 分區(qū)字段,小時(shí)
) partitioned by (dt,`hour`) WITH (
'connector' = 'filesystem',
'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
'sink.parallelism' = '2', -- 指定sink算子并行度
'format' = 'raw'
);
6、示例_通過FlinkSQL讀取kafka在寫入hive表
需求:
? ? ? ? 使用FlinkSQL將kafka數(shù)據(jù)寫入到hdfs指定目錄中文章來源:http://www.zghlxwxcb.cn/news/detail-722282.html
? ? ? ? 根據(jù)kafka的timestamp進(jìn)行分區(qū)(按小時(shí)分區(qū))文章來源地址http://www.zghlxwxcb.cn/news/detail-722282.html
6.1、創(chuàng)建 kafka source表用于讀取kafka
-- TODO 創(chuàng)建讀取kafka表時(shí),同時(shí)讀取kafka元數(shù)據(jù)字段
drop table kafka_source_table;
CREATE TABLE kafka_source_table(
`log` STRING,
`timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的時(shí)間戳
) WITH (
'connector' = 'kafka',
'topic' = '20231017',
'properties.bootstrap.servers' = 'worker01:9092',
'properties.group.id' = 'FlinkConsumer',
'scan.startup.mode' = 'earliest-offset',
'format' = 'raw'
);
6.2、創(chuàng)建 hdfs sink表用于寫出到hdfs
drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (
`log` STRING,
`dt` STRING, -- 分區(qū)字段,天
`hour` STRING -- 分區(qū)字段,小時(shí)
) partitioned by (dt,`hour`) WITH (
'connector' = 'filesystem',
'path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
'sink.parallelism' = '2', -- 指定sink算子并行度
'format' = 'raw'
);
6.3、insert into 寫入到?hdfs_sink_table
-- 流式 sql,插入文件系統(tǒng)表
insert into hdfs_sink_table
select
log
,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt
,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;
6.4、查詢?hdfs_sink_table
-- 批式 sql,使用分區(qū)修剪進(jìn)行選擇
select * from hdfs_sink_table;
6.5、創(chuàng)建hive表,指定local
create table `kafka_to_hive` (
`log` string comment '日志數(shù)據(jù)')
comment '埋點(diǎn)日志數(shù)據(jù)' PARTITIONED BY (dt string,`hour` string)
row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';
到了這里,關(guān)于2.2 如何使用FlinkSQL讀取&寫入到文件系統(tǒng)(HDFS\Local\Hive)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!