国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

2.2 如何使用FlinkSQL讀取&寫入到文件系統(tǒng)(HDFS\Local\Hive)

這篇具有很好參考價(jià)值的文章主要介紹了2.2 如何使用FlinkSQL讀取&寫入到文件系統(tǒng)(HDFS\Local\Hive)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

目錄

1、文件系統(tǒng) SQL 連接器

2、如何指定文件系統(tǒng)類型

3、如何指定文件格式

4、讀取文件系統(tǒng)

4.1 開啟?目錄監(jiān)控?

4.2?可用的 Metadata

5、寫出文件系統(tǒng)

5.1 創(chuàng)建分區(qū)表

5.2 滾動(dòng)策略、文件合并、分區(qū)提交

5.3 指定 Sink Parallelism

6、示例_通過FlinkSQL讀取kafka在寫入hive表

6.1、創(chuàng)建 kafka source表用于讀取kafka

6.2、創(chuàng)建 hdfs sink表用于寫出到hdfs

6.3、insert into 寫入到?hdfs_sink_table

6.4、查詢?hdfs_sink_table

6.5、創(chuàng)建hive表,指定local


1、文件系統(tǒng) SQL 連接器

文件系統(tǒng)連接器允許從本地分布式文件系統(tǒng)進(jìn)行讀寫數(shù)據(jù)

官網(wǎng)鏈接:文件系統(tǒng) SQL 連接器

flink sql 讀取hdfs,# FlinkSQL 使用技巧,hdfs,大數(shù)據(jù),服務(wù)器


2、如何指定文件系統(tǒng)類型

創(chuàng)建表時(shí)通過?'path' = '協(xié)議名稱:///path' 來指定 文件系統(tǒng)類型

參考官網(wǎng):文件系統(tǒng)類型

CREATE TABLE filesystem_table (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- 本地文件系統(tǒng)
  'path' = 'file:///URI',
  -- HDFS文件系統(tǒng)
  'path' = 'hdfs://URI',
  -- 阿里云對(duì)象存儲(chǔ) 
  'path' = 'oss://URI',
  'format' = 'json'
);

3、如何指定文件格式

FlinkSQL 文件系統(tǒng)連接器支持多種format,來讀取和寫入文件

比如當(dāng)讀取的source格式為 csv、json、Parquet... 可以在建表是指定相應(yīng)的格式類型

來對(duì)數(shù)據(jù)進(jìn)行解析后映射到表中的字段中

flink sql 讀取hdfs,# FlinkSQL 使用技巧,hdfs,大數(shù)據(jù),服務(wù)器

CREATE TABLE filesystem_table_file_format (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  -- 指定文件格式類型
  'format' = 'json|csv|orc|raw'
);

4、讀取文件系統(tǒng)

FlinkSQL可以將單個(gè)文件或整個(gè)目錄的數(shù)據(jù)讀取到單個(gè)表中

注意:

? ? ? ? 1、當(dāng)讀取目錄時(shí),對(duì)目錄中的文件進(jìn)行?無序的讀取

? ? ? ? 2、默認(rèn)情況下,讀取文件時(shí)為批處理模式,只會(huì)掃描配置路徑一遍后就會(huì)停止

? ? ? ? ? ? ?當(dāng)開啟目錄監(jiān)控(source.monitor-interval)時(shí),才是流處理模式

4.1 開啟?目錄監(jiān)控?

通過設(shè)置?source.monitor-interval?屬性來開啟目錄監(jiān)控,以便在新文件出現(xiàn)時(shí)繼續(xù)掃描

注意:

? ? ? ? 只會(huì)對(duì)指定目錄內(nèi)新增文件進(jìn)行讀取,不會(huì)讀取更新后的舊文件

-- 目錄監(jiān)控
drop table filesystem_source_table;
CREATE TABLE filesystem_source_table (
  id INT,
  name STRING,
  `file.name` STRING NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1016',
  'format' = 'json',
  'source.monitor-interval' = '3' -- 開啟目錄監(jiān)控,設(shè)置監(jiān)控時(shí)間間隔
);

-- 持續(xù)讀取
select * from filesystem_source_table;

4.2?可用的 Metadata

使用FLinkSQL讀取文件系統(tǒng)中的數(shù)據(jù)時(shí),支持對(duì)?metadata 進(jìn)行讀取

