国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【大數(shù)據(jù)】Flink 測(cè)試?yán)鳎篋ataGen

這篇具有很好參考價(jià)值的文章主要介紹了【大數(shù)據(jù)】Flink 測(cè)試?yán)鳎篋ataGen。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

1.什么是 FlinkSQL ?

Flink SQL 是基于 Apache Calcite 的 SQL 解析器和優(yōu)化器構(gòu)建的,支持 ANSI SQL 標(biāo)準(zhǔn),允許使用標(biāo)準(zhǔn)的 SQL 語(yǔ)句來處理流式和批處理數(shù)據(jù)。通過 Flink SQL,可以以聲明式的方式描述數(shù)據(jù)處理邏輯,而無需編寫顯式的代碼。使用 Flink SQL,可以執(zhí)行各種數(shù)據(jù)操作,如 過濾聚合、連接轉(zhuǎn)換 等。它還提供了 窗口操作、時(shí)間處理復(fù)雜事件處理 等功能,以滿足流式數(shù)據(jù)處理的需求。

Flink SQL 提供了許多擴(kuò)展功能和語(yǔ)法,以適應(yīng) Flink 的流式和批處理引擎的特性。它是 Flink 最高級(jí)別的抽象,可以與 DataStream API 和 DataSet API 無縫集成,利用 Flink 的分布式計(jì)算能力和容錯(cuò)機(jī)制。

【大數(shù)據(jù)】Flink 測(cè)試?yán)鳎篋ataGen,# Flink,大數(shù)據(jù),flink,測(cè)試,DataGen,測(cè)試數(shù)據(jù),Connector,FlinkSQL
使用 Flink SQL 處理數(shù)據(jù)的基本步驟:

  • 定義輸入表:使用 CREATE TABLE 語(yǔ)句定義輸入表,指定表的模式(字段和類型)和數(shù)據(jù)源(如 Kafka、文件等)。
  • 執(zhí)行 SQL 查詢:使用 SELECT、INSERT INTO 等 SQL 語(yǔ)句來執(zhí)行數(shù)據(jù)查詢和操作。您可以在 SQL 查詢中使用各種內(nèi)置函數(shù)、聚合操作、窗口操作和時(shí)間屬性等。
  • 定義輸出表:使用 CREATE TABLE 語(yǔ)句定義輸出表,指定表的模式和目標(biāo)數(shù)據(jù)存儲(chǔ)(如 Kafka、文件等)。
  • 提交作業(yè):將 Flink SQL 查詢作為 Flink 作業(yè)提交到 Flink 集群中執(zhí)行。Flink 會(huì)根據(jù)查詢的邏輯和配置自動(dòng)構(gòu)建執(zhí)行計(jì)劃,并將數(shù)據(jù)處理任務(wù)分發(fā)到集群中的任務(wù)管理器進(jìn)行執(zhí)行。

總而言之,我們可以通過 Flink SQL 查詢和操作來處理流式和批處理數(shù)據(jù)。它提供了一種簡(jiǎn)化和加速數(shù)據(jù)處理開發(fā)的方式,尤其適用于熟悉 SQL 的開發(fā)人員和數(shù)據(jù)工程師。

2.什么是 Connector ?

Flink Connector 是指 用于連接外部系統(tǒng)和數(shù)據(jù)源的組件。它允許 Flink 通過特定的連接器與不同的數(shù)據(jù)源進(jìn)行交互,例如數(shù)據(jù)庫(kù)、消息隊(duì)列、文件系統(tǒng)等。它負(fù)責(zé)處理與外部系統(tǒng)的通信、數(shù)據(jù)格式轉(zhuǎn)換、數(shù)據(jù)讀取和寫入等任務(wù)。無論是作為輸入數(shù)據(jù)表還是輸出數(shù)據(jù)表,通過使用適當(dāng)?shù)倪B接器,可以在 Flink SQL 中訪問和操作外部系統(tǒng)中的數(shù)據(jù)。目前實(shí)時(shí)平臺(tái)提供了很多常用的連接器:

例如:

  • JDBC:用于與關(guān)系型數(shù)據(jù)庫(kù)(如 MySQL、PostgreSQL)建立連接,并支持在 Flink SQL 中讀取和寫入數(shù)據(jù)庫(kù)表的數(shù)據(jù)。
  • JDQ:用于與 JDQ 集成,可以讀取和寫入 JDQ 主題中的數(shù)據(jù)。
  • Elasticsearch:用于與 Elasticsearch 集成,可以將數(shù)據(jù)寫入 Elasticsearch 索引或從索引中讀取數(shù)據(jù)。
  • File Connector:用于讀取和寫入各種文件格式(如 CSV、JSON、Parquet)的數(shù)據(jù)。
  • ……

