国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【跟小嘉學(xué) Apache Flink】二、Flink 快速上手

這篇具有很好參考價(jià)值的文章主要介紹了【跟小嘉學(xué) Apache Flink】二、Flink 快速上手。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

系列文章目錄

【跟小嘉學(xué) Apache Flink】一、Apache Flink 介紹
【跟小嘉學(xué) Apache Flink】二、Flink 快速上手

一、創(chuàng)建工程

1.1、創(chuàng)建 Maven 工程

創(chuàng)建 maven 工程 并且添加如下依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.xiaojia</groupId>
    <artifactId>flinkdemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <maven.compiler.source>19</maven.compiler.source>
        <maven.compiler.target>19</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.1</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.1</version>
        </dependency>


    </dependencies>

</project>

1.2、log4j 配置

在 resource 目錄下創(chuàng)建 log4j.properties 文件,寫入如下內(nèi)容

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n

二、批處理單詞統(tǒng)計(jì)(DataSet API)

2.1、創(chuàng)建 BatchWordCount 類型

package org.xiaojia.demo.wc;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.AggregateOperator;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchWordCount {
    public static void main(String[] args) {
        // 1、創(chuàng)建執(zhí)行環(huán)境
        ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

        // 2、 從文件讀取數(shù)據(jù)
        DataSource<String> lineDataSource = executionEnvironment.readTextFile("input/words.txt");

        // 3、將每一行數(shù)據(jù)進(jìn)行分詞,轉(zhuǎn)換為二元組類型
        FlatMapOperator<String, Tuple2<String, Long>>  wordAndOneTuple = lineDataSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 進(jìn)行分組
        UnsortedGrouping<Tuple2<String, Long>> wordAndOneGroup = wordAndOneTuple.groupBy(0);

        // 5、分組內(nèi)進(jìn)行聚合統(tǒng)計(jì)
        AggregateOperator<Tuple2<String, Long>> sum = wordAndOneGroup.sum(1);

        try {
            sum.print();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}

2.4、運(yùn)行結(jié)果

【跟小嘉學(xué) Apache Flink】二、Flink 快速上手,跟小嘉學(xué)Apache Flink,apache,flink,大數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù)
實(shí)際上在 Flink 里面已經(jīng)做到流批處理統(tǒng)一,官方推薦使用 DateStream API,在跳任務(wù)時(shí)通過(guò)執(zhí)行模式設(shè)置為 Batch 來(lái)進(jìn)行批處理

bin/fliink run -Dexecution.runtime-mode=BATCH BatchWordCount.jar

三、流處理單詞統(tǒng)計(jì)(DataSet API)

使用 DataSet API可以很容易實(shí)現(xiàn)批處理。對(duì)于Flink而言,流處理才是處理邏輯的底層核心,所以流批統(tǒng)一之后的 DataStream API 更加強(qiáng)大,可以直接處理批處理和流處理的所有場(chǎng)景。

在 Flink 的視角里,一切數(shù)據(jù)都可以認(rèn)為是流,流數(shù)據(jù)是無(wú)界流,而批數(shù)據(jù)是有界流。所以批處理,其實(shí)可以看作是有界流的處理。

3.1、讀取文件流

3.1.1、過(guò)時(shí)的寫法

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class BoundStreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、創(chuàng)建流式的執(zhí)行環(huán)境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、讀取文件
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.readTextFile("input/words.txt");

        // 3、將每一行數(shù)據(jù)進(jìn)行分詞,轉(zhuǎn)換為二元組類型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 進(jìn)行分組
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、執(zhí)行等待
        streamExecutionEnvironment.execute();

    }
}

3.1.2、執(zhí)行錯(cuò)誤的處理

Unable to make field private final byte[] java.lang.String.value accessible: module java.base does not "opens java.lang" to unnamed module
    @7ce6a65d

如果出現(xiàn)上述類似錯(cuò)誤,解決方案,通過(guò)添加 VM參數(shù)打開(kāi)對(duì)應(yīng)模塊的對(duì)應(yīng)模塊包

--add-opens java.base/java.lang=ALL-UNNAMED 
--add-opens java.base/java.util=ALL-UNNAMED

【跟小嘉學(xué) Apache Flink】二、Flink 快速上手,跟小嘉學(xué)Apache Flink,apache,flink,大數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù)

3.1.3、執(zhí)行結(jié)果

【跟小嘉學(xué) Apache Flink】二、Flink 快速上手,跟小嘉學(xué)Apache Flink,apache,flink,大數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù)

3.1.4、readTextFile 過(guò)時(shí)問(wèn)題

【跟小嘉學(xué) Apache Flink】二、Flink 快速上手,跟小嘉學(xué)Apache Flink,apache,flink,大數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù)
解決方案可以按照提示給出的 使用 FileSource(需要用到Flink的連接器)


<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-files</artifactId>
  <version>${flink.version}</version>
</dependency>

3.2、讀取 socket 網(wǎng)絡(luò)流

