国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【入門(mén)Flink】- 02Flink經(jīng)典案例-WordCount

這篇具有很好參考價(jià)值的文章主要介紹了【入門(mén)Flink】- 02Flink經(jīng)典案例-WordCount。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

WordCount

需求:統(tǒng)計(jì)一段文字中,每個(gè)單詞出現(xiàn)的頻次

添加依賴

	<properties>
        <flink.version>1.17.0</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

1.批處理

基本思路:先逐行讀入文件數(shù)據(jù),然后將每一行文字拆分成單詞;接著按照單詞分組,統(tǒng)計(jì)每組數(shù)據(jù)的個(gè)數(shù)。

1.1.數(shù)據(jù)準(zhǔn)備

resources目錄下新建一個(gè) input 文件夾,并在下面創(chuàng)建文本文件words.txt

words.txt

hello flink
hello world
hello java

1.2.代碼編寫(xiě)

public class BatchWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建執(zhí)行環(huán)境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. 從文件讀取數(shù)據(jù) 按行讀取(存儲(chǔ)的元素就是每行的文本)
        String filePath = Objects.requireNonNull(
               BatchWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
        DataSource<String> lineDS = env.readTextFile(filePath);

        // 3. 轉(zhuǎn)換數(shù)據(jù)格式
        FlatMapOperator<String, Tuple2<String, Long>> wordAndOne = lineDS.flatMap(
                new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                });

        // 4. 按照 word 進(jìn)行分組
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneUG = wordAndOne.groupBy(0);

        // 5. 分組內(nèi)聚合統(tǒng)計(jì)
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneUG.sum(1);

        // 6. 打印結(jié)果
        sum.print();
    }
}

打印結(jié)果如下:(結(jié)果正確)

【入門(mén)Flink】- 02Flink經(jīng)典案例-WordCount,flink,flink,大數(shù)據(jù)

上述代碼是基于 DataSet API 的,也就是對(duì)數(shù)據(jù)的處理轉(zhuǎn)換,是看作數(shù)據(jù)集來(lái)進(jìn)行操作的。

事實(shí)上 Flink 本身是流批統(tǒng)一的處理架構(gòu),批量的數(shù)據(jù)集本質(zhì)上也是流,沒(méi)有必要用兩套不同的 API 來(lái)實(shí)現(xiàn)。從Flink 1.12 開(kāi)始,官方推薦的做法是直接使用 DataStream API,在提交任務(wù)時(shí)通過(guò)將執(zhí)行模式設(shè)為BATCH來(lái)進(jìn)行批處理:

bin/flink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

2.流處理

DataStreamAPI可以直接處理批處理和流處理的所有場(chǎng)景

2.1讀取文件

還是上述words.txt文件

代碼實(shí)現(xiàn):

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建流式執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.讀取文件
        String filePath = Objects.requireNonNull(
                StreamWordCount.class.getClassLoader().getResource("input/words.txt")).getPath();
        DataStreamSource<String> lineStream = env.readTextFile(filePath);

        // 3. 轉(zhuǎn)換、分組、求和,得到統(tǒng)計(jì)結(jié)果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        // 5. 執(zhí)行
        env.execute();
    }
}

與批處理程序BatchWordCount有幾點(diǎn)不同:

  • 創(chuàng)建執(zhí)行環(huán)境的不同,流處理程序使用的是 StreamExecutionEnvironment
  • 轉(zhuǎn)換處理之后,得到的數(shù)據(jù)對(duì)象類型不同。
  • 分組操做調(diào)用的是 keyBy 方法,可以傳入一個(gè)匿名函數(shù)作為鍵選擇器(KeySelector),指定當(dāng)前分組的key。
  • 最后執(zhí)行execute方法,開(kāi)始執(zhí)行任務(wù)。

2.2讀取Socket文件流

實(shí)際生產(chǎn)中,真正的數(shù)據(jù)多是無(wú)界的,需要持續(xù)地捕獲數(shù)據(jù)。為了模擬這種場(chǎng)景,可以監(jiān)聽(tīng) socket 端口,然后向該端口不斷的發(fā)送數(shù)據(jù)。

  1. 簡(jiǎn)單改動(dòng),只需將StreamWordCount 代碼中讀取文件數(shù)據(jù)的 readTextFile 方法,替換成讀取socket文本流的方法socketTextStream。
public class StreamSocketWordCount {
    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建流式執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.讀取文件
        DataStreamSource<String> lineStream = env.socketTextStream("124.222.253.33", 7777);

        // 3. 轉(zhuǎn)換、分組、求和,得到統(tǒng)計(jì)結(jié)果
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = lineStream.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public void flatMap(String line, Collector<Tuple2<String, Long>> out) throws Exception {
                        String[] words = line.split(" ");
                        for (String word : words) {
                            out.collect(Tuple2.of(word, 1L));
                        }
                    }
                }).keyBy(data -> data.f0)
                .sum(1);

        // 4. 打印
        sum.print();
        // 5. 執(zhí)行
        env.execute();
    }
}
  1. 在 Linux 環(huán)境的主機(jī) 124.222.253.33 上,執(zhí)行下列命令,發(fā)送數(shù)據(jù)進(jìn)行測(cè)試