還有如 HBase、JMQ4、Doris、Clickhouse,Jimdb,Hive 等,用于與不同的數(shù)據(jù)源進(jìn)行集成。通過使用 Flink SQL Connector,我們可以輕松地與外部系統(tǒng)進(jìn)行數(shù)據(jù)交互,將數(shù)據(jù)導(dǎo)入到 Flink 進(jìn)行處理,或 將處理結(jié)果導(dǎo)出到外部系統(tǒng)

【大數(shù)據(jù)】Flink 測(cè)試?yán)鳎篋ataGen,# Flink,大數(shù)據(jù),flink,測(cè)試,DataGen,測(cè)試數(shù)據(jù),Connector,FlinkSQL

3.DataGen Connector

DataGen 是 Flink SQL 提供的一個(gè)內(nèi)置連接器,用于生成模擬的測(cè)試數(shù)據(jù),以便在開發(fā)和測(cè)試過程中使用。

使用 DataGen,可以生成具有不同數(shù)據(jù)類型和分布的數(shù)據(jù),例如整數(shù)、字符串、日期等。這樣可以模擬真實(shí)的數(shù)據(jù)場(chǎng)景,并幫助驗(yàn)證和調(diào)試 Flink SQL 查詢和操作。

3.1 Demo

以下是一個(gè)使用 DataGen 函數(shù)的簡(jiǎn)單示例:

-- 創(chuàng)建輸入表
CREATE TABLE input_table (
	order_number BIGINT,
	price DECIMAL(32,2),
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3)
) WITH (
	'connector' = 'datagen',
);

在上面的示例中,我們使用 DataGen 連接器創(chuàng)建了一個(gè)名為 input_table 的輸入表。該表包含了 order_number、pricebuyer、order_time 四個(gè)字段。默認(rèn)是 Random 隨機(jī)生成對(duì)應(yīng)類型的數(shù)據(jù),生產(chǎn)速率是 10000 10000 10000 條/秒,只要任務(wù)不停,就會(huì)源源不斷的生產(chǎn)數(shù)據(jù)。當(dāng)然也可以指定一些參數(shù)來定義生成數(shù)據(jù)的規(guī)則,例如每秒生成的行數(shù)、字段的數(shù)據(jù)類型和分布。

生成的數(shù)據(jù)樣例:

{
    "order_number": -6353089831284155505,
    "price": 253422671148527900374700392448,
    "buyer": {
        "first_name": "6e4df4455bed12c8ad74f03471e5d8e3141d7977bcc5bef88a57102dac71ac9a9dbef00f406ce9bddaf3741f37330e5fb9d2",
        "last_name": "d7d8a39e063fbd2beac91c791dc1024e2b1f0857b85990fbb5c4eac32445951aad0a2bcffd3a29b2a08b057a0b31aa689ed7"
    },
    "order_time": "2023-09-21 06:22:29.618"
}
{
    "order_number": 1102733628546646982,
    "price": 628524591222898424803263250432,
    "buyer": {
        "first_name": "4738f237436b70c80e504b95f0d9ec3d7c01c8745edf21495f17bb4d7044b4950943014f26b5d7fdaed10db37a632849b96c",
        "last_name": "7f9dbdbed581b687989665b97c09dec1a617c830c048446bf31c746898e1bccfe21a5969ee174a1d69845be7163b5e375a09"
    },
    "order_time": "2023-09-21 06:23:01.69"
}

3.2 支持的類型

字段類型 數(shù)據(jù)生成方式
BOOLEAN random
CHAR random / sequence
VARCHAR random / sequence
STRING random / sequence
DECIMAL random / sequence
TINYINT random / sequence
SMALLINT random / sequence
INT random / sequence
BIGINT random / sequence
FLOAT random / sequence
DOUBLE random / sequence
DATE random
TIME random
TIMESTAMP random
TIMESTAMP_LTZ random
INTERVAL YEAR TO MONTH random
INTERVAL DAY TO MONTH random
ROW random
ARRAY random
MAP random
MULTISET random

3.3 連接器屬性

