国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Apache Flink】Flink DataStream API的基本使用

這篇具有很好參考價(jià)值的文章主要介紹了【Apache Flink】Flink DataStream API的基本使用。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Flink DataStream API的基本使用

前言

Flink DataStream API主要用于處理無(wú)界和有界數(shù)據(jù)流 。
無(wú)界數(shù)據(jù)流是一個(gè)持續(xù)生成數(shù)據(jù)的數(shù)據(jù)源,它沒(méi)有明確的結(jié)束點(diǎn),例如實(shí)時(shí)的交易數(shù)據(jù)或傳感器數(shù)據(jù)。這種類型的數(shù)據(jù)流需要使用Apache Flink的實(shí)時(shí)處理功能來(lái)連續(xù)地處理和分析。

有界數(shù)據(jù)流是一個(gè)具有明確開(kāi)始和結(jié)束點(diǎn)的數(shù)據(jù)集,例如一個(gè)文件或數(shù)據(jù)庫(kù)表。這種類型的數(shù)據(jù)流通常在批處理場(chǎng)景中使用,其中所有數(shù)據(jù)都已經(jīng)可用,并可以一次性處理。

Flink的DataStream API提供了一套豐富的操作符,如map、filter、reduce、aggregations、windowing、join等,以支持各種復(fù)雜的數(shù)據(jù)處理和分析需求。此外,DataStream API還提供了容錯(cuò)保證,能確保在發(fā)生故障時(shí),應(yīng)用程序能從最近的檢查點(diǎn)(checkpoint)恢復(fù),從而實(shí)現(xiàn)精確一次(exactly-once)的處理語(yǔ)義。

1. 基本使用方法

  1. 創(chuàng)建執(zhí)行環(huán)境:

    每一個(gè)Flink程序都需要?jiǎng)?chuàng)建一個(gè)StreamExecutionEnvironment(執(zhí)行環(huán)境),它可以被用來(lái)設(shè)置參數(shù)和創(chuàng)建從外部系統(tǒng)讀取數(shù)據(jù)的流。

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
  2. 創(chuàng)建數(shù)據(jù)流:

    你可以從各種數(shù)據(jù)源中創(chuàng)建數(shù)據(jù)流,如本地集合,文件,socket等。下面的代碼是從本地集合創(chuàng)建數(shù)據(jù)流的示例:

    DataStream<String> dataStream = env.fromElements("hello", "flink");
    
  3. 轉(zhuǎn)換操作:

    Flink提供了豐富的轉(zhuǎn)換操作,如mapfilter,reduce等。以下代碼首先將每個(gè)字符串映射為其長(zhǎng)度,然后過(guò)濾出長(zhǎng)度大于5的元素:

    DataStream<Integer> transformedStream = dataStream
        .map(s -> s.length())
        .filter(l -> l > 5);
    
  4. 數(shù)據(jù)輸出:

    Flink支持將數(shù)據(jù)流輸出到各種存儲(chǔ)系統(tǒng),如文件,socket,數(shù)據(jù)庫(kù)等。下面的代碼將數(shù)據(jù)流輸出到標(biāo)準(zhǔn)輸出:

    transformedStream.print();
    
  5. 執(zhí)行程序:

    將上述所有步驟放在main函數(shù)中,并在最后調(diào)用env.execute()方法來(lái)啟動(dòng)程序。Flink程序是懶加載的,只有在調(diào)用execute方法時(shí)才會(huì)真正開(kāi)始執(zhí)行。

    env.execute("Flink Basic API Usage");
    

2. 核心示例代碼

使用Flink DataStream API構(gòu)建一個(gè)實(shí)時(shí)Word Count程序,它會(huì)從一個(gè)socket端口讀取文本數(shù)據(jù),統(tǒng)計(jì)每個(gè)單詞的出現(xiàn)次數(shù),并將結(jié)果輸出到標(biāo)準(zhǔn)輸出。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建執(zhí)行環(huán)境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 創(chuàng)建數(shù)據(jù)流,從socket接收數(shù)據(jù),需要在本地啟動(dòng)一個(gè)端口為9000的socket服務(wù)器
        DataStream<String> textStream = env.socketTextStream("localhost", 9000);

        // 3. 轉(zhuǎn)換操作
        DataStream<Tuple2<String, Integer>> wordCountStream = textStream
                .flatMap(new LineSplitter()) // 將文本行切分為單詞
                .keyBy(0) // 按單詞分組
                .sum(1); // 對(duì)每個(gè)單詞的計(jì)數(shù)求和

        // 4. 數(shù)據(jù)輸出
        wordCountStream.print();

        // 5. 執(zhí)行程序
        env.execute("Socket Word Count Example");
    }

    // 自定義一個(gè)FlatMapFunction,將輸入的每一行文本切分為單詞,并輸出為Tuple2,第一個(gè)元素是單詞,第二個(gè)元素是計(jì)數(shù)(初始值為1)
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

