国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

PiflowX如何快速開發(fā)flink程序

這篇具有很好參考價值的文章主要介紹了PiflowX如何快速開發(fā)flink程序。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

PiflowX如何快速開發(fā)flink程序

參考資料

Flink最鋒利的武器:Flink SQL入門和實戰(zhàn) | 附完整實現(xiàn)代碼-騰訊云開發(fā)者社區(qū)-騰訊云 (tencent.com)

Flink SQL 背景

Flink SQL 是 Flink 實時計算為簡化計算模型,降低用戶使用實時計算門檻而設(shè)計的一套符合標準 SQL 語義的開發(fā)語言。

自 2015 年開始,阿里巴巴開始調(diào)研開源流計算引擎,最終決定基于 Flink 打造新一代計算引擎,針對 Flink 存在的不足進行優(yōu)化和改進,并且在 2019 年初將最終代碼開源,也就是我們熟知的 Blink。Blink 在原來的 Flink 基礎(chǔ)上最顯著的一個貢獻就是 Flink SQL 的實現(xiàn)。

Flink SQL 是面向用戶的 API 層,在我們傳統(tǒng)的流式計算領(lǐng)域,比如 Storm、Spark Streaming 都會提供一些 Function 或者 Datastream API,用戶通過 Java 或 Scala 寫業(yè)務(wù)邏輯,這種方式雖然靈活,但有一些不足,比如具備一定門檻且調(diào)優(yōu)較難,隨著版本的不斷更新,API 也出現(xiàn)了很多不兼容的地方。

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

在這個背景下,毫無疑問,SQL 就成了我們最佳選擇,之所以選擇將 SQL 作為核心 API,是因為其具有幾個非常重要的特點:

  • SQL 屬于設(shè)定式語言,用戶只要表達清楚需求即可,不需要了解具體做法;
  • SQL 可優(yōu)化,內(nèi)置多種查詢優(yōu)化器,這些查詢優(yōu)化器可為 SQL 翻譯出最優(yōu)執(zhí)行計劃;
  • SQL 易于理解,不同行業(yè)和領(lǐng)域的人都懂,學習成本較低;
  • SQL 非常穩(wěn)定,在數(shù)據(jù)庫 30 多年的歷史中,SQL 本身變化較少;
  • 流與批的統(tǒng)一,F(xiàn)link 底層 Runtime 本身就是一個流與批統(tǒng)一的引擎,而 SQL 可以做到 API 層的流與批統(tǒng)一。
Flink SQL 常規(guī)實戰(zhàn)應(yīng)用

案例來自(Flink最鋒利的武器:Flink SQL入門和實戰(zhàn) | 附完整實現(xiàn)代碼-騰訊云開發(fā)者社區(qū)-騰訊云 (tencent.com))!詳細流程有興趣可以參考原文示例。(如有侵犯,請請聯(lián)系!)。

在此,簡單總結(jié)一下flink sql的開發(fā)流程:

1.首先需要創(chuàng)建maven工程,確認需要的各種依賴,運氣好的話,還需要花費大量的精力和時間去排查依賴沖突的問題(oh God bless me!);

2.開始balabala編寫模板代碼,如:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tableEnv = BatchTableEnvironment.getTableEnvironment(env);

3.數(shù)據(jù)準備和預處理;

 DataSet<String> input = env.readTextFile("score.csv");
        DataSet<PlayerData> topInput = input.map(new MapFunction<String, PlayerData>() {
            @Override
            public PlayerData map(String s) throws Exception {
                String[] split = s.split(",");
                return new PlayerData(String.valueOf(split[0]),
                        String.valueOf(split[1]),
                        String.valueOf(split[2]),
                        Integer.valueOf(split[3]),
                        Double.valueOf(split[4]),
                        Double.valueOf(split[5]),
                        Double.valueOf(split[6]),
                        Double.valueOf(split[7]),
                        Double.valueOf(split[8])
                );
            }
        });
其中的PlayerData類為自定義類:
public static class PlayerData {
        /**
         * 賽季,球員,出場,首發(fā),時間,助攻,搶斷,蓋帽,得分
         */
        public String season;
        public String player;
        public String play_num;
        public Integer first_court;
        public Double time;
        public Double assists;
        public Double steals;
        public Double blocks;
        public Double scores;

        public PlayerData() {
            super();
        }