注意:?所有 metadata 都是只讀的

flink sql 讀取hdfs,# FlinkSQL 使用技巧,hdfs,大數(shù)據(jù),服務(wù)器

-- 可用的Metadata
drop table filesystem_source_table_read_metadata;
CREATE TABLE filesystem_source_table_read_metadata (
  id INT,
  name STRING,
  `file.path` STRING NOT NULL METADATA,
  `file.name` STRING NOT NULL METADATA,
  `file.size` BIGINT NOT NULL METADATA,
  `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA
) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'format' = 'json'
);

select * from filesystem_source_table_read_metadata;

運(yùn)行結(jié)果:

flink sql 讀取hdfs,# FlinkSQL 使用技巧,hdfs,大數(shù)據(jù),服務(wù)器


5、寫出文件系統(tǒng)

5.1 創(chuàng)建分區(qū)表

FlinkSQL支持創(chuàng)建分區(qū)表,并且通過 insert into(追加) insert overwrite(覆蓋) 寫入數(shù)據(jù)

-- 創(chuàng)建分區(qū)表
drop table filesystem_source_table_partition;
CREATE TABLE filesystem_source_table_partition (
  id INT,
  name STRING,
  ds STRING
) partitioned by (ds) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/1012',
  'partition.default-name' = 'default_partition',
  'format' = 'json'
);

-- 動(dòng)態(tài)分區(qū)寫入
insert into filesystem_source_table_partition
SELECT * FROM (VALUES
  (1,'a','20231010')
, (2,'b','20231010')
, (3,'c','20231011')
, (4,'d','20231011')
, (5,'e','20231012')
, (6,'f','20231012')
) AS user1 (id,name,ds);

-- 靜態(tài)分區(qū)寫入
insert into filesystem_source_table_partition partition(ds = '20231010')
SELECT * FROM (VALUES
  (1,'a')
, (2,'b')
, (3,'c')
, (4,'d')
, (5,'e')
, (6,'f')
) AS user1 (id,name);

-- 查詢分區(qū)表數(shù)據(jù)
select * from filesystem_source_table_partition where ds = '20231010';

5.2 滾動(dòng)策略、文件合并、分區(qū)提交

可以看之前的博客:flink寫入文件時(shí)分桶策略

官網(wǎng)鏈接:官網(wǎng)分桶策略


5.3 指定 Sink Parallelism

當(dāng)使用FlinkSQL寫出到文件系統(tǒng)時(shí),可以通過?sink.parallelism 設(shè)置sink算子的并行度

注意:當(dāng)且僅當(dāng)上游的 changelog 模式為?INSERT-ONLY?時(shí),才支持配置 sink parallelism。否則,程序?qū)?huì)拋出異常

CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,  -- 分區(qū)字段,天
  `hour` STRING  -- 分區(qū)字段,小時(shí)
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'file:///usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- 指定sink算子并行度
  'format' = 'raw'
);

6、示例_通過FlinkSQL讀取kafka在寫入hive表

需求:

? ? ? ? 使用FlinkSQL將kafka數(shù)據(jù)寫入到hdfs指定目錄中

? ? ? ? 根據(jù)kafka的timestamp進(jìn)行分區(qū)(按小時(shí)分區(qū))文章來源地址http://www.zghlxwxcb.cn/news/detail-722282.html

6.1、創(chuàng)建 kafka source表用于讀取kafka

-- TODO 創(chuàng)建讀取kafka表時(shí),同時(shí)讀取kafka元數(shù)據(jù)字段
drop table kafka_source_table;
CREATE TABLE kafka_source_table(
  `log` STRING,
  `timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' -- 消息的時(shí)間戳
) WITH (
  'connector' = 'kafka',
  'topic' = '20231017',
  'properties.bootstrap.servers' = 'worker01:9092',
  'properties.group.id' = 'FlinkConsumer',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'raw'
);

6.2、創(chuàng)建 hdfs sink表用于寫出到hdfs

drop table hdfs_sink_table;
CREATE TABLE hdfs_sink_table (
  `log` STRING,
  `dt` STRING,  -- 分區(qū)字段,天
  `hour` STRING  -- 分區(qū)字段,小時(shí)
) partitioned by (dt,`hour`) WITH (
  'connector' = 'filesystem',
  'path' = 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka',
  'sink.parallelism' = '2', -- 指定sink算子并行度
  'format' = 'raw'
);