3.2.1、讀取socket 流代碼

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、創(chuàng)建流式的執(zhí)行環(huán)境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、讀取socket流
        String hostname = "127.0.0.1";
        int port = 8888;
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);

        // 3、將每一行數(shù)據(jù)進(jìn)行分詞,轉(zhuǎn)換為二元組類型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 進(jìn)行分組
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、執(zhí)行等待
        streamExecutionEnvironment.execute();
    }
}

3.2.2、使用 nc 監(jiān)聽(tīng)端口

(base) xiaojiadeMacBook-Pro:~ xiaojia$ nc -lk 8888
hello java
hello flink
hello world

3.2.3、執(zhí)行結(jié)果

【跟小嘉學(xué) Apache Flink】二、Flink 快速上手,跟小嘉學(xué)Apache Flink,apache,flink,大數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù)
此時(shí),只要有數(shù)據(jù)進(jìn)來(lái),就會(huì)統(tǒng)計(jì)

3.2.4、從命令行參數(shù)獲取主機(jī)名和端口號(hào)

package org.xiaojia.demo.wc.stream;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        // 1、創(chuàng)建流式的執(zhí)行環(huán)境
        StreamExecutionEnvironment streamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2、讀取socket流

        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        String hostname = parameterTool.get("host");
        int port = parameterTool.getInt("port");
        DataStreamSource<String> lineDataStreamSource = streamExecutionEnvironment.socketTextStream(hostname, port);

        // 3、將每一行數(shù)據(jù)進(jìn)行分詞,轉(zhuǎn)換為二元組類型
        SingleOutputStreamOperator<Tuple2<String, Long>> wordAndOneTuple = lineDataStreamSource.flatMap((String line, Collector<Tuple2<String, Long>> collector) -> {
            String[] words = line.split(" ");
            for (String word : words) {
                collector.collect(Tuple2.of(word, 1L));
            }
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 4、按照 word 進(jìn)行分組
        KeyedStream<Tuple2<String, Long>, String> wordAndOneKeyStream = wordAndOneTuple.keyBy((data) -> data.f0);

        // 5、求和
        SingleOutputStreamOperator<Tuple2<String, Long>> sum = wordAndOneKeyStream.sum(1);

        // 6、打印
        sum.print();

        // 7、執(zhí)行等待
        streamExecutionEnvironment.execute();
    }
}