屬性 是否必填 默認(rèn)值 類型
描述
connector required (none) String ‘datagen’
rows-per-second optional 10000 10000 10000 Long 數(shù)據(jù)生產(chǎn)速率
number-of-rows optional (none) Long 指定生產(chǎn)的數(shù)據(jù)條數(shù),默認(rèn)是不限制
fields.#.kind optional random String 指定字段的生產(chǎn)數(shù)據(jù)的方式 random 還是 sequence
fields.#.min optional (Minimum value of type) (Type of field) random 生成器的指定字段 # 最小值,支持?jǐn)?shù)字類型
fields.#.max optional (Maximum value of type) (Type of field) random 生成器的指定字段 # 最大值,支持?jǐn)?shù)字類型
fields.#.length optional 100 100 100 Integer char / varchar / string / array / map / multiset 類型的長(zhǎng)度
fields.#.start optional (none) (Type of field) sequence 生成器的開始值
fields.#.end optional (none) (Type of field) sequence 生成器的結(jié)束值

4.DataGen 使用案例

4.1 場(chǎng)景一:生成一億條數(shù)據(jù)到 Hive 表

CREATE TABLE dataGenSourceTable (
	order_number BIGINT,
	price DECIMAL(10, 2),
	buyer STRING,
	order_time TIMESTAMP(3)
) WITH ( 
	'connector'='datagen', 
	'number-of-rows'='100000000',
	'rows-per-second' = '100000'
);


CREATE CATALOG myhive
WITH (
	'type'='hive',
	'default-database'='default'
);
USE CATALOG myhive;
USE dev;
SET table.sql-dialect=hive;
CREATE TABLE if not exists shipu3_test_0932 (
	order_number BIGINT,
	price DECIMAL(10, 2),
	buyer STRING,
	order_time TIMESTAMP(3)
) PARTITIONED BY (dt STRING) STORED AS parquet TBLPROPERTIES (
	'partition.time-extractor.timestamp-pattern'='$dt',
	'sink.partition-commit.trigger'='partition-time',
	'sink.partition-commit.delay'='1 h',
	'sink.partition-commit.policy.kind'='metastore,success-file'
);
SET table.sql-dialect=default;
insert into myhive.dev.shipu3_test_0932
select order_number, price, buyer, order_time, cast(CURRENT_DATE as varchar)
from default_catalog.default_database.dataGenSourceTable;

當(dāng)每秒生產(chǎn) 10 萬(wàn)條數(shù)據(jù)的時(shí)候,17 分鐘左右就可以完成,當(dāng)然我們可以通過增加 Flink 任務(wù)的計(jì)算節(jié)點(diǎn)、并行度、提高生產(chǎn)速率 rows-per-second 的值等來更快速的完成大數(shù)據(jù)量的生產(chǎn)。

4.2 場(chǎng)景二:持續(xù)每秒生產(chǎn) 10 萬(wàn)條數(shù)到消息隊(duì)列

CREATE TABLE dataGenSourceTable (
	order_number BIGINT,
	price INT,
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3),
	col_array ARRAY <STRING>,
	col_map map <STRING,STRING>
) WITH ( 
	'connector'='datagen', --連接器類型
	'rows-per-second'='100000', --生產(chǎn)速率
	'fields.order_number.kind'='random', --字段order_number的生產(chǎn)方式
	'fields.order_number.min'='1', --字段order_number最小值
	'fields.order_number.max'='1000', --字段order_number最大值
	'fields.price.kind'='sequence', --字段price的生產(chǎn)方式
	'fields.price.start'='1', --字段price開始值
	'fields.price.end'='1000', --字段price最大值
	'fields.col_array.element.length'='5', --每個(gè)元素的長(zhǎng)度
	'fields.col_map.key.length'='5', --map key的長(zhǎng)度
	'fields.col_map.value.length'='5' --map value的長(zhǎng)度
);

CREATE TABLE jdqsink1 (
	order_number BIGINT,
	price DECIMAL(32, 2),
	buyer ROW <first_name STRING,last_name STRING>,
	order_time TIMESTAMP(3),
	col_ARRAY ARRAY <STRING>,
	col_map map <STRING,STRING>
) WITH (
	'connector'='jdq',
	'topic'='jrdw-fk-area_info__1',
	'jdq.client.id'='xxxxx',
	'jdq.password'='xxxxxxx',
	'jdq.domain'='db.test.group.com',
	'format'='json'
);

INSERTINTO jdqsink1
SELECT * FROM dataGenSourceTable;

5.思考