6.3、insert into 寫入到?hdfs_sink_table

-- 流式 sql,插入文件系統(tǒng)表
insert into hdfs_sink_table
select 
	log
	,DATE_FORMAT(`timestamp`,'yyyyMMdd') as dt
	,DATE_FORMAT(`timestamp`,'HH') as `hour`
from kafka_source_table;

6.4、查詢?hdfs_sink_table

-- 批式 sql,使用分區(qū)修剪進(jìn)行選擇
select * from hdfs_sink_table;

6.5、創(chuàng)建hive表,指定local

create table `kafka_to_hive` (
`log` string comment '日志數(shù)據(jù)')
 comment '埋點(diǎn)日志數(shù)據(jù)' PARTITIONED BY (dt string,`hour` string) 
row format delimited fields terminated by '\t' lines terminated by '\n' stored as orc
LOCATION 'hdfs://usr/local/lib/mavne01/FlinkAPI1.17/data/output/kafka';

到了這里,關(guān)于2.2 如何使用FlinkSQL讀取&寫入到文件系統(tǒng)(HDFS\Local\Hive)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • HDFS常用操作以及使用Spark讀取文件系統(tǒng)數(shù)據(jù)

    HDFS常用操作以及使用Spark讀取文件系統(tǒng)數(shù)據(jù)

    掌握在Linux虛擬機(jī)中安裝Hadoop和Spark的方法; 熟悉HDFS的基本使用方法; 掌握使用Spark訪問本地文件和HDFS文件的方法。 啟動(dòng)Hadoop,在HDFS中創(chuàng)建用戶目錄“/user/hadoop” 在Linux系統(tǒng)的本地文件系統(tǒng)的“/home/hadoop”目錄下新建一個(gè)文本文件test.txt,并在該文件中隨便輸入一些內(nèi)容,

    2024年04月22日
    瀏覽(24)
  • Hadoop Distributed System (HDFS) 寫入和讀取流程

    Hadoop Distributed System (HDFS) 寫入和讀取流程

    一、HDFS HDFS全稱是Hadoop Distributed System。HDFS是為以流的方式存取大文件而設(shè)計(jì)的。適用于幾百M(fèi)B,GB以及TB,并寫一次讀多次的場(chǎng)合。而對(duì)于低延時(shí)數(shù)據(jù)訪問、大量小文件、同時(shí)寫和任意的文件修改,則并不是十分適合。 目前HDFS支持的使用接口除了Java的還有,Thrift、C、FUSE、

    2024年02月08日
    瀏覽(20)
  • SAP_ABAP_編程基礎(chǔ)_文件處理(CRUD)_R3系統(tǒng)_打開文件 / 關(guān)閉文件 / 刪除文件 / 向文件中寫入數(shù)據(jù) / 從文件中讀取數(shù)據(jù) / 使用服務(wù)器上的文件

    SAP ABAP 顧問(開發(fā)工程師)能力模型_Terry談企業(yè)數(shù)字化的博客-CSDN博客 文章瀏覽閱讀490次。目標(biāo):基于對(duì)SAP abap 顧問能力模型的梳理,給一年左右經(jīng)驗(yàn)的abaper 快速成長(zhǎng)為三年經(jīng)驗(yàn)提供超級(jí)燃料! https://blog.csdn.net/java_zhong1990/article/details/132469977 平時(shí)在 ?‘ 工地搬磚 ’,很少關(guān)

    2024年02月22日
    瀏覽(22)
  • 大數(shù)據(jù)編程實(shí)驗(yàn)一:HDFS常用操作和Spark讀取文件系統(tǒng)數(shù)據(jù)

    大數(shù)據(jù)編程實(shí)驗(yàn)一:HDFS常用操作和Spark讀取文件系統(tǒng)數(shù)據(jù)

    這是我們大數(shù)據(jù)專業(yè)開設(shè)的第二門課程——大數(shù)據(jù)編程,使用的參考書是《Spark編程基礎(chǔ)》,這門課跟大數(shù)據(jù)技術(shù)基礎(chǔ)是分開學(xué)習(xí)的,但這門課是用的我們自己在電腦上搭建的虛擬環(huán)境進(jìn)行實(shí)驗(yàn)的,不是在那個(gè)平臺(tái)上,而且搭建的還是偽分布式,這門課主要偏向于有關(guān)大數(shù)據(jù)

    2024年04月10日
    瀏覽(26)
  • HDFS文件創(chuàng)建與寫入

    實(shí)驗(yàn)環(huán)境 Linux Ubuntu 16.04 前提條件: 1)Java 運(yùn)行環(huán)境部署完成 2)Hadoop 的單點(diǎn)部署完成 ? 實(shí)驗(yàn)內(nèi)容 在上述前提條件下,學(xué)習(xí)HDFS文件創(chuàng)建、寫入、追加與合并等操作 實(shí)驗(yàn)步驟 啟動(dòng)HDFS,在命令行窗口輸入下面的命令: 運(yùn)行后顯示如下,根據(jù)日志顯示,分別啟動(dòng)了NameNode、Dat

    2024年02月02日
    瀏覽(46)
  • Spark解析JSON文件,寫入hdfs

    一、用Sparkcontext讀入文件,map逐行用Gson解析,輸出轉(zhuǎn)成一個(gè)caseclass類,填充各字段,輸出。 解析JSON這里沒有什么問題。 RDD覆蓋寫的時(shí)候碰到了一些問題 : 1.直接saveAsTextFile沒有覆蓋true參數(shù); 2.轉(zhuǎn)dataframe時(shí),還得一個(gè)一個(gè)字段顯化才能轉(zhuǎn)成dataframe; 3.write時(shí),一開始打算寫

    2024年01月23日
    瀏覽(21)
  • 一百七十三、Flume——Flume寫入HDFS后的諸多小文件問題

    一百七十三、Flume——Flume寫入HDFS后的諸多小文件問題

    在用Flume采集Kafka中的數(shù)據(jù)寫入HDFS后,發(fā)現(xiàn)寫入HDFS的不是每天一個(gè)文件,而是一個(gè)文件夾,里面有很多小文件,浪費(fèi)namenode的寶貴資源 在Flume任務(wù)的配置文件設(shè)置 a1.sinks.k1.hdfs.rollSize = 0 ? a1.sinks.k1.hdfs.rollCount = 0 ? 而不是 a1.sinks.k1.hdfs.round=true a1.sinks.k1.hdfs.roundValue=10 a1.sinks.k1

    2024年02月09日
    瀏覽(21)
  • java 文件讀取和寫入

    java 文件讀取和寫入

    1.文件名 1.InputStream(字節(jié)流)? 和Reader(字符流) 2.OutputStream(字節(jié)流) 和 Writer(字符流) Java提供了File類 來表示一個(gè)文件(通過構(gòu)造方法來指定路徑) 絕對(duì)路徑 目錄與目錄之間用 表示,,也可以用 / ,形如D:xxxxxx的就是絕對(duì)路徑 相對(duì)路徑 ..(當(dāng)前路徑的上一級(jí)路徑) 和 . (當(dāng)前路徑) 表示的

    2024年02月08日
    瀏覽(21)
  • 【PHP】文件寫入和讀取詳解

    【PHP】文件寫入和讀取詳解

    一.實(shí)現(xiàn)文件讀取和寫入的基本思路: 1.通過fopen方法打開文件:$fp =fopen(/*參數(shù),參數(shù)*/),fp為Resource類型 2.進(jìn)行文件讀取或者文件寫入操作(這里使用的函數(shù)以1中返回的$fp作為參數(shù)) 3. ? 調(diào)用fclose($fp)關(guān)閉關(guān)閉文件 二:使用fopen方法打開文件 fopen(文件路徑[string],打開模式

    2024年02月10日
    瀏覽(17)
  • Java 讀取,寫入csv文件

    本人因?yàn)闃I(yè)務(wù)需要,需要對(duì)csv類的數(shù)據(jù)文件進(jìn)行處理,下面就直接上一下代碼,希望能幫到各位; 讀取csv文件 過程很簡(jiǎn)單: 1.就是根據(jù)提供的文件路徑判斷文件是否存在; 2.如果存在開始用流讀取文件內(nèi)容; 3.讀取到文件內(nèi)容之后就開始處理相應(yīng)的數(shù)據(jù); 寫入文件 整體流程

    2024年02月11日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包