国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

初探Flink的Java實現(xiàn)流處理和批處理

這篇具有很好參考價值的文章主要介紹了初探Flink的Java實現(xiàn)流處理和批處理。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

端午假期,夏日炎炎,溫度連續(xù)40度以上,在家學習Flink相關(guān)知識,記錄下來,方便備查。
開發(fā)工具:IntelliJ Idea
Flink版本:1.13.0
本次主要用Flink實現(xiàn)批處理(DataSet API) 和 流處理(DataStream API)簡單實現(xiàn)。

第一步、創(chuàng)建項目與添加依賴

1)新建項目
打開Idea,新建Maven項目,包和項目命名,點擊確定進入項目。
初探Flink的Java實現(xiàn)流處理和批處理
2)引入依賴
pom.xml文件中添加依賴,即Flink-java、flink-streaming、slf4j等, 可參考以下代碼。

<properties>
    <flink.version>1.13.0</flink.version>
    <java.version>1.8</java.version>
    <scala.binary.version>2.12</scala.binary.version>
    <slf4j.version>1.7.2</slf4j.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- 日志-->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>${slf4j.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-to-slf4j</artifactId>
        <version>2.16.0</version>
    </dependency>
</dependencies>

3)添加日志文件
resource目錄下添加日志文件log4j.properties,內(nèi)容如下所示。

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=@-4r [%t] %-5p %c %x - %m%n
第二步、構(gòu)造數(shù)據(jù)集

在項目下新建 input 文件夾,用于存放數(shù)據(jù)集,在其下新建 words.txt 文件,即測試的數(shù)據(jù)集,如下圖所示。
初探Flink的Java實現(xiàn)流處理和批處理

第三步、編寫業(yè)務代碼

讀取數(shù)據(jù)集中內(nèi)容,并進行單詞的字數(shù)統(tǒng)計。新建 BatchWordCout 類,引入分6個步驟實現(xiàn)數(shù)據(jù)集的讀取與打印。
方式一、批處理 DataSet API
主要處理步驟為
1)創(chuàng)建執(zhí)行環(huán)境;
2)從環(huán)境中讀取數(shù)據(jù);
3)將每行數(shù)據(jù)進行分詞,轉(zhuǎn)化成二元組類型 扁平映射;
4)按照word進行分組;
5)分組內(nèi)進行聚合統(tǒng)計;
6)打印結(jié)果
批處理 DataSet API 寫法如下所示。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        //1、創(chuàng)建執(zhí)行環(huán)境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2、從環(huán)境中讀取數(shù)據(jù)
        DataSource<String> lineDataSource = env.readTextFile("input/words.txt");
        // 3、將每行數(shù)據(jù)進行分詞,轉(zhuǎn)化成二元組類型 扁平映射
        FlatMapOperator<String,Tuple2<String,Long>> wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String,Long>> out) -> {
            // 將每行文本進行拆分
            String[] words = line.split(" ");
            // 將每個單詞轉(zhuǎn)化成二元組
            for(String word : words){
                out.collect(Tuple2.of(word,1L));
            }
        }).returns(Types.TUPLE(Types.STRING,Types.LONG));
        // 4、按照word進行分組
         UnsortedGrouping<Tuple2<String,Long>> wordAndOneGroup =  wordAndOneTuple.groupBy(0);
         // 5、分組內(nèi)進行聚合統(tǒng)計
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);
        // 6、打印結(jié)果
        sum.print();

    }

控制臺打印效果如下圖所示。
初探Flink的Java實現(xiàn)流處理和批處理
在Flink 1.12 版本后,官方推薦做法是直接使用 DataSet API 即提交任務時將執(zhí)行模式更改為BATCH來進行批處理
$bin/flink run -Dexecution.runtime-mode=BATCH batchWordCount.jar

方式二、流處理 DataStream API
流處理的處理步驟與批處理流程類似,主要區(qū)別是執(zhí)行環(huán)境不一樣。

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BatchSteamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、創(chuàng)建流式執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2、讀取文件
        DataStreamSource<String> lineDataStreamSource = env.readTextFile("input/words.txt");
        // 3、轉(zhuǎn)換計算
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> out) -> {
            // 將每行文本進行拆分
            String[] words = line.split(" ");
            // 將每個單詞轉(zhuǎn)化成二元組
            for (String word : words) {
                out.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));
        // 4、分組
        KeyedStream<Tuple2<String, Long>, Object> wordAndOneKeyedStream = wordAndOneTuple.keyBy(data -> data.f0);
        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyedStream.sum(1);
        // 6、打印結(jié)果
        sum.print();
        // 7、啟動執(zhí)行
        env.execute();
    }
}

控制臺輸出結(jié)果如下圖所示。
初探Flink的Java實現(xiàn)流處理和批處理
從打印結(jié)果可以看出 多線程執(zhí)行,結(jié)果是無序;第一列數(shù)字與本地運行環(huán)境的CPU核數(shù)有關(guān);

參考文檔

【1】https://www.bilibili.com/video/BV133411s7Sa?p=9&vd_source=c8717efb4869aaa507d74b272c5d90be文章來源地址http://www.zghlxwxcb.cn/news/detail-499892.html

到了這里,關(guān)于初探Flink的Java實現(xiàn)流處理和批處理的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務器費用