通過以上案例可以看到,通過 Datagen 結(jié)合其他連接器可以模擬各種場(chǎng)景的數(shù)據(jù)。

  • 性能測(cè)試:我們可以利用 Flink 的高處理性能,來調(diào)試任務(wù)的外部依賴的閾值(超時(shí),限流等)到一個(gè)合適的水位,避免自己的任務(wù)有過多的外部依賴出現(xiàn)木桶效應(yīng)。
  • 邊界條件測(cè)試:我們通過使用 Flink DataGen 生成特殊的測(cè)試數(shù)據(jù),如最小值、最大值、空值、重復(fù)值等來驗(yàn)證 Flink 任務(wù)在邊界條件下的正確性和魯棒性。
  • 數(shù)據(jù)完整性測(cè)試:我們通過 Flink DataGen 可以生成包含錯(cuò)誤或異常數(shù)據(jù)的數(shù)據(jù)集,如無效的數(shù)據(jù)格式、缺失的字段、重復(fù)的數(shù)據(jù)等。從而可以測(cè)試 Flink 任務(wù)對(duì)異常情況的處理能力,驗(yàn)證 Flink 任務(wù)在處理數(shù)據(jù)時(shí)是否能夠正確地保持?jǐn)?shù)據(jù)的完整性。

總之,F(xiàn)link DataGen 是一個(gè)強(qiáng)大的工具,可以幫助測(cè)試人員構(gòu)造各種類型的測(cè)試數(shù)據(jù)。通過合理的使用 ,測(cè)試人員可以更有效地進(jìn)行測(cè)試,并發(fā)現(xiàn)潛在的問題和缺陷。文章來源地址http://www.zghlxwxcb.cn/news/detail-805609.html