nc -lk 7777

注意:要先啟動(dòng)端口,后啟動(dòng) StreamSocketWordCount 程序,否則會(huì)報(bào)超時(shí)連接異常。

  1. 從Linux發(fā)送數(shù)據(jù)

1、輸入“hello flink”,輸出如下內(nèi)容

【入門(mén)Flink】- 02Flink經(jīng)典案例-WordCount,flink,flink,大數(shù)據(jù)

2、再輸入“hello world”,輸出如下內(nèi)容

【入門(mén)Flink】- 02Flink經(jīng)典案例-WordCount,flink,flink,大數(shù)據(jù)

Flink 還具有一個(gè)類型提取系統(tǒng),可以分析函數(shù)的輸入和返回類型,自動(dòng)獲取類型信息,從而獲得對(duì)應(yīng)的序列化器和反序列化器。但是,由于 Java 中泛型擦除的存在,在某些特殊情況下(比如 Lambda 表達(dá)式中),自動(dòng)提取的信息是不夠精細(xì)的,對(duì)于 flatMap 里傳入的 Lambda 表達(dá)式,系統(tǒng)只能推斷出返回的是Tuple2類型,而無(wú)法得到 Tuple2<String, Long>。需要顯式地告訴系統(tǒng)當(dāng)前的返回類型,才能正確地解析出完整數(shù)據(jù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-741672.html

到了這里,關(guān)于【入門(mén)Flink】- 02Flink經(jīng)典案例-WordCount的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【數(shù)據(jù)結(jié)構(gòu)與算法】三個(gè)經(jīng)典案例帶你了解動(dòng)態(tài)規(guī)劃

    【數(shù)據(jù)結(jié)構(gòu)與算法】三個(gè)經(jīng)典案例帶你了解動(dòng)態(tài)規(guī)劃

    從表中我們可以看到,最大的公共子串長(zhǎng)度為2,一共有兩個(gè)長(zhǎng)度為2的公共子串,分別是第一個(gè)字符串的第2個(gè)字符到第3個(gè)字符和第一個(gè)字符串的第3個(gè)字符到第4個(gè)字符,即 ba 和 ac 根據(jù)上面的方法,我們來(lái)用代碼封裝一下求取最大公共子串的函數(shù) function publicStr(s1, s2) { // 創(chuàng)建

    2024年04月09日
    瀏覽(20)
  • 第七篇【傳奇開(kāi)心果系列】Python微項(xiàng)目技術(shù)點(diǎn)案例示例:數(shù)據(jù)可視化界面圖形化經(jīng)典案例

    第七篇【傳奇開(kāi)心果系列】Python微項(xiàng)目技術(shù)點(diǎn)案例示例:數(shù)據(jù)可視化界面圖形化經(jīng)典案例

    在學(xué)校或培訓(xùn)班,教學(xué)管理頭緒繁雜,分析報(bào)告枯燥乏味。如果能編寫(xiě)一個(gè)程序?qū)崿F(xiàn)數(shù)據(jù)可視化,界面圖形化,那就可以讓數(shù)據(jù)形象直觀生動(dòng)起來(lái),變得有趣生動(dòng),而且有靈魂。于是我靈感頓悟就有了寫(xiě)一個(gè)數(shù)據(jù)可視化界面圖形化示例的想法。我打算使用Python的nicegui庫(kù)創(chuàng)建界

    2024年02月20日
    瀏覽(31)
  • 【機(jī)器學(xué)習(xí)】最經(jīng)典案例:房?jī)r(jià)預(yù)測(cè)(完整流程:數(shù)據(jù)分析及處理、模型選擇及微調(diào))

    【機(jī)器學(xué)習(xí)】最經(jīng)典案例:房?jī)r(jià)預(yù)測(cè)(完整流程:數(shù)據(jù)分析及處理、模型選擇及微調(diào))

    環(huán)境:anaconda+jupyter notebook 首先要明白一點(diǎn): 數(shù)據(jù)決定模型的上限!數(shù)據(jù)決定模型的上限!數(shù)據(jù)決定模型的上限! (重要的事情說(shuō)三遍。)對(duì)于數(shù)據(jù)的處理在一個(gè)完整案例中花費(fèi)精力的比重應(yīng)該占到一半以上。 以下分為:數(shù)據(jù)分析、數(shù)據(jù)清洗兩部分。 數(shù)據(jù)分析主要包括:查

    2024年02月05日
    瀏覽(25)
  • 第六篇【傳奇開(kāi)心果系列】Python的自動(dòng)化辦公庫(kù)技術(shù)點(diǎn)案例示例:大學(xué)生數(shù)據(jù)全方位分析挖掘經(jīng)典案例

    第六篇【傳奇開(kāi)心果系列】Python的自動(dòng)化辦公庫(kù)技術(shù)點(diǎn)案例示例:大學(xué)生數(shù)據(jù)全方位分析挖掘經(jīng)典案例

    Pandas在大學(xué)生數(shù)據(jù)的分析和挖掘中發(fā)揮著重要作用,幫助研究人員和教育工作者更好地理解大學(xué)生群體、優(yōu)化教學(xué)管理和提升教育質(zhì)量。 Pandas庫(kù)可以用來(lái)分析挖掘大學(xué)生數(shù)據(jù)的各各方面,包括但不限于: 學(xué)生成績(jī)數(shù)據(jù):可以通過(guò)Pandas對(duì)大學(xué)生的成績(jī)數(shù)據(jù)進(jìn)行統(tǒng)計(jì)分析、可視

    2024年03月15日
    瀏覽(39)
  • Hadoop系統(tǒng)應(yīng)用之MapReduce相關(guān)操作【IDEA版】---經(jīng)典案例“倒排索引、數(shù)據(jù)去重、TopN”

    Hadoop系統(tǒng)應(yīng)用之MapReduce相關(guān)操作【IDEA版】---經(jīng)典案例“倒排索引、數(shù)據(jù)去重、TopN”

    ? 倒排索引是文檔檢索系統(tǒng)中最常用的數(shù)據(jù)結(jié)構(gòu),被廣泛應(yīng)用于全文搜索引擎。倒排索引主要用來(lái)存儲(chǔ)某個(gè)單詞(或詞組)在一組文檔中的存儲(chǔ)位置的映射,提供了可以根據(jù)內(nèi)容來(lái)查找文檔的方式,而不是根據(jù)文檔來(lái)確定內(nèi)容,因此稱為倒排索引(Inverted Index)。帶有倒排索引

    2024年02月07日
    瀏覽(23)
  • 多線程四大經(jīng)典案例

    多線程四大經(jīng)典案例

    本節(jié)內(nèi)容很重要, 希 望 大 家 可 以 好 好 看 看 , 一 起 加 油~ 1.單線模式 1.1餓漢模式 1.2懶漢模式 2.阻塞式隊(duì)列 2.1阻塞隊(duì)列是什么 2.2生產(chǎn)者消費(fèi)者模型 2.3標(biāo)準(zhǔn)庫(kù)中的阻塞隊(duì)列 2.4阻塞隊(duì)列的實(shí)現(xiàn) 3.定時(shí)器 3.1定時(shí)器是什么 3.2標(biāo)準(zhǔn)庫(kù)中的定時(shí)器 3.3實(shí)現(xiàn)定時(shí)器 4.線程池 4.1什么

    2023年04月27日
    瀏覽(21)
  • solidity經(jīng)典案例-----智能投票

    solidity經(jīng)典案例-----智能投票

    角色分析:包括主持人、選民 功能分析: 僅主持人能授權(quán)給每個(gè)選民1票,即每個(gè)參與投票的選民擁有1票投票權(quán)。 選民可以選擇將票數(shù)委托給其它選民,當(dāng)然,收委托的選民仍然可以將票數(shù)繼續(xù)委托給其它選民,即存在a—b–c–d,但是,一旦將票數(shù)委托給其它選民后,自己

    2024年01月16日
    瀏覽(22)
  • 路由器故障排錯(cuò)三大經(jīng)典案例

    對(duì)于網(wǎng)絡(luò)管理員來(lái)說(shuō),熟悉與掌握路由排錯(cuò)的思路和技巧是非常必要的。小編將通過(guò)三例典型的路由故障排錯(cuò)案例進(jìn)行分析。 案例1 不堪重負(fù),路由器外網(wǎng)口關(guān)閉 1、網(wǎng)絡(luò)環(huán)境 某單位使用的是Cisco路由器,租用電信30MB做本地接入和l0MB教育網(wǎng)雙線路上網(wǎng),兩年來(lái)網(wǎng)絡(luò)運(yùn)行穩(wěn)定,

    2024年02月05日
    瀏覽(20)
  • 經(jīng)典智能合約案例之發(fā)紅包

    經(jīng)典智能合約案例之發(fā)紅包

    角色分析:發(fā)紅包的人和搶紅包的人 功能分析: 發(fā)紅包:發(fā)紅包的功能,可以借助構(gòu)造函數(shù)實(shí)現(xiàn),核心是將ether打入合約; 搶紅包:搶紅包的功能,搶成功需要一些斷言判斷,核心操作是合約轉(zhuǎn)賬給搶紅包的人; 退還:當(dāng)紅包有剩余的時(shí)候,允許發(fā)紅包的人收回余額,可以

    2024年02月07日
    瀏覽(35)
  • 阿里后端開(kāi)發(fā):抽象建模經(jīng)典案例

    阿里后端開(kāi)發(fā):抽象建模經(jīng)典案例

    在互聯(lián)網(wǎng)行業(yè),軟件工程師面對(duì)的產(chǎn)品需求大都是以具象的現(xiàn)實(shí)世界事物概念來(lái)描述的,遵循的是人類世界的自然語(yǔ)言,而軟件世界里通行的則是機(jī)器語(yǔ)言,兩者間跨度太大,需要一座橋梁來(lái)聯(lián)通,抽象建模便是打造這座橋梁的關(guān)鍵?;诔橄蠼?,不斷地去粗取精,從現(xiàn)實(shí)

    2024年02月09日
    瀏覽(92)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包