1.什么是 FlinkSQL ?
Flink SQL 是基于 Apache Calcite 的 SQL 解析器和優(yōu)化器構(gòu)建的,支持 ANSI SQL 標(biāo)準(zhǔn),允許使用標(biāo)準(zhǔn)的 SQL 語(yǔ)句來處理流式和批處理數(shù)據(jù)。通過 Flink SQL,可以以聲明式的方式描述數(shù)據(jù)處理邏輯,而無需編寫顯式的代碼。使用 Flink SQL,可以執(zhí)行各種數(shù)據(jù)操作,如 過濾、聚合、連接 和 轉(zhuǎn)換 等。它還提供了 窗口操作、時(shí)間處理 和 復(fù)雜事件處理 等功能,以滿足流式數(shù)據(jù)處理的需求。
Flink SQL 提供了許多擴(kuò)展功能和語(yǔ)法,以適應(yīng) Flink 的流式和批處理引擎的特性。它是 Flink 最高級(jí)別的抽象,可以與 DataStream API 和 DataSet API 無縫集成,利用 Flink 的分布式計(jì)算能力和容錯(cuò)機(jī)制。
使用 Flink SQL 處理數(shù)據(jù)的基本步驟:
- 定義輸入表:使用 CREATE TABLE 語(yǔ)句定義輸入表,指定表的模式(字段和類型)和數(shù)據(jù)源(如 Kafka、文件等)。
- 執(zhí)行 SQL 查詢:使用 SELECT、INSERT INTO 等 SQL 語(yǔ)句來執(zhí)行數(shù)據(jù)查詢和操作。您可以在 SQL 查詢中使用各種內(nèi)置函數(shù)、聚合操作、窗口操作和時(shí)間屬性等。
- 定義輸出表:使用 CREATE TABLE 語(yǔ)句定義輸出表,指定表的模式和目標(biāo)數(shù)據(jù)存儲(chǔ)(如 Kafka、文件等)。
- 提交作業(yè):將 Flink SQL 查詢作為 Flink 作業(yè)提交到 Flink 集群中執(zhí)行。Flink 會(huì)根據(jù)查詢的邏輯和配置自動(dòng)構(gòu)建執(zhí)行計(jì)劃,并將數(shù)據(jù)處理任務(wù)分發(fā)到集群中的任務(wù)管理器進(jìn)行執(zhí)行。
總而言之,我們可以通過 Flink SQL 查詢和操作來處理流式和批處理數(shù)據(jù)。它提供了一種簡(jiǎn)化和加速數(shù)據(jù)處理開發(fā)的方式,尤其適用于熟悉 SQL 的開發(fā)人員和數(shù)據(jù)工程師。
2.什么是 Connector ?
Flink Connector 是指 用于連接外部系統(tǒng)和數(shù)據(jù)源的組件。它允許 Flink 通過特定的連接器與不同的數(shù)據(jù)源進(jìn)行交互,例如數(shù)據(jù)庫(kù)、消息隊(duì)列、文件系統(tǒng)等。它負(fù)責(zé)處理與外部系統(tǒng)的通信、數(shù)據(jù)格式轉(zhuǎn)換、數(shù)據(jù)讀取和寫入等任務(wù)。無論是作為輸入數(shù)據(jù)表還是輸出數(shù)據(jù)表,通過使用適當(dāng)?shù)倪B接器,可以在 Flink SQL 中訪問和操作外部系統(tǒng)中的數(shù)據(jù)。目前實(shí)時(shí)平臺(tái)提供了很多常用的連接器:
例如:
- JDBC:用于與關(guān)系型數(shù)據(jù)庫(kù)(如 MySQL、PostgreSQL)建立連接,并支持在 Flink SQL 中讀取和寫入數(shù)據(jù)庫(kù)表的數(shù)據(jù)。
- JDQ:用于與 JDQ 集成,可以讀取和寫入 JDQ 主題中的數(shù)據(jù)。
- Elasticsearch:用于與 Elasticsearch 集成,可以將數(shù)據(jù)寫入 Elasticsearch 索引或從索引中讀取數(shù)據(jù)。
- File Connector:用于讀取和寫入各種文件格式(如 CSV、JSON、Parquet)的數(shù)據(jù)。
- ……
還有如 HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive 等,用于與不同的數(shù)據(jù)源進(jìn)行集成。通過使用 Flink SQL Connector,我們可以輕松地與外部系統(tǒng)進(jìn)行數(shù)據(jù)交互,將數(shù)據(jù)導(dǎo)入到 Flink 進(jìn)行處理,或 將處理結(jié)果導(dǎo)出到外部系統(tǒng)。
3.DataGen Connector
DataGen 是 Flink SQL 提供的一個(gè)內(nèi)置連接器,用于生成模擬的測(cè)試數(shù)據(jù),以便在開發(fā)和測(cè)試過程中使用。
使用 DataGen,可以生成具有不同數(shù)據(jù)類型和分布的數(shù)據(jù),例如整數(shù)、字符串、日期等。這樣可以模擬真實(shí)的數(shù)據(jù)場(chǎng)景,并幫助驗(yàn)證和調(diào)試 Flink SQL 查詢和操作。
3.1 Demo
以下是一個(gè)使用 DataGen 函數(shù)的簡(jiǎn)單示例:
-- 創(chuàng)建輸入表
CREATE TABLE input_table (
order_number BIGINT,
price DECIMAL(32,2),
buyer ROW <first_name STRING,last_name STRING>,
order_time TIMESTAMP(3)
) WITH (
'connector' = 'datagen',
);
在上面的示例中,我們使用 DataGen 連接器創(chuàng)建了一個(gè)名為 input_table
的輸入表。該表包含了 order_number
、price
、buyer
、order_time
四個(gè)字段。默認(rèn)是 Random 隨機(jī)生成對(duì)應(yīng)類型的數(shù)據(jù),生產(chǎn)速率是
10000
10000
10000 條/秒,只要任務(wù)不停,就會(huì)源源不斷的生產(chǎn)數(shù)據(jù)。當(dāng)然也可以指定一些參數(shù)來定義生成數(shù)據(jù)的規(guī)則,例如每秒生成的行數(shù)、字段的數(shù)據(jù)類型和分布。
生成的數(shù)據(jù)樣例:
{
"order_number": -6353089831284155505,
"price": 253422671148527900374700392448,
"buyer": {
"first_name": "6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2",
"last_name": "d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"
},
"order_time": "2023-09-21 06:22:29.618"
}
{
"order_number": 1102733628546646982,
"price": 628524591222898424803263250432,
"buyer": {
"first_name": "4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c",
"last_name": "7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"
},
"order_time": "2023-09-21 06:23:01.69"
}
3.2 支持的類型
字段類型 | 數(shù)據(jù)生成方式 |
---|---|
BOOLEAN |
random |
CHAR |
random / sequence |
VARCHAR |
random / sequence |
STRING |
random / sequence |
DECIMAL |
random / sequence |
TINYINT |
random / sequence |
SMALLINT |
random / sequence |
INT |
random / sequence |
BIGINT |
random / sequence |
FLOAT |
random / sequence |
DOUBLE |
random / sequence |
DATE |
random |
TIME |
random |
TIMESTAMP |
random |
TIMESTAMP_LTZ |
random |
INTERVAL YEAR TO MONTH |
random |
INTERVAL DAY TO MONTH |
random |
ROW |
random |
ARRAY |
random |
MAP |
random |
MULTISET |
random |
3.3 連接器屬性
屬性 | 是否必填 | 默認(rèn)值 | 類型 |
|
---|---|---|---|---|
connector |
required | (none) | String | ‘datagen’ |
rows-per-second |
optional | 10000 10000 10000 | Long | 數(shù)據(jù)生產(chǎn)速率 |
number-of-rows |
optional | (none) | Long | 指定生產(chǎn)的數(shù)據(jù)條數(shù),默認(rèn)是不限制 |
fields.#.kind |
optional | random | String | 指定字段的生產(chǎn)數(shù)據(jù)的方式 random 還是 sequence |
fields.#.min |
optional | (Minimum value of type) | (Type of field) | random 生成器的指定字段 # 最小值,支持?jǐn)?shù)字類型 |
fields.#.max |
optional | (Maximum value of type) | (Type of field) | random 生成器的指定字段 # 最大值,支持?jǐn)?shù)字類型 |
fields.#.length |
optional | 100 100 100 | Integer | char / varchar / string / array / map / multiset 類型的長(zhǎng)度 |
fields.#.start |
optional | (none) | (Type of field) | sequence 生成器的開始值 |
fields.#.end |
optional | (none) | (Type of field) | sequence 生成器的結(jié)束值 |
4.DataGen 使用案例
4.1 場(chǎng)景一:生成一億條數(shù)據(jù)到 Hive 表
CREATE TABLE dataGenSourceTable (
order_number BIGINT,
price DECIMAL(10, 2),
buyer STRING,
order_time TIMESTAMP(3)
) WITH (
'connector'='datagen',
'number-of-rows'='100000000',
'rows-per-second' = '100000'
);
CREATE CATALOG myhive
WITH (
'type'='hive',
'default-database'='default'
);
USE CATALOG myhive;
USE dev;
SET table.sql-dialect=hive;
CREATE TABLE if not exists shipu3_test_0932 (
order_number BIGINT,
price DECIMAL(10, 2),
buyer STRING,
order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
'partition.time-extractor.timestamp-pattern'='$dt',
'sink.partition-commit.trigger'='partition-time',
'sink.partition-commit.delay'='1 h',
'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number, price, buyer, order_time, cast(CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;
當(dāng)每秒生產(chǎn) 10 萬(wàn)條數(shù)據(jù)的時(shí)候,17 分鐘左右就可以完成,當(dāng)然我們可以通過增加 Flink 任務(wù)的計(jì)算節(jié)點(diǎn)、并行度、提高生產(chǎn)速率 rows-per-second
的值等來更快速的完成大數(shù)據(jù)量的生產(chǎn)。
4.2 場(chǎng)景二:持續(xù)每秒生產(chǎn) 10 萬(wàn)條數(shù)到消息隊(duì)列
CREATE TABLE dataGenSourceTable (
order_number BIGINT,
price INT,
buyer ROW <first_name STRING,last_name STRING>,
order_time TIMESTAMP(3),
col_array ARRAY <STRING>,
col_map map <STRING,STRING>
) WITH (
'connector'='datagen', --連接器類型
'rows-per-second'='100000', --生產(chǎn)速率
'fields.order_number.kind'='random', --字段order_number的生產(chǎn)方式
'fields.order_number.min'='1', --字段order_number最小值
'fields.order_number.max'='1000', --字段order_number最大值
'fields.price.kind'='sequence', --字段price的生產(chǎn)方式
'fields.price.start'='1', --字段price開始值
'fields.price.end'='1000', --字段price最大值
'fields.col_array.element.length'='5', --每個(gè)元素的長(zhǎng)度
'fields.col_map.key.length'='5', --map key的長(zhǎng)度
'fields.col_map.value.length'='5' --map value的長(zhǎng)度
);
CREATE TABLE jdqsink1 (
order_number BIGINT,
price DECIMAL(32, 2),
buyer ROW <first_name STRING,last_name STRING>,
order_time TIMESTAMP(3),
col_ARRAY ARRAY <STRING>,
col_map map <STRING,STRING>
) WITH (
'connector'='jdq',
'topic'='jrdw-fk-area_info__1',
'jdq.client.id'='xxxxx',
'jdq.password'='xxxxxxx',
'jdq.domain'='db.test.group.com',
'format'='json'
);
INSERTINTO jdqsink1
SELECT * FROM dataGenSourceTable;
5.思考
通過以上案例可以看到,通過 Datagen 結(jié)合其他連接器可以模擬各種場(chǎng)景的數(shù)據(jù)。文章來源:http://www.zghlxwxcb.cn/news/detail-805609.html
- 性能測(cè)試:我們可以利用 Flink 的高處理性能,來調(diào)試任務(wù)的外部依賴的閾值(超時(shí),限流等)到一個(gè)合適的水位,避免自己的任務(wù)有過多的外部依賴出現(xiàn)木桶效應(yīng)。
- 邊界條件測(cè)試:我們通過使用 Flink DataGen 生成特殊的測(cè)試數(shù)據(jù),如最小值、最大值、空值、重復(fù)值等來驗(yàn)證 Flink 任務(wù)在邊界條件下的正確性和魯棒性。
- 數(shù)據(jù)完整性測(cè)試:我們通過 Flink DataGen 可以生成包含錯(cuò)誤或異常數(shù)據(jù)的數(shù)據(jù)集,如無效的數(shù)據(jù)格式、缺失的字段、重復(fù)的數(shù)據(jù)等。從而可以測(cè)試 Flink 任務(wù)對(duì)異常情況的處理能力,驗(yàn)證 Flink 任務(wù)在處理數(shù)據(jù)時(shí)是否能夠正確地保持?jǐn)?shù)據(jù)的完整性。
總之,F(xiàn)link DataGen 是一個(gè)強(qiáng)大的工具,可以幫助測(cè)試人員構(gòu)造各種類型的測(cè)試數(shù)據(jù)。通過合理的使用 ,測(cè)試人員可以更有效地進(jìn)行測(cè)試,并發(fā)現(xiàn)潛在的問題和缺陷。文章來源地址http://www.zghlxwxcb.cn/news/detail-805609.html
到了這里,關(guān)于【大數(shù)據(jù)】Flink 測(cè)試?yán)鳎篋ataGen的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!