命令行參數(shù)傳遞
【跟小嘉學(xué) Apache Flink】二、Flink 快速上手,跟小嘉學(xué)Apache Flink,apache,flink,大數(shù)據(jù),數(shù)據(jù)倉(cāng)庫(kù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-707299.html

到了這里,關(guān)于【跟小嘉學(xué) Apache Flink】二、Flink 快速上手的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【大數(shù)據(jù)-實(shí)時(shí)流計(jì)算】圖文詳解 Apache Flink 架構(gòu)原理

    目錄 Apache?Flink架構(gòu)介紹 一、Flink組件棧 二、Flink運(yùn)行時(shí)架構(gòu) 在Flink的整個(gè)

    2024年02月02日
    瀏覽(22)
  • 流數(shù)據(jù)湖平臺(tái)Apache Paimon(二)集成 Flink 引擎

    流數(shù)據(jù)湖平臺(tái)Apache Paimon(二)集成 Flink 引擎

    Paimon目前支持Flink 1.17, 1.16, 1.15 和 1.14。本課程使用Flink 1.17.0。 環(huán)境準(zhǔn)備 2.1.1 安裝 Flink 1)上傳并解壓Flink安裝包 tar -zxvf flink-1.17.0-bin-scala_2.12.tgz -C /opt/module/ 2)配置環(huán)境變量 2.1.2 上傳 jar 包 1)下載并上傳Paimon的jar包 jar包下載地址:https://repository.apache.org/snapshots/org/apache/pa

    2024年02月09日
    瀏覽(45)
  • 流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用

    流數(shù)據(jù)湖平臺(tái)Apache Paimon(三)Flink進(jìn)階使用

    2.9.1 寫入性能 Paimon的寫入性能與檢查點(diǎn)密切相關(guān),因此需要更大的寫入吞吐量: 增加檢查點(diǎn)間隔,或者僅使用批處理模式。 增加寫入緩沖區(qū)大小。 啟用寫緩沖區(qū)溢出。 如果您使用固定存儲(chǔ)桶模式,請(qǐng)重新調(diào)整存儲(chǔ)桶數(shù)量。 2.9.1.1 并行度 建議sink的并行度小于等于bucket的數(shù)量

    2024年02月09日
    瀏覽(24)
  • 【大數(shù)據(jù)】深入淺出 Apache Flink:架構(gòu)、案例和優(yōu)勢(shì)

    【大數(shù)據(jù)】深入淺出 Apache Flink:架構(gòu)、案例和優(yōu)勢(shì)

    Apache Flink 是一個(gè)強(qiáng)大的開(kāi)源流處理框架,近年來(lái)在大數(shù)據(jù)社區(qū)大受歡迎。它允許用戶實(shí)時(shí)處理和分析大量流式數(shù)據(jù),使其成為 欺詐檢測(cè) 、 股市分析 和 機(jī)器學(xué)習(xí) 等現(xiàn)代應(yīng)用的理想選擇。 在本文中,我們將詳細(xì)介紹什么是 Apache Flink 以及如何使用它來(lái)為您的業(yè)務(wù)帶來(lái)益處。

    2024年01月17日
    瀏覽(25)
  • Apache Hudi初探(三)(與flink的結(jié)合)--flink寫hudi的操作(真正的寫數(shù)據(jù))

    在之前的文章中Apache Hudi初探(二)(與flink的結(jié)合)–flink寫hudi的操作(JobManager端的提交操作) 有說(shuō)到寫hudi數(shù)據(jù)會(huì)涉及到 寫hudi真實(shí)數(shù)據(jù) 以及 寫hudi元數(shù)據(jù) ,這篇文章來(lái)說(shuō)一下具體的實(shí)現(xiàn) 這里的操作就是在 HoodieFlinkWriteClient.upsert 方法: initTable 初始化HoodieFlinkTable preWrite 在這里幾乎沒(méi)

    2024年02月10日
    瀏覽(19)
  • 怎么使用 Flink 向 Apache Doris 表中寫 Bitmap 類型的數(shù)據(jù)

    Bitmap是一種經(jīng)典的數(shù)據(jù)結(jié)構(gòu),用于高效地對(duì)大量的二進(jìn)制數(shù)據(jù)進(jìn)行壓縮存儲(chǔ)和快速查詢。Doris支持bitmap數(shù)據(jù)類型,在Flink計(jì)算場(chǎng)景中,可以結(jié)合Flink doris Connector對(duì)bitmap數(shù)據(jù)做計(jì)算。 社區(qū)里很多小伙伴在是Doris Flink Connector的時(shí)候,不知道怎么寫B(tài)itmap類型的數(shù)據(jù),本文將介紹如何

    2024年02月07日
    瀏覽(18)
  • 使用 Flink CDC 實(shí)現(xiàn) MySQL 數(shù)據(jù),表結(jié)構(gòu)實(shí)時(shí)入 Apache Doris

    現(xiàn)有數(shù)據(jù)庫(kù):mysql 數(shù)據(jù):庫(kù)表較多,每個(gè)企業(yè)用戶一個(gè)分庫(kù),每個(gè)企業(yè)下的表均不同,無(wú)法做到聚合,且表可以被用戶隨意改動(dòng),增刪改列等,增加表 分析:用戶自定義分析,通過(guò)拖拽定義圖卡,要求實(shí)時(shí),點(diǎn)擊確認(rèn)即出現(xiàn)相應(yīng)結(jié)果,其中有無(wú)法預(yù)判的過(guò)濾 問(wèn)題:隨業(yè)務(wù)增長(zhǎng)

    2023年04月08日
    瀏覽(21)
  • Kudu與Apache Flink的集成:實(shí)時(shí)數(shù)據(jù)處理的新方法

    隨著數(shù)據(jù)的增長(zhǎng),實(shí)時(shí)數(shù)據(jù)處理變得越來(lái)越重要。傳統(tǒng)的批處理系統(tǒng)已經(jīng)不能滿足現(xiàn)在的需求。因此,實(shí)時(shí)數(shù)據(jù)處理技術(shù)逐漸成為了研究的熱點(diǎn)。Kudu和Apache Flink是兩個(gè)非常重要的實(shí)時(shí)數(shù)據(jù)處理系統(tǒng),它們各自具有獨(dú)特的優(yōu)勢(shì)。Kudu是一個(gè)高性能的列式存儲(chǔ)系統(tǒng),適用于實(shí)時(shí)數(shù)

    2024年02月21日
    瀏覽(24)
  • 重磅!flink-table-store將作為獨(dú)立數(shù)據(jù)湖項(xiàng)目重入apache

    重磅!flink-table-store將作為獨(dú)立數(shù)據(jù)湖項(xiàng)目重入apache

    數(shù)據(jù)湖是大數(shù)據(jù)近年來(lái)的網(wǎng)紅項(xiàng)目,大家熟知的開(kāi)源數(shù)據(jù)湖三劍客 Apache hudi、Apache iceberg 、Databricks delta 近年來(lái)野蠻生長(zhǎng),目前各自背后也都有商業(yè)公司支持,投入了大量的人力物力去做研發(fā)和宣傳。然而今天我們要講的是數(shù)據(jù)湖界的后起之秀 —— flink-table-store。 熟悉 Flin

    2024年02月08日
    瀏覽(15)
  • Apache Flink 和 Apache Kafka 兩者之間的集成架構(gòu) Flink and Apache Kafka: A Winning Partnership

    作者:禪與計(jì)算機(jī)程序設(shè)計(jì)藝術(shù) Apache Flink 和 Apache Kafka 是構(gòu)建可靠、高吞吐量和低延遲的數(shù)據(jù)管道(data pipeline)的兩個(gè)著名的開(kāi)源項(xiàng)目。2019年4月,兩者宣布合作共贏。在這次合作中,Apache Kafka 將提供強(qiáng)大的消息存儲(chǔ)能力、Flink 將作為一個(gè)分布式數(shù)據(jù)流處理平臺(tái)來(lái)對(duì)其進(jìn)行

    2024年02月11日
    瀏覽(21)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包