        public PlayerData(String season,
                          String player,
                          String play_num,
                          Integer first_court,
                          Double time,
                          Double assists,
                          Double steals,
                          Double blocks,
                          Double scores
                          ) {
            this.season = season;
            this.player = player;
            this.play_num = play_num;
            this.first_court = first_court;
            this.time = time;
            this.assists = assists;
            this.steals = steals;
            this.blocks = blocks;
            this.scores = scores;
        }
    }

4.終于到了真正的業(yè)務(wù)處理了,有了flink sql的強大和方便,倒是省了不少代碼;

Table queryResult = tableEnv.sqlQuery("
select player, 
       count(season) as num 
    FROM score 
    GROUP BY player 
    ORDER BY num desc 
    LIMIT 3
");

5.ok,到此,數(shù)據(jù)處理和計算邏輯完畢,處理結(jié)果寫入到sink,可以完結(jié)散花咯,哈哈;

DataSet<Result> result = tableEnv.toDataSet(queryResult, Result.class);
result.print();

6.哦!好像還需要調(diào)試運行,好吧,再辛苦一會,便可大功告成!
PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

7.完美,上線。。。。。。
PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

(以上,純屬娛樂,如有不當,敬請諒解?。?/p>

可見,在平日開發(fā)一個flink任務(wù)雖已盡可能簡單,但開發(fā)周期也得1-2個工作日,甚至更長,有沒有簡單粗暴的,讓我分分鐘領(lǐng)盒飯,不,讓我分分鐘高效完成任務(wù)的!

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

當然有啦?。?!接下來讓我隆重的介紹一下今天的主角—PilfowX—大數(shù)據(jù)流水線系統(tǒng)。有興趣可以查看之前的文章(StreamPark + PiflowX 打造新一代大數(shù)據(jù)計算處理平臺-CSDN博客)。

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

PiflowX是基于Piflow和StreamPark二開實現(xiàn)的,在其基礎(chǔ)上,實現(xiàn)了圖像化拖拉拽的方式開發(fā)spark或flink作業(yè),這里我將介紹flink任務(wù)的開發(fā)流程,以及如何零代碼實現(xiàn)flink sql的開發(fā)。

PiflowX的flink組件算子基本都是基于flink table和sql實現(xiàn)的,我們只需在UI界面填寫組件相關(guān)參數(shù),之后的工作交給底層框架即可。

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

我們回顧一下flink sql語法定義。

Flink SQL 的語法和算子

Flink SQL 核心算子的語義設(shè)計參考了 1992、2011 等 ANSI-SQL 標準,F(xiàn)link 使用 Apache Calcite 解析 SQL ,Calcite 支持標準的 ANSI SQL。

CREATE TABLE [IF NOT EXISTS] [catalog_name.][db_name.]table_name
  (
    { <physical_column_definition> | <metadata_column_definition> | <computed_column_definition> }[ , ...n]
    [ <watermark_definition> ]
    [ <table_constraint> ][ , ...n]
  )
  [COMMENT table_comment]
  [PARTITIONED BY (partition_column_name1, partition_column_name2, ...)]
  WITH (key1=val1, key2=val2, ...)
  [ LIKE source_table [( <like_options> )] | AS select_query ]

<physical_column_definition>:
  column_name column_type [ <column_constraint> ] [COMMENT column_comment]

<column_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY NOT ENFORCED

<table_constraint>:
  [CONSTRAINT constraint_name] PRIMARY KEY (column_name, ...) NOT ENFORCED

<metadata_column_definition>:
  column_name column_type METADATA [ FROM metadata_key ] [ VIRTUAL ]

<computed_column_definition>:
  column_name AS computed_column_expression [COMMENT column_comment]

<watermark_definition>:
  WATERMARK FOR rowtime_column_name AS watermark_strategy_expression

<source_table>:
  [catalog_name.][db_name.]table_name

<like_options>:
{
   { INCLUDING | EXCLUDING } { ALL | CONSTRAINTS | PARTITIONS }
 | { INCLUDING | EXCLUDING | OVERWRITING } { GENERATED | OPTIONS | WATERMARKS } 
}[, ...]
PiflowX組件flink table實現(xiàn)

在了解了flink sql的定義后,一切便簡單多了,那么,我們只需要根據(jù)業(yè)務(wù)需要,設(shè)計出一個表單輸入,填寫我們的業(yè)務(wù)參數(shù),然后,由框架自動生成sql不就可以了么。

以下介紹如何配置一個mysqlcdc組件:

1.首先從組件列表中拖入一個MysqlCdc組件到畫布中,點擊節(jié)點,右側(cè)會顯示出節(jié)點參數(shù)表單區(qū)域和參數(shù)說明和示例。參數(shù)解釋可以查看之前的文章(PiflowX-MysqlCdc組件-CSDN博客)。

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

2.填寫相關(guān)參數(shù),其實就是在定義flink table中的with屬性。

在屬性輸入框中,點擊預覽可以實時查看生成的flink sql。

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data
PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

生成的flink sql 語句僅供參考,最終執(zhí)行的語句會在引擎執(zhí)行側(cè)生成。
PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

3.接下來我們可以根據(jù)需要來定義flink table結(jié)構(gòu),此步驟和其他步驟沒有先后順序。點擊表單屬性tableDefinition,在此表單中我們可以輸入flink table中的結(jié)構(gòu)屬性定義。

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

可以看到,我們可以在此定義flink table中的表基本信息,物理列,元數(shù)據(jù)列,計算列,水印等,具體說明在此就不贅述了,以后會有具體文章來說明??纯醋罱K的效果:

PiflowX如何快速開發(fā)flink程序,PiflowX,flink,大數(shù)據(jù),spark,hadoop,big data

至此,我們通過簡單的表單填寫,便可開發(fā)一個flink任務(wù),最后,點擊運行,系統(tǒng)便可自動提交到flink環(huán)境,并可實時查看運行日志,是不是很方便快捷!

當然,目前系統(tǒng)處于初期研發(fā)階段,還有很多不完善的地方,敬請諒解。最后,我們來看一個簡單的實例,如果通過PiflowX開發(fā)一個mysql cdc實時同步和flink讀取doris的任務(wù)。

PiflowX-Droris讀寫組件

PiflowX-MysqlCdc組件文章來源地址http://www.zghlxwxcb.cn/news/detail-795462.html

到了這里,關(guān)于PiflowX如何快速開發(fā)flink程序的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 處理大數(shù)據(jù)的基礎(chǔ)架構(gòu),OLTP和OLAP的區(qū)別,數(shù)據(jù)庫與Hadoop、Spark、Hive和Flink大數(shù)據(jù)技術(shù)

    處理大數(shù)據(jù)的基礎(chǔ)架構(gòu),OLTP和OLAP的區(qū)別,數(shù)據(jù)庫與Hadoop、Spark、Hive和Flink大數(shù)據(jù)技術(shù)

    2022找工作是學歷、能力和運氣的超強結(jié)合體,遇到寒冬,大廠不招人,可能很多算法學生都得去找開發(fā),測開 測開的話,你就得學數(shù)據(jù)庫,sql,oracle,尤其sql要學,當然,像很多金融企業(yè)、安全機構(gòu)啥的,他們必須要用oracle數(shù)據(jù)庫 這oracle比sql安全,強大多了,所以你需要學

    2024年02月08日
    瀏覽(33)
  • Hadoop、Spark、Storm、Flink區(qū)別及選擇

    Hadoop、Spark、Storm、Flink區(qū)別及選擇

    hadoop和spark是更偏向于對大量離線數(shù)據(jù)進行批量計算,提高計算速度 storm和flink適用于實時在線數(shù)據(jù),即針對源源不斷產(chǎn)生的數(shù)據(jù)進行實時處理。至于storm和flink之間的區(qū)別在于flink的實時性和吞吐量等要比storm高。 上述四個組件的實時性高低順序如下: hadoop spark storm flink hdf

    2024年02月08日
    瀏覽(18)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive

    Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建 下載 https://archive.apache.org/dist/ ?Mysql下載地址 Index of /MySQL/Downloads/ 我最終選擇 Zookeeper3.7.1 +Hadoop3.3.5 + Spark-3.2.4 + Flink-1.16.1 + Kafka2.12-3.4.0 + HBase2.4.17 + Hive3.1.3 ?+JDK1.8.0_391 ?IP規(guī)劃 IP hostname 192.168.1.5 node1 192.168.1.6 node

    2024年01月23日
    瀏覽(31)
  • Hadoop、Spark與Flink的基礎(chǔ)架構(gòu)及其關(guān)系和優(yōu)異

    Hadoop、Spark和Flink是目前重要的三大分布式計算系統(tǒng)。它們都可以用于大數(shù)據(jù)處理,但在處理方式和應(yīng)用場景上有所不同。 Hadoop專為批處理而生,一次將大量數(shù)據(jù)集輸入到輸入中,進行處理并產(chǎn)生結(jié)果。它用于離線復雜的大數(shù)據(jù)處理。 Spark定義是一個批處理系統(tǒng),但也支持流

    2024年02月11日
    瀏覽(26)
  • 數(shù)據(jù)存儲和分布式計算的實際應(yīng)用:如何使用Spark和Flink進行數(shù)據(jù)處理和分析

    作為一名人工智能專家,程序員和軟件架構(gòu)師,我經(jīng)常涉及到數(shù)據(jù)處理和分析。在當前大數(shù)據(jù)和云計算的時代,分布式計算已經(jīng)成為了一個重要的技術(shù)方向。Spark和Flink是當前比較流行的分布式計算框架,它們提供了強大的分布式計算和數(shù)據(jù)分析功能,為數(shù)據(jù)處理和分析提供了

    2024年02月16日
    瀏覽(92)
  • Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆級超詳細含圖文)

    Zookeeper+Hadoop+Spark+Flink+Kafka+Hbase+Hive 完全分布式高可用集群搭建(保姆級超詳細含圖文)

    說明: 本篇將詳細介紹用二進制安裝包部署hadoop等組件,注意事項,各組件的使用,常用的一些命令,以及在部署中遇到的問題解決思路等等,都將詳細介紹。 ip hostname 192.168.1.11 node1 192.168.1.12 node2 192.168.1.13 node3 1.2.1系統(tǒng)版本 1.2.2內(nèi)存建議最少4g、2cpu、50G以上的磁盤容量 本次

    2024年02月12日
    瀏覽(38)
  • 林子雨 VirtualBox + Ubuntu[linux] 配置 java、hadoop、Spark[python]、pyspark快速配置流程

    林子雨 VirtualBox + Ubuntu[linux] 配置 java、hadoop、Spark[python]、pyspark快速配置流程

    按照步驟快速執(zhí)行shell,最快速配置。 讀者可以根據(jù)該篇隨記快速回顧流程,以及用到的shell指令和相關(guān)配置文件。 是林老師教程的精簡版,初次配置者只能作為流程參考,主要和林子雨Spark[python]版課程配套。 ?林老師廈大實驗指南鏈接如下: Spark編程基礎(chǔ)(Python版)教材官

    2024年04月12日
    瀏覽(25)
  • Linux多虛擬機集群化配置詳解(Zookeeper集群、Kafka集群、Hadoop集群、HBase集群、Spark集群、Flink集群、Zabbix、Grafana部署)

    Linux多虛擬機集群化配置詳解(Zookeeper集群、Kafka集群、Hadoop集群、HBase集群、Spark集群、Flink集群、Zabbix、Grafana部署)

    前面安裝的軟件,都是以單機模式運行的,學習大數(shù)據(jù)相關(guān)的軟件部署,后續(xù)安裝軟件服務(wù),大多數(shù)都是以集群化(多臺服務(wù)器共同工作)模式運行的。所以,需要完成集群化環(huán)境的前置準備,包括創(chuàng)建多臺虛擬機,配置主機名映射,SSH免密登錄等等。 我們可以使用VMware提供

    2024年02月04日
    瀏覽(30)
  • Hadoop+Hive+Spark+Hbase開發(fā)環(huán)境練習

    Hadoop+Hive+Spark+Hbase開發(fā)環(huán)境練習

    1.練習一 1. 數(shù)據(jù)準備 在hdfs上創(chuàng)建文件夾,上傳csv文件 [root@kb129 ~]# hdfs dfs -mkdir -p /app/data/exam 查看csv文件行數(shù) [root@kb129 ~]# hdfs dfs -cat /app/data/exam/meituan_waimai_meishi.csv | wc -l 2. 分別使用 RDD和 Spark SQL 完成以下分析(不用考慮數(shù)據(jù)去重) 開啟spark shell [root@kb129 ~]# spark-shell (1)加載

    2024年02月03日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包