PiflowX如何快速開發(fā)flink程序
參考資料
Flink最鋒利的武器:Flink SQL入門和實戰(zhàn) | 附完整實現(xiàn)代碼-騰訊云開發(fā)者社區(qū)-騰訊云 (tencent.com)
Flink SQL 背景
Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標準 SQL 語義的開發(fā)語言。
自 2015 年開始,阿里巴巴開始調(diào)研開源流計算引擎,最終決定基于 Flink 打造新一代計算引擎,針對 Flink 存在的不足進行優(yōu)化和改進,并且在 2019 年初將最終代碼開源,也就是我們熟知的 Blink。Blink 在原來的 Flink 基礎(chǔ)上最顯著的一個貢獻就是 Flink SQL 的實現(xiàn)。
Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計算領(lǐng)域,比如 Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,用戶通過 Java 或 Scala 寫業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。
在這個背景下,毫無疑問,SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因為其具有幾個非常重要的特點:
- SQL 屬于設(shè)定式語言,用戶只要表達清楚需求即可,不需要了解具體做法;
- SQL 可優(yōu)化,內(nèi)置多種查詢優(yōu)化器,這些查詢優(yōu)化器可為 SQL 翻譯出最優(yōu)執(zhí)行計劃;
- SQL 易于理解,不同行業(yè)和領(lǐng)域的人都懂,學習成本較低;
- SQL 非常穩(wěn)定,在數(shù)據(jù)庫 30 多年的歷史中,SQL 本身變化較少;
- 流與批的統(tǒng)一,F(xiàn)link 底層 Runtime 本身就是一個流與批統(tǒng)一的引擎,而 SQL 可以做到 API 層的流與批統(tǒng)一。
Flink SQL 常規(guī)實戰(zhàn)應(yīng)用
案例來自(Flink最鋒利的武器:Flink SQL入門和實戰(zhàn) | 附完整實現(xiàn)代碼-騰訊云開發(fā)者社區(qū)-騰訊云 (tencent.com))!詳細流程有興趣可以參考原文示例。(如有侵犯,請請聯(lián)系!)。
在此,簡單總結(jié)一下flink sql的開發(fā)流程:
1.首先需要創(chuàng)建maven工程,確認需要的各種依賴,運氣好的話,還需要花費大量的精力和時間去排查依賴沖突的問題(oh God bless me!);
2.開始balabala編寫模板代碼,如:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);
3.數(shù)據(jù)準備和預處理;
DataSet<String> input = env.readTextFile("score.csv");
DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
@Override
public PlayerData map(String s) throws Exception {
String[] split = s.split(",");
return new PlayerData(String.valueOf(split[0]),
String.valueOf(split[1]),
String.valueOf(split[2]),
Integer.valueOf(split[3]),
Double.valueOf(split[4]),
Double.valueOf(split[5]),
Double.valueOf(split[6]),
Double.valueOf(split[7]),
Double.valueOf(split[8])
);
}
});
其中的PlayerData類為自定義類:
public static class PlayerData {
/**
* 賽季,球員,出場,首發(fā),時間,助攻,搶斷,蓋帽,得分
*/
public String season;
public String player;
public String play_num;
public Integer first_court;
public Double time;
public Double assists;
public Double steals;
public Double blocks;
public Double scores;
public PlayerData() {
super();
}
public PlayerData(String season,
String player,
String play_num,
Integer first_court,
Double time,
Double assists,
Double steals,
Double blocks,
Double scores
) {
this.season = season;
this.player = player;
this.play_num = play_num;
this.first_court = first_court;
this.time = time;
this.assists = assists;
this.steals = steals;
this.blocks = blocks;
this.scores = scores;
}
}
4.終于到了真正的業(yè)務(wù)處理了,有了flink sql的強大和方便,倒是省了不少代碼;
Table queryResult = tableEnv.sqlQuery("
select player,
count(season) as num
FROM score
GROUP BY player
ORDER BY num desc
LIMIT 3
");
5.ok,到此,數(shù)據(jù)處理和計算邏輯完畢,處理結(jié)果寫入到sink,可以完結(jié)散花咯,哈哈;
DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
result.print();
6.哦!好像還需要調(diào)試運行,好吧,再辛苦一會,便可大功告成!
7.完美,上線。。。。。。
(以上,純屬娛樂,如有不當,敬請諒解?。?/p>
可見,在平日開發(fā)一個flink任務(wù)雖已盡可能簡單,但開發(fā)周期也得1-2個工作日,甚至更長,有沒有簡單粗暴的,讓我分分鐘領(lǐng)盒飯,不,讓我分分鐘高效完成任務(wù)的!
當然有啦?。?!接下來讓我隆重的介紹一下今天的主角—PilfowX
—大數(shù)據(jù)流水線系統(tǒng)。有興趣可以查看之前的文章(StreamPark + PiflowX 打造新一代大數(shù)據(jù)計算處理平臺-CSDN博客)。
PiflowX是基于Piflow和StreamPark二開實現(xiàn)的,在其基礎(chǔ)上,實現(xiàn)了圖像化拖拉拽的方式開發(fā)spark或flink作業(yè),這里我將介紹flink任務(wù)的開發(fā)流程,以及如何零代碼實現(xiàn)flink sql的開發(fā)。
PiflowX的flink組件算子基本都是基于flink table和sql實現(xiàn)的,我們只需在UI界面填寫組件相關(guān)參數(shù),之后的工作交給底層框架即可。
我們回顧一下flink sql語法定義。
Flink SQL 的語法和算子
Flink SQL 核心算子的語義設(shè)計參考了 1992、2011 等 ANSI-SQL 標準,F(xiàn)link 使用 Apache Calcite 解析 SQL ,Calcite 支持標準的 ANSI SQL。
CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
(
{ <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
[ <watermark_definition> ]
[ <table_constraint> ][ , ...n]
)
[COMMENT table_comment]
[PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
WITH (key1=val1, key2=val2, ...)
[ LIKE source_table [( <like_options> )] | AS select_query ]
<physical_column_definition>:
column_name column_type [ <column_constraint> ] [COMMENT column_comment]
<column_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED
<table_constraint>:
[CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED
<metadata_column_definition>:
column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]
<computed_column_definition>:
column_name AS computed_column_expression [COMMENT column_comment]
<watermark_definition>:
WATERMARK FOR rowtime_column_name AS watermark_strategy_expression
<source_table>:
[catalog_name.][db_name.]table_name
<like_options>:
{
{ INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
| { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS }
}[, ...]
PiflowX組件flink table實現(xiàn)
在了解了flink sql的定義后,一切便簡單多了,那么,我們只需要根據(jù)業(yè)務(wù)需要,設(shè)計出一個表單輸入,填寫我們的業(yè)務(wù)參數(shù),然后,由框架自動生成sql不就可以了么。
以下介紹如何配置一個mysqlcdc組件:
1.首先從組件列表中拖入一個MysqlCdc組件到畫布中,點擊節(jié)點,右側(cè)會顯示出節(jié)點參數(shù)表單區(qū)域和參數(shù)說明和示例。參數(shù)解釋可以查看之前的文章(PiflowX-MysqlCdc組件-CSDN博客)。
2.填寫相關(guān)參數(shù),其實就是在定義flink table中的with屬性。
在屬性輸入框中,點擊預覽可以實時查看生成的flink sql。
生成的flink sql 語句僅供參考,最終執(zhí)行的語句會在引擎執(zhí)行側(cè)生成。
3.接下來我們可以根據(jù)需要來定義flink table結(jié)構(gòu),此步驟和其他步驟沒有先后順序。點擊表單屬性tableDefinition
,在此表單中我們可以輸入flink table中的結(jié)構(gòu)屬性定義。
可以看到,我們可以在此定義flink table中的表基本信息,物理列,元數(shù)據(jù)列,計算列,水印等,具體說明在此就不贅述了,以后會有具體文章來說明??纯醋罱K的效果:
至此,我們通過簡單的表單填寫,便可開發(fā)一個flink任務(wù),最后,點擊運行,系統(tǒng)便可自動提交到flink環(huán)境,并可實時查看運行日志,是不是很方便快捷!
當然,目前系統(tǒng)處于初期研發(fā)階段,還有很多不完善的地方,敬請諒解。最后,我們來看一個簡單的實例,如果通過PiflowX開發(fā)一個mysql cdc實時同步和flink讀取doris的任務(wù)。
PiflowX-Droris讀寫組件文章來源:http://www.zghlxwxcb.cn/news/detail-795462.html
PiflowX-MysqlCdc組件文章來源地址http://www.zghlxwxcb.cn/news/detail-795462.html
到了這里,關(guān)于PiflowX如何快速開發(fā)flink程序的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!