相關(guān)文章

  • ‘java‘ 不是內(nèi)部或外部命令,也不是可運行的程序 或批處理文件

    ‘java‘ 不是內(nèi)部或外部命令,也不是可運行的程序 或批處理文件

    1.‘java’ 不是內(nèi)部或外部命令,也不是可運行的程序 或批處理文件。 這種情況一般來說是沒有配置環(huán)境變量或者是沒有配置好 (1)找到安裝java的位置 (2)進入控制面板==》系統(tǒng)與安全==》系統(tǒng)==》高級設置–》環(huán)境變量 2.開始配置 (1)系統(tǒng)變量中新建 變量名:JAVA_HOME 變

    2024年02月07日
    瀏覽(26)
  • React源碼解析18(11)------ 實現(xiàn)多次setState的批處理

    在React中,如果涉及到了多次setState,組件render幾次。setState是同步的還是異步的。這是一個很常見的面試題。 而本篇文章,就是主要實現(xiàn)React中,對于這部分的性能優(yōu)化,我們稱之為批處理。例如當我有下面的JSX。 對于當前的點擊事件來說,只有最后的setNum(num + 3)是有效的。

    2024年02月11日
    瀏覽(40)
  • Spring Boot + Spring Batch 實現(xiàn)批處理任務,保姆級教程?。▓鼍皩崙?zhàn))

    Spring Boot + Spring Batch 實現(xiàn)批處理任務,保姆級教程?。▓鼍皩崙?zhàn))

    來源:blog.csdn.net/qq_35387940/article/details/108193473 概念詞就不多說了,我簡單地介紹下 , spring batch 是一個 方便使用的 較健全的 批處理 框架。 為什么說是方便使用的,因為這是 基于spring的一個框架,接入簡單、易理解、流程分明。 為什么說是較健全的, 因為它提供了往常我

    2024年02月11日
    瀏覽(25)
  • 批處理命令大全 | Windows批處理教程 - ChatGPT

    批處理以.bat或.cmd文件的形式存在,在Windows命令提示符下運行,也可以通過雙擊批處理文件來運行。批處理文件由一系列命令組成,可以按照順序執(zhí)行,也可以根據(jù)條件或循環(huán)控制選擇性地執(zhí)行。 在Windows上創(chuàng)建一個批處理文件非常簡單,在編輯器中輸入一系列命令并保存為

    2024年02月04日
    瀏覽(29)
  • Windows批處理

    @ echo off :關(guān)閉命令的回顯功能,這樣在執(zhí)行腳本時不會顯示每條命令的具體執(zhí)行過程。建議將此行放在批處理腳本的首行。 rem :用于添加注釋,后面可以跟上注釋內(nèi)容。注釋的作用是對腳本進行說明或提醒,不會被執(zhí)行。 pause :暫停批處理的運行,直到用戶按下任意鍵才

    2024年02月07日
    瀏覽(56)
  • redis批處理優(yōu)化

    redis批處理優(yōu)化

    一個命令在網(wǎng)絡傳輸?shù)臅r間往往是遠大于在redis中執(zhí)行命令的時間的,如果每條命令都要逐條經(jīng)歷網(wǎng)絡傳輸,耗時將會大大增加,我們不妨將命令多量少次的傳輸給redis,這樣就大大減少了因為網(wǎng)絡傳輸時間,大大提高的效率 2.1.單機模式下的批處理 2.2.集群模式下的批處理 這

    2024年01月19日
    瀏覽(31)
  • 【bat】批處理腳本大全

    【bat】批處理腳本大全

    目錄 1.概述 2.變量 3.運算符 3.2.重定向運算符 3.3.多命名運算符 3.4.管道運算符 4.命令 4.1.基本命令 4.2.參數(shù)傳遞 4.3.查看腳本內(nèi)容 4.4.注釋 4.5.日期和時間 4.6.啟動腳本 4.7.調(diào)用其他bat 4.8.任務管理 4.8.1.任務列表查看 4.8.2.任務終止 4.9.文件夾 4.10.關(guān)機 4.11.環(huán)境變量 4.12.目錄 4.12.1

    2024年02月04日
    瀏覽(21)
  • BAT 批處理腳本教程

    第一節(jié) 常用批處理內(nèi)部命令簡介 批處理定義:顧名思義,批處理文件是將一系列命令按一定的順序集合為一個可執(zhí)行的文本文件,其擴展名為BAT或者CMD。這些命令統(tǒng)稱批處理命令。 小知識:可以在鍵盤上按下Ctrl+C組合鍵來強行終止一個批處理的執(zhí)行過程。 了解了大概意思后

    2024年02月02日
    瀏覽(29)
  • JDBC p4 批處理

    基本介紹: 當需要成批插入或者更新記錄時??梢圆捎肑ava的批量更新機制,這一機制允許多條語句一次性提交給數(shù)據(jù)庫批量處理。通常情況下比單獨提交處理更有效率。 JDBC的批量處理語句包括下面方法: addBatch():添加需要批量處理的SQL語句或參數(shù); executeBatch():執(zhí)行批量

    2024年02月15日
    瀏覽(20)
  • 大數(shù)據(jù)處理平臺的架構(gòu)演進:從批處理到實時流處理

    大數(shù)據(jù)處理平臺的架構(gòu)演進:從批處理到實時流處理

    ??個人主頁:程序員 小侯 ??CSDN新晉作者 ??歡迎 ??點贊?評論?收藏 ?收錄專欄:大數(shù)據(jù)系列 ?文章內(nèi)容:大數(shù)據(jù)框架演進 ??希望作者的文章能對你有所幫助,有不足的地方請在評論區(qū)留言指正,大家一起學習交流!?? 大數(shù)據(jù)處理平臺的架構(gòu)演進經(jīng)歷了從批處理到實

    2024年02月10日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包