3. 完成工程代碼

下面是一個(gè)基于Apache Flink的實(shí)時(shí)單詞計(jì)數(shù)應(yīng)用程序的完整工程代碼,包括Pom.xml文件和所有Java類。

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>com.example</groupId>
    <artifactId>flink-wordcount-example</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <properties>
        <flink.version>1.13.2</flink.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

WordCountExample

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountExample {
    public static void main(String[] args) throws Exception {
        // 1. 創(chuàng)建執(zhí)行環(huán)境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 創(chuàng)建數(shù)據(jù)流,從socket接收數(shù)據(jù),需要在本地啟動(dòng)一個(gè)端口為9000的socket服務(wù)器
        DataStream<String> textStream = env.socketTextStream("localhost", 9000);

        // 3. 轉(zhuǎn)換操作
        DataStream<Tuple2<String, Integer>> wordCountStream = textStream
                .flatMap(new LineSplitter())  // 將文本行切分為單詞
                .keyBy(0)  // 按單詞分組
                .sum(1);  // 對(duì)每個(gè)單詞的計(jì)數(shù)求和

        // 4. 數(shù)據(jù)輸出
        wordCountStream.print();

        // 5. 執(zhí)行程序
        env.execute("Socket Word Count Example");
    }

    // 自定義一個(gè)FlatMapFunction,將輸入的每一行文本切分為單詞,并輸出為Tuple2,第一個(gè)元素是單詞,第二個(gè)元素是計(jì)數(shù)(初始值為1)
    public static class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split(" ")) {
                out.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

現(xiàn)在,你可以使用Maven編譯并運(yùn)行這個(gè)程序。在啟動(dòng)程序之前,你需要在本地啟動(dòng)一個(gè)端口為9000的Socket服務(wù)器。這可以通過(guò)使用Netcat工具 (nc -lk 9000) 或者其他任何能打開(kāi)端口的工具實(shí)現(xiàn)。然后,你可以輸入文本行,F(xiàn)link程序會(huì)統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù),并實(shí)時(shí)打印結(jié)果。

測(cè)試驗(yàn)證

用py在本地啟動(dòng)一個(gè)socket服務(wù)器,監(jiān)聽(tīng)9000端口,

python比較簡(jiǎn)單實(shí)現(xiàn)一個(gè)socket通信 。寫(xiě)一個(gè)Python來(lái)驗(yàn)證上面寫(xiě)的例子。

import socket

server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server_socket.bind(("localhost", 9000))
server_socket.listen(1)

print("Waiting for connection...")
client_socket, client_address = server_socket.accept()
print("Connected to:", client_address)

while True:
    data = input("Enter text: ")
    client_socket.sendall(data.encode())

運(yùn)行Flink程序和Python socket服務(wù)器,然后在Python程序中輸入文本, 會(huì)看到Flink程序?qū)崟r(shí)統(tǒng)計(jì)每個(gè)單詞出現(xiàn)的次數(shù)并輸出到控制臺(tái)。

4. Stream 執(zhí)行環(huán)境

開(kāi)發(fā)學(xué)習(xí)過(guò)程中,不需要關(guān)注。每個(gè) Flink 應(yīng)用都需要有執(zhí)行環(huán)境,在該示例中為 env。流式應(yīng)用需要用到 StreamExecutionEnvironment。
【Apache Flink】Flink DataStream API的基本使用,Apache Flink,apache,flink,大數(shù)據(jù)

DataStream API 將你的應(yīng)用構(gòu)建為一個(gè) job graph,并附加到 StreamExecutionEnvironment 。當(dāng)調(diào)用 env.execute() 時(shí)此 graph 就被打包并發(fā)送到 JobManager 上,后者對(duì)作業(yè)并行處理并將其子任務(wù)分發(fā)給 Task Manager 來(lái)執(zhí)行。每個(gè)作業(yè)的并行子任務(wù)將在 task slot 中執(zhí)行。