到了這里,關(guān)于【大數(shù)據(jù)】Flink 測(cè)試?yán)鳎篋ataGen的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 第3.4章:StarRocks數(shù)據(jù)導(dǎo)入--Flink Connector與CDC秒級(jí)數(shù)據(jù)同步

    Flink作為當(dāng)前流行的流式計(jì)算框架,在對(duì)接StarRocks時(shí),若直接使用JDBC的方式“流式”寫入數(shù)據(jù),對(duì)StarRocks是不友好的,StarRocks作為一款MVCC的數(shù)據(jù)庫(kù),其導(dǎo)入的核心思想還是“攢微批+降頻率”。為此,StarRocks單獨(dú)開發(fā)了flink-connector-starrocks,其內(nèi)部實(shí)現(xiàn)仍是通過對(duì)數(shù)據(jù)緩存攢批

    2023年04月15日
    瀏覽(51)
  • Flink 學(xué)習(xí)十 FlinkSQL

    Flink 學(xué)習(xí)十 FlinkSQL

    flink sql 基于flink core ,使用sql 語(yǔ)義方便快捷的進(jìn)行結(jié)構(gòu)化數(shù)據(jù)處理的上層庫(kù); 類似理解sparksql 和sparkcore , hive和mapreduce 1.1 工作流程 整體架構(gòu)和工作流程 數(shù)據(jù)流,綁定元數(shù)據(jù) schema ,注冊(cè)成catalog 中的表 table / view 用戶使用table Api / table sql 來表達(dá)計(jì)算邏輯 table-planner利用 apache calci

    2024年02月10日
    瀏覽(17)
  • Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

    Flink 優(yōu)化(六) --------- FlinkSQL 調(diào)優(yōu)

    FlinkSQL 官網(wǎng)配置參數(shù): https://ci.apache.org/projects/flink/flink-docs-release-1.13/dev/table/config.html Flink SQL 新手有可能犯的錯(cuò)誤,其中之一就是忘記設(shè)置空閑狀態(tài)保留時(shí)間導(dǎo)致狀態(tài)爆炸。列舉兩個(gè)場(chǎng)景: ? FlinkSQL 的 regular join(inner、left、right),左右表的數(shù)據(jù)都會(huì)一直保存在狀態(tài)里,不

    2024年02月14日
    瀏覽(21)
  • 【實(shí)戰(zhàn)-01】flink cdc 實(shí)時(shí)數(shù)據(jù)同步利器

    【實(shí)戰(zhàn)-01】flink cdc 實(shí)時(shí)數(shù)據(jù)同步利器

    cdc github源碼地址 cdc官方文檔 對(duì)很多初入門的人來說是無法理解cdc到底是什么個(gè)東西。 有這樣一個(gè)需求,比如在mysql數(shù)據(jù)庫(kù)中存在很多數(shù)據(jù),但是公司要把mysql中的數(shù)據(jù)同步到數(shù)據(jù)倉(cāng)庫(kù)(starrocks), 數(shù)據(jù)倉(cāng)庫(kù)你可以理解為存儲(chǔ)了各種各樣來自不同數(shù)據(jù)庫(kù)中表。 數(shù)據(jù)的同步目前對(duì)

    2023年04月08日
    瀏覽(94)
  • Flink:FlinkSql解析嵌套Json

    Flink:FlinkSql解析嵌套Json

    日常開發(fā)中都是用的簡(jiǎn)便json格式,但是偶爾也會(huì)遇到嵌套json的時(shí)候,因此在用flinksql的時(shí)候就有點(diǎn)麻煩,下面用簡(jiǎn)單例子簡(jiǎn)單定義處理下 1,數(shù)據(jù)是網(wǎng)上摘抄,但包含里常用的大部分格式 { ?? ?\\\"afterColumns\\\": { ?? ??? ?\\\"created\\\": \\\"1589186680\\\", ?? ??? ?\\\"extra\\\": { ?? ??? ??? ?\\\"

    2023年04月09日
    瀏覽(25)
  • Flink實(shí)戰(zhàn)-(6)FlinkSQL實(shí)現(xiàn)CDC

    FlinkSQL說明 Flink SQL 是 Flink 實(shí)時(shí)計(jì)算為簡(jiǎn)化計(jì)算模型,降低用戶使用實(shí)時(shí)計(jì)算門檻而設(shè)計(jì)的一套符合標(biāo)準(zhǔn) SQL 語(yǔ)義的開發(fā)語(yǔ)言。 自 2015 年開始,阿里巴巴開始調(diào)研開源流計(jì)算引擎,最終決定基于 Flink 打造新一代計(jì)算引擎,針對(duì) Flink 存在的不足進(jìn)行優(yōu)化和改進(jìn),并且在 2019 年初

    2023年04月26日
    瀏覽(26)
  • flink學(xué)習(xí)35:flinkSQL查詢mysql

    flink學(xué)習(xí)35:flinkSQL查詢mysql

    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.table.api.EnvironmentSettings import org.apache.flink.table.api.bridge.scala.{StreamTableEnvironment, tableConversions} object sqlQueryTable { ? def main(args: Array[String]): Unit = { ??? //create env ??? val env = StreamExecutionEnvironment.getExecutionEnv

    2023年04月23日
    瀏覽(19)
  • 【Flink系列七】TableAPI和FlinkSQL初體驗(yàn)

    【Flink系列七】TableAPI和FlinkSQL初體驗(yàn)

    Apache Flink 有兩種關(guān)系型 API 來做流批統(tǒng)一處理:Table API 和 SQL Table API 是用于 Scala 和 Java 語(yǔ)言的查詢API,它可以用一種非常直觀的方式來組合使用選取、過濾、join 等關(guān)系型算子。 ?Flink SQL 是基于?Apache Calcite?來實(shí)現(xiàn)的標(biāo)準(zhǔn) SQL。無論輸入是連續(xù)的(流式)還是有界的(批處理

    2024年02月03日
    瀏覽(22)
  • Flink Connector 開發(fā)

    Flink Connector 開發(fā)

    Flink 是新一代流 批統(tǒng)一的計(jì)算引擎 ,它需要從不同的第三方存儲(chǔ)引擎中把數(shù)據(jù)讀過來,進(jìn)行處理,然后再寫出到另外的存儲(chǔ)引擎中。 Connector 的作用就相當(dāng)于一個(gè)連接器 ,連接 Flink 計(jì)算引擎跟外界存儲(chǔ)系統(tǒng)。 Flink 里有以下幾種方式,當(dāng)然也不限于這幾種方式可以跟外界進(jìn)行

    2024年02月03日
    瀏覽(24)
  • Flink RocketMQ Connector實(shí)現(xiàn)

    Flink內(nèi)置了很多Connector,可以滿足大部分場(chǎng)景。但是還是有一些場(chǎng)景無法滿足,比如RocketMQ。需要消費(fèi)RocketMQ的消息,需要自定時(shí)Source。 參考FlinkKafkaConsumer: 可以看到,自定義的Source,只需要實(shí)現(xiàn)SourceFunction。 創(chuàng)建FlinkRocketMQConsumer,實(shí)現(xiàn)SourceFunction,重寫run()和cancel()方法 需要

    2024年02月11日
    瀏覽(24)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包