注意,如果沒(méi)有調(diào)用 execute(),應(yīng)用就不會(huì)運(yùn)行。

Flink runtime: client, job manager, task managers
此分布式運(yùn)行時(shí)取決于你的應(yīng)用是否是可序列化的。它還要求所有依賴對(duì)集群中的每個(gè)節(jié)點(diǎn)均可用。

5. 參考文檔

https://nightlies.apache.org/flink/flink-docs-release-1.18/zh/docs/learn-flink/datastream_api/文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-735389.html

到了這里,關(guān)于【Apache Flink】Flink DataStream API的基本使用的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Flink】DataStream API使用之源算子(Source)

    【Flink】DataStream API使用之源算子(Source)

    創(chuàng)建環(huán)境之后,就可以構(gòu)建數(shù)據(jù)的業(yè)務(wù)處理邏輯了,F(xiàn)link可以從各種來(lái)源獲取數(shù)據(jù),然后構(gòu)建DataStream進(jìn)項(xiàng)轉(zhuǎn)換。一般將數(shù)據(jù)的輸入來(lái)源稱為數(shù)據(jù)源(data source),而讀取數(shù)據(jù)的算子就叫做源算子(source operator)。所以,Source就是整個(gè)程序的輸入端。 Flink中添加source的方式,是

    2024年02月10日
    瀏覽(19)
  • 什么是API網(wǎng)關(guān),解釋API網(wǎng)關(guān)的作用和特點(diǎn)?解釋什么是數(shù)據(jù)流處理,如Apache Flink和Spark Streaming的應(yīng)用?

    API網(wǎng)關(guān)是一種在分布式系統(tǒng)中的組件,用于管理不同系統(tǒng)之間的通信和交互。API網(wǎng)關(guān)的作用是在不同系統(tǒng)之間提供統(tǒng)一的接口和協(xié)議,從而簡(jiǎn)化系統(tǒng)之間的集成和互操作性。 API網(wǎng)關(guān)的特點(diǎn)包括: 路由和分發(fā)請(qǐng)求:API網(wǎng)關(guān)可以根據(jù)請(qǐng)求的URL、方法、參數(shù)等信息,將請(qǐng)求分發(fā)到

    2024年02月11日
    瀏覽(26)
  • Flink 讀寫(xiě)MySQL數(shù)據(jù)(DataStream和Table API)

    Flink 讀寫(xiě)MySQL數(shù)據(jù)(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以將讀取到的數(shù)據(jù)寫(xiě)入到MySQL中;本文通過(guò)兩種方式將數(shù)據(jù)下入到MySQL數(shù)據(jù)庫(kù),其他的基于JDBC的數(shù)據(jù)庫(kù)類似,另外,Table API方式的Catalog指定為Hive Catalog方式,持久化DDL操作。 另外,JDBC 連接器允許使用 JDBC 驅(qū)動(dòng)程序從任何關(guān)系數(shù)據(jù)庫(kù)讀取數(shù)據(jù)并將

    2023年04月09日
    瀏覽(32)
  • 怎么使用 Flink 向 Apache Doris 表中寫(xiě) Bitmap 類型的數(shù)據(jù)

    Bitmap是一種經(jīng)典的數(shù)據(jù)結(jié)構(gòu),用于高效地對(duì)大量的二進(jìn)制數(shù)據(jù)進(jìn)行壓縮存儲(chǔ)和快速查詢。Doris支持bitmap數(shù)據(jù)類型,在Flink計(jì)算場(chǎng)景中,可以結(jié)合Flink doris Connector對(duì)bitmap數(shù)據(jù)做計(jì)算。 社區(qū)里很多小伙伴在是Doris Flink Connector的時(shí)候,不知道怎么寫(xiě)B(tài)itmap類型的數(shù)據(jù),本文將介紹如何

    2024年02月07日
    瀏覽(18)
  • 使用 Flink CDC 實(shí)現(xiàn) MySQL 數(shù)據(jù),表結(jié)構(gòu)實(shí)時(shí)入 Apache Doris

    現(xiàn)有數(shù)據(jù)庫(kù):mysql 數(shù)據(jù):庫(kù)表較多,每個(gè)企業(yè)用戶一個(gè)分庫(kù),每個(gè)企業(yè)下的表均不同,無(wú)法做到聚合,且表可以被用戶隨意改動(dòng),增刪改列等,增加表 分析:用戶自定義分析,通過(guò)拖拽定義圖卡,要求實(shí)時(shí),點(diǎn)擊確認(rèn)即出現(xiàn)相應(yīng)結(jié)果,其中有無(wú)法預(yù)判的過(guò)濾 問(wèn)題:隨業(yè)務(wù)增長(zhǎng)

    2023年04月08日
    瀏覽(24)
  • Flink DataStream API CDC同步MySQL數(shù)據(jù)到StarRocks

    Flink DataStream API CDC同步MySQL數(shù)據(jù)到StarRocks

    Flink:1.16.1 pom文件如下 Java代碼 SourceAndSinkInfo 類,用于定義source和sink的IP、端口、賬號(hào)、密碼信息 DataCenterShine實(shí)體類,字段與數(shù)據(jù)庫(kù)一一對(duì)應(yīng)。 StarRocksPrimary 實(shí)體類 FieldInfo注解類,用于標(biāo)記字段序號(hào)、是否為主鍵、是否為空,后續(xù)生成TableSchema需要使用到。 TableName 注解類,

    2024年02月03日
    瀏覽(32)
  • 大數(shù)據(jù)學(xué)習(xí)之Flink算子、了解DataStream API(基礎(chǔ)篇一)

    大數(shù)據(jù)學(xué)習(xí)之Flink算子、了解DataStream API(基礎(chǔ)篇一)

    注: 本文只涉及DataStream 原因:隨著大數(shù)據(jù)和流式計(jì)算需求的增長(zhǎng),處理實(shí)時(shí)數(shù)據(jù)流變得越來(lái)越重要。因此,DataStream由于其處理實(shí)時(shí)數(shù)據(jù)流的特性和能力,逐漸替代了DataSet成為了主流的數(shù)據(jù)處理方式。 目錄 DataStream API (基礎(chǔ)篇) 前摘: 一、執(zhí)行環(huán)境 1. 創(chuàng)建執(zhí)行環(huán)境 2. 執(zhí)

    2024年01月23日
    瀏覽(27)
  • 【日常Exception】第三十三回:Flink運(yùn)行jar包報(bào)錯(cuò)NoSuchMethodError: org.apache.flink.api.common.functions.Runtime....

    主要報(bào)錯(cuò)內(nèi)容: java.lang.NoSuchMethodError: org.apache.flink.api.common.functions.RuntimeContext.getMetricGroup()Lorg/apache/flink/metrics/MetricGroup; 報(bào)錯(cuò)全量信息: 原因: 升級(jí)后使用的flink安裝版本是1.14.5,而我的jar包中是使用的1.13.2 解決: 將jar包中的pom中flink的依賴版本,也換成1.14.5,與服務(wù)器上

    2024年02月16日
    瀏覽(22)
  • 數(shù)據(jù)架構(gòu)的實(shí)時(shí)分析:Apache Flink 和 Apache Storm 的比較

    實(shí)時(shí)數(shù)據(jù)處理在大數(shù)據(jù)領(lǐng)域具有重要意義,它可以幫助企業(yè)更快地獲取和分析數(shù)據(jù),從而更快地做出決策。隨著數(shù)據(jù)量的增加,傳統(tǒng)的批處理方法已經(jīng)不能滿足企業(yè)的需求,因此需要使用實(shí)時(shí)數(shù)據(jù)處理技術(shù)。 Apache Flink 和 Apache Storm 是兩個(gè)流行的實(shí)時(shí)數(shù)據(jù)處理框架,它們都可以

    2024年01月23日
    瀏覽(29)
  • 使用 Apache Flink 開(kāi)發(fā)實(shí)時(shí) ETL

    使用 Apache Flink 開(kāi)發(fā)實(shí)時(shí) ETL

    Apache Flink 是大數(shù)據(jù)領(lǐng)域又一新興框架。它與 Spark 的不同之處在于,它是使用流式處理來(lái)模擬批量處理的,因此能夠提供亞秒級(jí)的、符合 Exactly-once 語(yǔ)義的實(shí)時(shí)處理能力。Flink 的使用場(chǎng)景之一是構(gòu)建實(shí)時(shí)的數(shù)據(jù)通道,在不同的存儲(chǔ)之間搬運(yùn)和轉(zhuǎn)換數(shù)據(jù)。本文將介紹如何使用 F

    2024年02月05日
    瀏覽(25)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包