国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink1.17.0數(shù)據(jù)流

這篇具有很好參考價值的文章主要介紹了Flink1.17.0數(shù)據(jù)流。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。


一.基本概念

官網(wǎng)介紹

Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計算。Flink 被設(shè)計為在所有常見的集群環(huán)境中運(yùn)行,以內(nèi)存中的速度和任何規(guī)模執(zhí)行計算。

1.無限流有一個開始,但沒有定義的結(jié)束。它們不會在生成數(shù)據(jù)時終止并提供數(shù)據(jù)。必須連續(xù)處理無限流,即事件必須在攝取后立即處理。不可能等待所有輸入數(shù)據(jù)到達(dá),因為輸入是無限的,并且在任何時間點都不會完成。處理無界數(shù)據(jù)通常需要按特定順序(例如事件發(fā)生的順序)引入事件,以便能夠推斷結(jié)果完整性。(即實時數(shù)據(jù))

2.有界流具有定義的開始和結(jié)束??梢酝ㄟ^在執(zhí)行任何計算之前引入所有數(shù)據(jù)來處理有界流。處理有界流不需要有序引入,因為始終可以對有界數(shù)據(jù)集進(jìn)行排序。有界流的處理也稱為批處理。(即存儲的數(shù)據(jù))

有狀態(tài)流處-flink處理流程
flink流,大數(shù)據(jù),flink
flink流,大數(shù)據(jù),flink
flink流,大數(shù)據(jù),flink
flink流,大數(shù)據(jù),flink
flink流,大數(shù)據(jù),flink

較為合適的應(yīng)用場景
flink流,大數(shù)據(jù),flink
傳統(tǒng)事務(wù)處理
flink流,大數(shù)據(jù),flink


二.Flink和Spark

  • 概念區(qū)別

    • Spark強(qiáng)勁的分布式大數(shù)據(jù)處理框架.它使用內(nèi)存中緩存和優(yōu)化的查詢執(zhí)行方式,可針對任何規(guī)模的數(shù)據(jù)進(jìn)行快速分析查詢,支持跨多個工作負(fù)載重用代碼—批處理、交互式查詢、實時分析、機(jī)器學(xué)習(xí)和圖形處理等。Spark底層基于批處理.(流是批處理不可切分的特殊情況)
    • Flink基于流(批處理是一種有界流)
  • 數(shù)據(jù)模型

    • spark采用RDD模型,spark streaming 的 DStream 實際上也就是一組組小批數(shù)據(jù)RDD的集合
    • flink基本數(shù)據(jù)模型是數(shù)據(jù)流,以及事件(Event)序列
  • 運(yùn)行時架構(gòu)

    • spark是批計算,將DAG劃分為不同的 stage,一個完成后才可以計算下一個
    • flink是標(biāo)準(zhǔn)的流執(zhí)行模式,一個事件在一個節(jié)點處理完后可以直接發(fā)往下一個節(jié)點進(jìn)行處理

三. Flink配置文件

flink流,大數(shù)據(jù),flink
jobmanager.sh 資源調(diào)度,工作分配腳本
taskmanager.sh 工作任務(wù)執(zhí)行腳本
flink 啟動集群后,命令執(zhí)行器

四. yarn部署flink

4.1 session-cluster模式

flink流,大數(shù)據(jù),flink

# 啟動hadhoop集群

# -n(--container) taskManager的數(shù)量 不建議指定.動態(tài)分配
# -s(--slot) 每個taskManager的slot數(shù)量,默認(rèn)一個slot一個core.默認(rèn)每個taskmanager的slot個數(shù)為1
# -jm: jobManager的內(nèi)存 mb.
# -tm: 每個taskManager的內(nèi)存 mb
# -nm: yarn的appName 
./yarn-session.sh -s 2 -jm 1024 -tm 1024 -nm test -d

# 提交job
./flink run -c com.vector.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost -port 7777

./flink list -a

# 取消yarn-session
yarn application --kill application_12451231_0001

4.2 pre-job-cluster模式

flink流,大數(shù)據(jù),flink

1)啟動hadoop集群(略)
2)不啟動yarn-session ,直接執(zhí)行job

./flink run -m yarn-cluster -c com.vector.wc.StreamWordCount
FlinkTutorial-1.0-SNAPSHOT-jar-with-dependencies.jar --host localhost -port 7777

??五.Flink運(yùn)行時架構(gòu)

flink運(yùn)行時組件: jobManager,TaskManager,ResourceManager,Dispacher

JobManager控制一個應(yīng)用程序執(zhí)行的主進(jìn)程,也就是說,每個應(yīng)用程序都會被一個不同的JobManager所控制執(zhí)行。

  • JobManager 會先接收到要執(zhí)行的應(yīng)用程序,這個應(yīng)用程序會包括: 作業(yè)圖(JobGraph)、邏輯數(shù)據(jù)流圖(logical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。
  • JobManager 會把JobGraph轉(zhuǎn)換成一個物理層面的數(shù)據(jù)流圖,這個圖被叫做"“執(zhí)行圖”(ExecutionGraph),包含了所有可以并發(fā)執(zhí)行的任務(wù)。
  • JobManager 會向資源管理器(ResourceManager)請求執(zhí)行任務(wù)必要的資源,也就是任務(wù)管理器( TaskManager)上的插槽((slot)。一旦它獲取到了足夠的資源,就會將執(zhí)行圖分發(fā)到真正運(yùn)行它們的TaskManager上。而在運(yùn)行過程中,JobManager會負(fù)責(zé)所有需要中央?yún)f(xié)調(diào)的操作,比如說檢查點(checkpoints)的協(xié)調(diào)

TaskManager

  • Flink中的工作進(jìn)程。通常在Flink中會有多個TaskManager運(yùn)行,每一個TaskManager都包含了一定數(shù)量的插槽(slots)。插槽的數(shù)量限制了TaskManager能夠執(zhí)行的任務(wù)數(shù)量。
  • 啟動之后,TaskManager會向資源管理器注冊它的插槽;收到資源管理器的指令后,TaskManager就會將一個或者多個插槽提供給JobManager調(diào)用。JobManager就可以向插槽分配任務(wù)(tasks)來執(zhí)行了。
  • 在執(zhí)行過程中,一個TaskManager可以跟其它運(yùn)行同一應(yīng)用程序的TaskManager交換數(shù)據(jù)。

ResourceManager

  • 主要負(fù)責(zé)管理任務(wù)管理器(TaskManager)的插槽(slot) ,TaskManger插槽是Flink中定義的處理資源單元。
  • Flink為不同的環(huán)境和資源管理工具提供了不同資源管理器,比如YARN.Mesos、K8s,以及standalone部署。
  • 當(dāng)JobManager申請插槽資源時,ResourceManager會將有空閑插槽的TaskManager分配給JobManager。如果ResourceManager沒有足夠的插槽來滿足JobManager的請求,它還可以向資源提供平臺發(fā)起會話,以提供啟動TaskManager進(jìn)程的容器。

Dispacher

  • 可以跨作業(yè)運(yùn)行,它為應(yīng)用提交提供了REST接口。
  • 當(dāng)一個應(yīng)用被提交執(zhí)行時,分發(fā)器就會啟動并將應(yīng)用移交給一個JobManager。
  • Dispatcher也會啟動一個Web Ul,用來方便地展示和監(jiān)控作業(yè)執(zhí)行的信息。
  • Dispatcher在架構(gòu)中可能并不是必需的,這取決于應(yīng)用提交運(yùn)行的方式。

5.1 任務(wù)提交流程

flink流,大數(shù)據(jù),flink

5.2 如何實現(xiàn)并行計算

并行度 可以在代碼中指定,提交job指定,也可以在集群配置給默認(rèn)的并行度.
優(yōu)先級:代碼>提交job>集群配置的并行度

  • 一個特定算子的子任務(wù) (subtask)的個數(shù)被稱之為其并行度(parallelism) 。一般情況下,一個stream 的并行度,可以認(rèn)為就是其所有算子中最大的并行度。

slots
推薦按照cpu核心數(shù)設(shè)置slot

  • Flink 中每一個TaskManager都是一個JVM進(jìn)程,它可能會在獨立的線程上執(zhí)行一個或多個子任務(wù)
  • 為了控制一個TaskManager能接收多少個task,taskManager通過task slot來進(jìn)行控制(一個TaskManager至少有一個slot)
  • 默認(rèn)情況下,F(xiàn)link允許子任務(wù)共享slot,即使它們是不同任務(wù)的子任務(wù)。這樣的結(jié)果是,一個slot可以保存作業(yè)的整個管道。
  • Task Slot是靜態(tài)的概念,是指TaskManager具有的并發(fā)執(zhí)行能力

至少需要的slot數(shù) = SUM(MAX(同一個共享組的任務(wù)數(shù),同一個共享組的任務(wù)數(shù)的最大并行度))

情況1
flink流,大數(shù)據(jù),flink
情況2
flink流,大數(shù)據(jù),flink

.setParallelism(4).slotSharingGroup("01"); 設(shè)置并行度和共享組 顯示設(shè)置共享組可以指定不同的slot并行執(zhí)行.如果有地方?jīng)]配,則和前一個處于同一個共享組.如果為首部.則為defalut共享組

public class StreamWordCount {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 設(shè)置并行度
        env.setParallelism(8);
        // 從文件中讀取數(shù)據(jù) 有界流
//        String inputPath = System.getProperty("user.dir") + "/src/main/resources/text.txt";
//        FileSource<String> source = FileSource
//                .forRecordStreamFormat(
//                        new TextLineInputFormat("UTF-8"),
//                        new Path(inputPath))
//                        .build();
//        DataStream<String> inputDataStream =
//                env.fromSource(source, WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)), "text");
        
//        ParameterTool parameterTool = ParameterTool.fromArgs(args);
//        String host = parameterTool.get("host");
//        int port = parameterTool.getInt("port");
        // 從socket文本流讀取數(shù)據(jù) nc -lk 7777 無界流
        DataStreamSource<String> inputDataStream =
                env.socketTextStream("localhost", 7777);
        // 基于數(shù)據(jù)流進(jìn)行轉(zhuǎn)換計算
        SingleOutputStreamOperator<Tuple2<String, Integer>> resultSet =
                inputDataStream.flatMap(new WordCount.MyFlatMapper())
                .slotSharingGroup("02")
                .keyBy(KeySelector -> KeySelector.f0)
                .sum(1).setParallelism(4).slotSharingGroup("01");
        resultSet.print();
        // 執(zhí)行任務(wù)
        env.execute();
    }

    public static class MyFlatMapper implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
            // 按句號分詞
            String[] words = s.split("");
            // 遍歷所有word,包成二元組輸出
            for (String word : words) {
                collector.collect(new Tuple2<>(word, 1));
            }
        }
    }
}

5.3 執(zhí)行圖

Flink 中的執(zhí)行圖可以分成四層: StreamGraph-> JobGraph -> ExecutionGraph->物理執(zhí)行圖
StreamGraph:是根據(jù)用戶通過Stream API編寫的代碼生成的最初的圖。用來表示程序的拓?fù)浣Y(jié)構(gòu)。
JobGraph: StreamGraph經(jīng)過優(yōu)化后生成了JobGraph,提交給JobManager的數(shù)據(jù)結(jié)構(gòu)。主要的優(yōu)化為,將多個符合條件的節(jié)點chain 在一起作為一個節(jié)點
ExecutionGraph: JobManager根據(jù)JobGraph生成ExecutionGraph。ExecutionGraph: 是JobGraph的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。
物理執(zhí)行圖: JobManager根據(jù)ExecutionGraph對Job進(jìn)行調(diào)度后,在各個TaskManager上部署Task后形成的“圖”,并不是一個具體的數(shù)據(jù)結(jié)構(gòu)。
flink流,大數(shù)據(jù),flink

5.4 數(shù)據(jù)的傳輸形式

算子之間傳輸數(shù)據(jù)的形式可以是one-to-one (forwarding)的模式也可以是redistributing的模式,具體是哪一種形式,取決于算子的種類
One-to-one: stream維護(hù)著分區(qū)以及元素的順序(比如source和map之間)。這意味著map算子的子任務(wù)看到的元素的個數(shù)以及順序跟source算子的子任務(wù)生產(chǎn)的元素的個數(shù)、順序相同。map、fliter、flatMap等算子都是one-to-one的對應(yīng)關(guān)系。
Redistributing: stream的分區(qū)會發(fā)生改變。每一個算子的子任務(wù)依據(jù)所選擇的transformation發(fā)送數(shù)據(jù)到不同的目標(biāo)任務(wù)。例如,keyBy基于hashCode重分區(qū)、而broadcast和rebalance 會隨機(jī)重新分區(qū),這些算子都會引起redistribute過程,而redistribute過程就類似于Spark 中的shuffle 過程。

5.5 任務(wù)鏈

Flink 采用了一種稱為任務(wù)鏈的優(yōu)化技術(shù),可以在特定條件下減少本地通信的開銷。為了滿足任務(wù)鏈的要求,必須將兩個或多個算子設(shè)為相同的并行度,并通過本地轉(zhuǎn)發(fā)(local forward)的方式進(jìn)行連接
·相同并行度的one-to-one操作,F(xiàn)link這樣相連的算子鏈接在一起形成一個task,原來的算子成為里面的subtask
·并行度相同、并且是one-to-one操作
·位于同一個共享組 三個條件缺一不可

六. 流處理API

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>

flink系列 1.17.0版本

創(chuàng)建一個執(zhí)行環(huán)境,表示當(dāng)前執(zhí)行程序的上下文。如果程序是獨立調(diào)用的,則此方法返回本地執(zhí)行環(huán)境;如果從命令行客戶端調(diào)用程序以提交到集群,則此方法返回此集群的執(zhí)行環(huán)境,也就是說,getExecutionEnvironment會根據(jù)查詢運(yùn)行的方式?jīng)Q定返回什么樣的運(yùn)行環(huán)境,是最常用的一種創(chuàng)建執(zhí)行環(huán)境的方式。

如果沒有設(shè)置并行度默認(rèn)flink-conf.yaml配置文件的1
flink流,大數(shù)據(jù),flink

// 封裝了對本地執(zhí)行環(huán)境和遠(yuǎn)程執(zhí)行環(huán)境的判斷 流處理
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 批處理
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();

source.txt

sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,32.8
sensor_1,1547714191,26.8
sensor_3,1547718202,6.7

SensorReadingEntity .class

// 傳感器溫度讀數(shù)數(shù)據(jù)類型
@Data
@AllArgsConstructor
@NoArgsConstructor
public class SensorReadingEntity {
    private String id;
    private Long timestamp;
    private Double temperature;
}

6.1文件處理

public class SourceTest2_File {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 從文件中讀取數(shù)據(jù)
        String inputPath = System.getProperty("user.dir") + "/src/main/resources/source.txt";


        FileSource<String> build = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(), new Path(inputPath))
                .build();
        DataStream<String> dataStream = env
                .fromSource(build, WatermarkStrategy.noWatermarks(),"source.txt");
        // 打印輸出
        dataStream.print();
        env.execute();


    }
}

??6.2kafka處理

基本配置

# 修改kafka主機(jī)ip  配置localhost或127.0.0.1在wsl上可能會有問題
conf/server.properties

listeners = PLAINTEXT://非回環(huán)ip:9092
advertised.listeners=PLAINTEXT://非回環(huán)ip:9092


#開啟kafka zookeeper服務(wù)
bin/zookeeper-server-start.sh config/zookeeper.properties

# 開啟kafka服務(wù)
bin/kafka-server-start.sh config/server.properties

實例化交換機(jī)消息隊列

# 交換機(jī)ip端口 172.27.188.96:9092  主題交換機(jī)名稱aaaaa
bin/kafka-console-producer.sh  --broker-list 172.27.188.96:9092 --topic aaaaa

java連接代碼

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        String brokers = "172.27.188.96:9092";
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("aaaaa")
//                .setGroupId("my-group")
                .setStartingOffsets(OffsetsInitializer.latest()) // 從最早的數(shù)據(jù)開始讀取
                .setValueOnlyDeserializer(new SimpleStringSchema()) // 只需要value
                .build();

        DataStream<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        // 打印
        kafkaSource.print();
        // 執(zhí)行
        env.execute();

    }

flink流,大數(shù)據(jù),flink

??6.3 自定義數(shù)據(jù)源

/**
 * @author YuanJie
 * @projectName flink
 * @package com.vector.apitest
 * @className com.vector.apitest.SourceTest4_UDF
 * @copyright Copyright 2020 vector, Inc All rights reserved.
 * @date 2023/8/21 15:15
 */
public class SourceTest4_UDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // 從自定義的數(shù)據(jù)源讀取數(shù)據(jù)
        DataStream<SensorReading> dataStream = env.addSource(new MySensorSource());
        // 打印輸出
        dataStream.print();
        // 執(zhí)行
        env.execute();


    }

    // 實現(xiàn)自定義的SourceFunction<OUT>接口,自定義的數(shù)據(jù)源
    private static class MySensorSource implements SourceFunction<SensorReading> {
        // 定義一個標(biāo)志位,用來表示數(shù)據(jù)源是否正常運(yùn)行發(fā)出數(shù)據(jù)
        private boolean running = true;

        @Override
        public void run(SourceContext<SensorReading> sourceContext) throws Exception {
            // 定義一個隨機(jī)數(shù)發(fā)生器
            Random random = new Random();
            // 設(shè)置10個傳感器的初始溫度 0~120℃正態(tài)分布
            HashMap<String, Double> sensorTempMap = new HashMap<>();
            for (int i = 0; i < 10; i++) {
                sensorTempMap.put("sensor_" + (i + 1), 60 + random.nextGaussian() * 20);
            }
            while (running) {
                // 在while循環(huán)中,隨機(jī)生成SensorReading數(shù)據(jù)
                for (String sensorId : sensorTempMap.keySet()) {
                    // 在當(dāng)前溫度基礎(chǔ)上隨機(jī)波動
                    Double newTemp = sensorTempMap.get(sensorId) + random.nextGaussian();
                    sensorTempMap.put(sensorId, newTemp);
                    sourceContext.collect(new SensorReading(sensorId, System.currentTimeMillis(), newTemp));
                }
                // 每隔1秒鐘發(fā)送一次傳感器數(shù)據(jù)
                TimeUnit.MILLISECONDS.sleep(1000L);
            }

        }
        @Override
        public void cancel() {
            running = false;
        }
    }
}

??6.4 Transform-轉(zhuǎn)換算子

6.4.1 map

flink流,大數(shù)據(jù),flink

6.4.2 flatMap

flink流,大數(shù)據(jù),flink

6.4.3 filter

flink流,大數(shù)據(jù),flink
demo

 public static void main(String[] args) throws Exception {
      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      env.setParallelism(2);

      // 從文件讀數(shù)據(jù)
      FileSource<String> build = FileSource
              .forRecordStreamFormat(new TextLineInputFormat(),
                      new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
              .build();

      // map 轉(zhuǎn)換成長度輸出
      DataStream<Integer> map = env.fromSource(build,  WatermarkStrategy.noWatermarks(), "source.txt")
              .map(String::length);
      // flatMap 按逗號分隔
      DataStream<String> flatMap = env.fromSource(build,  WatermarkStrategy.noWatermarks(), "source.txt")
              .flatMap((String s,Collector<String> collector) -> {
                  String[] split = s.split(",");
                  for (String s1 : split) {
                      collector.collect(s1);
                  }
              })
              .returns(Types.STRING);

      // filter 篩選 sensor_1 開頭的id
      DataStream<String> filter = env.fromSource(build,  WatermarkStrategy.noWatermarks(), "source.txt")
              .filter(s -> s.startsWith("sensor_1"));

      map.print("map");
      flatMap.print("flatMap");
      filter.print("filter");

      env.execute();

  }

flink流,大數(shù)據(jù),flink

6.5 ??分組聚合

6.5.1 keyBy

Flink中分組后才能聚合

根據(jù)某個字段將數(shù)據(jù)分到不同分區(qū).每個分區(qū)包含相同的key. 內(nèi)部以hash形式實現(xiàn).(存在hash沖突,因此能保證需要的均在某分區(qū),但無法保證該分區(qū)key唯一)

6.5.2 滾動聚合算子(Rolling Aggregation)

這些算子可以針對KeyedStream的每一個支流做聚合。

  • sum()
  • min()
  • max()
  • minBy()
  • maxBy()
demo - max

求每組中首個設(shè)備溫度變化
flink流,大數(shù)據(jù),flink

sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,39.8
sensor_1,1547714191,40.8
sensor_3,1547718202,6.7
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        FileSource<String> build = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(),
                        new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
                .build();
        DataStream<String> streamSource = env.fromSource(build,
                WatermarkStrategy.noWatermarks(),
                "source.txt");
        // 轉(zhuǎn)換成SensorReading 類型
        DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
            String[] split = item.split(",");
            return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // 分組 滾動聚合
        DataStream<SensorReadingEntity> temperature = dataStream.keyBy(SensorReadingEntity::getId)
                // 滾動聚合
                .max("temperature");

        temperature.print("temperature");
        env.execute();
    }
demo-maxBy

求實時數(shù)據(jù)中每組當(dāng)前最大溫度數(shù)據(jù)
flink流,大數(shù)據(jù),flink

6.5.3 reduce

求最大溫度值,以及當(dāng)前最新的時間戳
flink流,大數(shù)據(jù),flink

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        FileSource<String> build = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(),
                        new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
                .build();
        DataStream<String> streamSource = env.fromSource(build,
                WatermarkStrategy.noWatermarks(),
                "source.txt");
        // 轉(zhuǎn)換成SensorReading 類型
        DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
            String[] split = item.split(",");
            return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // reduce 聚合 求最大溫度值,以及當(dāng)前最新的時間戳
        DataStream<SensorReadingEntity> reduce = dataStream.keyBy(SensorReadingEntity::getId)
                .reduce((curState, newData) -> {
                    return new SensorReadingEntity(curState.getId(), newData.getTimestamp(), Math.max(curState.getTemperature(), newData.getTemperature()));
                });

        reduce.print("reduce");
        env.execute();
    }

??6.6 多流轉(zhuǎn)換算子 分流

以下api的demo集合

sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,32.8
sensor_1,1547714191,26.8
sensor_3,1547718202,6.7
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        FileSource<String> build = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(),
                        new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
                .build();
        DataStream<String> streamSource = env.fromSource(build,
                WatermarkStrategy.noWatermarks(),
                "source.txt");
        // 轉(zhuǎn)換成SensorReading 類型
        DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
            String[] split = item.split(",");
            return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // 分流 側(cè)輸出流 定義低溫流標(biāo)識
        final OutputTag<SensorReadingEntity> outputTag = new OutputTag<SensorReadingEntity>("low") {};
        SingleOutputStreamOperator<SensorReadingEntity> process = dataStream.process(new ProcessFunction<SensorReadingEntity, SensorReadingEntity>() {
            @Override
            public void processElement(SensorReadingEntity sensorReadingEntity, ProcessFunction<SensorReadingEntity, SensorReadingEntity>.Context context, Collector<SensorReadingEntity> collector) throws Exception {
                if (sensorReadingEntity.getTemperature() > 30) {
                    collector.collect(sensorReadingEntity);
                } else {
                    context.output(outputTag, sensorReadingEntity);
                }
            }
        });

        // 低溫流 側(cè)輸出流
        process.getSideOutput(outputTag).print("low");
        // 高溫流 主流
        process.print("high");


        // 連接流(數(shù)據(jù)類型不同) + 合流(完全相同) 轉(zhuǎn)換為二元組
        DataStream<Object> map = process.map(item -> {
                    SensorReadingEntity sensorReadingEntity = (SensorReadingEntity) item;
                    return new Tuple2<>(sensorReadingEntity.getId(), sensorReadingEntity.getTemperature());
                })
                .returns(Types.TUPLE(Types.STRING, Types.DOUBLE))
                .connect(process.getSideOutput(outputTag))
                .map(new CoMapFunction<Tuple2<String, Double>, SensorReadingEntity, Object>() {
                    @Override
                    public Object map1(Tuple2<String, Double> tuple2) throws Exception {
                        return new Tuple3<>(tuple2.f0, tuple2.f1, "高溫報警");
                    }
                    @Override
                    public Object map2(SensorReadingEntity sensorReadingEntity) throws Exception {
                        return new Tuple2<>(sensorReadingEntity.getId(), "正常");
                    }
                });
        map.print("connect");


        // union 合流
        DataStream<SensorReadingEntity> union = process.union(process.getSideOutput(outputTag));
        union.print("union");
        env.execute();
    }
6.6.1 getSideOutput 分流輸出

用于將實時數(shù)據(jù),根據(jù)條件分流輸出.
根據(jù)30℃標(biāo)準(zhǔn)值分為高溫主流和低溫 側(cè)輸出流
flink流,大數(shù)據(jù),flink

6.6.2 connect 和CoMap 合流

connect只能連接兩條流
flink流,大數(shù)據(jù),flink

6.6.3 union 合流

可以合并多條流,但是流的類型必須一致
DataStream →DataStream:對兩個或者兩個以上的DataStream進(jìn)行union操作,產(chǎn)生一個包含所有DataStream元素的新DataStream.
flink流,大數(shù)據(jù),flink

七. 函數(shù)類

7.1 ??UDF函數(shù)類

Flink暴露了所有udf函數(shù)的接口(實現(xiàn)方式為接口或者抽象類)。例如MapFunction,FilterFunction,ProcessFunction等.

7.2 ??RichFunction

“富函數(shù)”是DataStream API提供的一個函數(shù)類的接口,所有Flink函數(shù)類都有其Rich 版本。它與常規(guī)函數(shù)的不同在于,可以獲取運(yùn)行環(huán)境的上下文,并擁有一些生命周期方法,所以可以實現(xiàn)更復(fù)雜的功能。
flink流,大數(shù)據(jù),flink

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        FileSource<String> build = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(),
                        new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
                .build();
        DataStream<String> streamSource = env.fromSource(build,
                WatermarkStrategy.noWatermarks(),
                "source.txt");
        // 轉(zhuǎn)換成SensorReading 類型
        DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
            String[] split = item.split(",");
            return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        DataStream<Tuple2<String, Integer>> resultStream = dataStream.map(new MyMapFunction());
        resultStream.print("MyMapFunction");

        DataStream<Tuple2<String, Integer>> resultRichStream = dataStream.map(new MyRichMapFunction());
        resultStream.print("MyRichMapFunction");
        env.execute();
    }

    public static class MyMapFunction implements MapFunction<SensorReadingEntity, Tuple2<String, Integer>> {

        @Override
        public Tuple2<String, Integer> map(SensorReadingEntity value) throws Exception {
            return new Tuple2<>(value.getId(), value.getId().length());
        }
    }

    public static class MyRichMapFunction extends RichMapFunction<SensorReadingEntity,Tuple2<String,Integer>>{

            @Override
            public Tuple2<String, Integer> map(SensorReadingEntity value) throws Exception {
                return new Tuple2<>(value.getId(), getRuntimeContext().getIndexOfThisSubtask());
            }

            @Override
            public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
                // 初始化工作,一般是定義狀態(tài),或者建立數(shù)據(jù)庫連接
                // 每個并行實例都會調(diào)用一次
                System.out.println("open");
            }

            @Override
            public void close() throws Exception {
                // 一般是關(guān)閉連接和清空狀態(tài)的收尾操作
                // 每個并行實例都會調(diào)用一次
                System.out.println("close");;
            }
    }

7.3 重分區(qū)

上述介紹KeyBy是Hash重分區(qū)
broadcast 下游廣播
shuffle 隨機(jī)把當(dāng)前任務(wù)分配到下游子分區(qū)
forward 直通分區(qū)
rebalance 輪詢分配到下游子分區(qū)
rescale 分組輪詢到下游子分區(qū)
global 只傳輸?shù)较掠蔚谝粋€子分區(qū)
partitionCustom 自定義傳輸分區(qū)

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        FileSource<String> build = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(),
                        new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
                .build();
        DataStream<String> streamSource = env.fromSource(build,
                WatermarkStrategy.noWatermarks(),
                "source.txt");

        streamSource.print("input");

        // 1. shuffle
        DataStream<String> shuffle = streamSource.shuffle();
        shuffle.print("shuffle");
        // 2. rebalance
        DataStream<String> rebalance = streamSource.rebalance();
        rebalance.print("rebalance");
        // 3. rescale
        DataStream<String> rescale = streamSource.rescale();
        rescale.print("rescale");
        // 4. global
        DataStream<String> global = streamSource.global();
        global.print("global");
        // 5. broadcast
        DataStream<String> broadcast = streamSource.broadcast();
        broadcast.print("broadcast");
        // 6. forward
        DataStream<String> forward = streamSource.forward();
        forward.print("forward");
        // 7. keyBy
        streamSource.keyBy(item -> {
            String[] split = item.split(",");
            return split[0];
        }).print("keyBy");
        env.execute();
    }

7.4 sink 寫入庫

Flink沒有類似于spark 中 foreach方法,讓用戶進(jìn)行迭代的操作。雖有對外的輸出操作都要利用sink完成。最后通過類似如下方式完成整個任務(wù)最終輸出操作。

stream. addsink( new Mysink ( xxxx))

flink1.17.0提供的連接器
flink流,大數(shù)據(jù),flink

??7.4.1 讀kafka-寫kafka

Kafka基本配置

# 修改kafka主機(jī)ip  配置localhost或127.0.0.1在wsl上可能會有問題
conf/server.properties

listeners = PLAINTEXT://非回環(huán)ip:9092
advertised.listeners=PLAINTEXT://非回環(huán)ip:9092


#開啟kafka zookeeper服務(wù)
bin/zookeeper-server-start.sh config/zookeeper.properties

# 開啟kafka服務(wù)
bin/kafka-server-start.sh config/server.properties

實例化交換機(jī)消息隊列

# 交換機(jī)ip端口 172.27.181.61:9092  主題交換機(jī)名稱topic-sink 生產(chǎn)者
bin/kafka-console-producer.sh  --broker-list 172.27.181.61:9092 --topic topic-producer
# 消費者
bin/kafka-console-consumer.sh  --bootstrap-server 172.27.181.61:9092 --topic topic-sink
public class SinkTest1_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        String brokers = "172.27.181.61:9092";
        // 從kafka讀取數(shù)據(jù)
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers(brokers)
                .setTopics("topic-producer")
                .setStartingOffsets(OffsetsInitializer.latest()) // 從最新的數(shù)據(jù)開始讀取
                .setValueOnlyDeserializer(new SimpleStringSchema()) // 只需要value
                .build();

        DataStream<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // 轉(zhuǎn)換成SensorReading 類型
        DataStream<String> dataStream = kafkaSource.map(item -> {
            String[] split = item.split(",");
            try {
                return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2])).toString();
            } catch (Exception e) {
                return item;
            }
        });


        // 輸出到kafka
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers(brokers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("topic-sink")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) // 至少一次
                .build();
        dataStream.sinkTo(sink);

        env.execute();
    }
}

flink流,大數(shù)據(jù),flink

??7.4.2 讀kafka-寫非關(guān)系數(shù)據(jù)庫redis-redisson-自定義flink連接器

1.flink官方為我們提供了多種連接器,我們可以直接使用 官方連接地址 這里就不講這種方法了

2.自定義連接器,以整合redisson為例
額外引入pom

   <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.17.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-base -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>1.17.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.17.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.26</version>
        </dependency>

        <dependency>
            <groupId>org.redisson</groupId>
            <artifactId>redisson-spring-boot-starter</artifactId>
            <version>3.20.0</version>
        </dependency>
    </dependencies>

配置redisson數(shù)據(jù)源,springboot可以修改該配置

//@Configuration
@Slf4j
//@ConfigurationProperties(prefix = "redisson")
//@Data
public class RedissonConfig {

   // private String host;
    //private String password;
    //private Integer database;
	// @Bean(destroyMethod = "shutdown")
    public static RedissonClient redisson() {
        Config config = new Config();
        //config.useClusterServers().addNodeAddress("127.0.0.1:6379").setPassword("123456");
        config.useSingleServer()
                .setAddress("redis://"+ "localhost:6379") // 單機(jī)模式
                .setPassword("123456") // 密碼
                .setSubscriptionConnectionMinimumIdleSize(1) // 對于支持多個Redis連接的RedissonClient對象,
                .setSubscriptionConnectionPoolSize(50) // 對于支持綁定多個Redisson連接的RedissonClient對象,
                .setConnectionMinimumIdleSize(32) // 最小空閑連接數(shù)
                .setConnectionPoolSize(64) // 只能用于單機(jī)模式
                .setDnsMonitoringInterval(5000) // DNS監(jiān)控間隔時間,單位:毫秒
                .setIdleConnectionTimeout(10000) // 空閑連接超時時間,單位:毫秒
                .setConnectTimeout(10000) // 連接超時時間,單位:毫秒
                .setPingConnectionInterval(10000) // 集群狀態(tài)掃描間隔時間,單位:毫秒
                .setTimeout(5000) // 命令等待超時時間,單位:毫秒
                .setRetryAttempts(3) // 命令重試次數(shù)
                .setRetryInterval(1500) // 命令重試發(fā)送時間間隔,單位:毫秒
                .setDatabase(0) // 數(shù)據(jù)庫編號
                .setSubscriptionsPerConnection(5); // 每個連接的最大訂閱數(shù)量
        config.setCodec(new JsonJacksonCodec()); // 設(shè)置編碼方式
        return Redisson.create(config);
    }
}

source.txt

sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,32.8
sensor_1,1547714191,26.8
sensor_3,1547718202,6.7

自定義RichSinkFunction
open在創(chuàng)建sink時候只調(diào)用一次,所以這里可以用于初始化一些資源配置
invok(Object value, Context context)方法在每次有數(shù)據(jù)流入時都會調(diào)用,所以從源中每過來一個數(shù)據(jù)都會執(zhí)行
close()方法用于關(guān)閉sink時調(diào)用,一般用于釋放資源

如果使用springboot整合那直接注入,就不需要在open中初始化

public class SinkTest2_Redis_UDF {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        FileSource<String> build = FileSource
                .forRecordStreamFormat(new TextLineInputFormat(),
                        new Path(System.getProperty("user.dir") + "/src/main/resources/source.txt"))
                .build();
        DataStream<String> streamSource = env.fromSource(build,
                WatermarkStrategy.noWatermarks(),
                "source.txt");
        // 轉(zhuǎn)換成SensorReading 類型
        DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
            String[] split = item.split(",");
            return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        dataStream.addSink(new RedisSink_UDF());

        env.execute();
    }


    public static class RedisSink_UDF extends RichSinkFunction<SensorReadingEntity> {
        RedissonClient redisClient;

        @Override
        public void open(Configuration parameters) throws Exception {
            // springboot不需要這樣獲取,直接注入redisson配置類
            super.open(parameters);
            redisClient = RedissonConfig.redisson();
        }

        @Override
        public void close() throws Exception {
            super.close();
            redisClient.shutdown();
        }

        @Override
        public void invoke(SensorReadingEntity sensorReadingEntity, Context context) throws Exception {
            if(redisClient == null){
                redisClient = RedissonConfig.redisson();
            }
            System.out.println(sensorReadingEntity);
            RMap<String, SensorReadingEntity> map = redisClient.getMap("real-time-key");
            map.expire(10, TimeUnit.MINUTES);
            map.put(sensorReadingEntity.getId(),sensorReadingEntity);
        }
    }
}

flink流,大數(shù)據(jù),flink

flink流,大數(shù)據(jù),flink
flink流,大數(shù)據(jù),flink

八. Window API

一般真實的流都是無界的,怎樣處理無界的數(shù)據(jù)? 即需要統(tǒng)計數(shù)據(jù)到未來某個時間段.

  • 可以把無限的數(shù)據(jù)流進(jìn)行切分,得到有限的數(shù)據(jù)集進(jìn)行處理——也就是得到有界流
  • 窗口(window)就是將無限流切割為有限流的一種方式,它會將流數(shù)據(jù)分發(fā)到有限大小的桶(bucket)中進(jìn)行分析. flink的窗口概念更類似令牌桶(條件桶)的概念,因為無限數(shù)據(jù)是無法確定數(shù)據(jù)量的,需要將符合條件的數(shù)據(jù)放入對應(yīng)的窗口.亂序數(shù)據(jù)也因此可以變得有序.

window類型
時間窗口 (按時間截取)

  • 滾動時間窗口
  • 滑動時間窗口
  • 會話窗口

計數(shù)窗口 (按數(shù)據(jù)個數(shù)截取)

  • 滾動計數(shù)窗口
  • 滑動計數(shù)窗口

flink流,大數(shù)據(jù),flink


flink流,大數(shù)據(jù),flink


會話窗口 session window
flink流,大數(shù)據(jù),flink

由一系列事件組合一個指定時間長度的timeout間隙組成,也就是一段時間沒有接收到新數(shù)據(jù)就會生成新的窗口
特點:時間無對齊

8.1 窗口分配器 && 窗口函數(shù)

窗口分配器———window()方法
我們可以用.window()來定義一個窗口,然后基于這個window去做一些聚合或者其它處理操作。注意window()方法必須在keyBy之后才能用。Flink 提供了更加簡單的.timeWindow和.countWindow方法,用于定義時間窗口和計數(shù)窗口。
flink流,大數(shù)據(jù),flink

sensor_1,1547718199,35.8
sensor_2,1547718201,15.4
sensor_3,1547718202,6.7
sensor_4,1547718205,38.1
sensor_1,1547718191,58.8
sensor_1,1547714191,40.8
sensor_3,1547718202,6.7
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // socketText  nc -lk 7777
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 7777);

        // 轉(zhuǎn)換成SensorReading 類型
        DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
            String[] split = item.split(",");
            return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // 窗口測試
        // dataStream.windowAll(); //global
        DataStream<Integer> resultStream = dataStream
                .keyBy(SensorReadingEntity::getId)
                // 滾動窗口 參數(shù)1為一個窗口1分鐘s接收時間,參數(shù)2時間偏移15s開一個窗口
                .window(TumblingProcessingTimeWindows.of((Time.minutes(1)), Time.seconds(15)))
                .aggregate(new AggregateFunction<SensorReadingEntity, AtomicInteger, Integer>() {
                    @Override
                    public AtomicInteger createAccumulator() {
                        return new AtomicInteger(0);
                    }

                    @Override
                    public AtomicInteger add(SensorReadingEntity sensorReadingEntity, AtomicInteger atomicInteger) {
                        atomicInteger.incrementAndGet();
                        return atomicInteger;
                    }

                    @Override
                    public Integer getResult(AtomicInteger atomicInteger) {
                        return atomicInteger.get();
                    }

                    @Override
                    public AtomicInteger merge(AtomicInteger atomicInteger, AtomicInteger acc1) {
                        int i = atomicInteger.get() + acc1.get();
                        atomicInteger.set(i);
                        return atomicInteger;
                    }
                });

        resultStream.print();
//        dataStream.keyBy(SensorReadingEntity::getId)
//                // 滑動窗口 1個窗口30s接收時間,窗口間隔15s 開一個窗口
//                        .window(SlidingProcessingTimeWindows.of(Time.seconds(30),Time.seconds(15)));
//        dataStream.keyBy(SensorReadingEntity::getId)
//                // session窗口
//                        .window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)));
//
//        dataStream.keyBy(SensorReadingEntity::getId)
//                // 滾動統(tǒng)計窗口
//                        .countWindow(5);
//        dataStream.keyBy(SensorReadingEntity::getId)
//                // 滑動統(tǒng)計窗口
//                        .countWindow(5,3);
        env.execute();
    }

先前的窗口分配器將分組的數(shù)據(jù)做了分桶. 窗口函數(shù)是對桶數(shù)據(jù)做聚合運(yùn)算.
window function定義了要對窗口中收集的數(shù)據(jù)做的計算操作可以分為兩類

8.1.1 增量聚合函數(shù)(incremental aggregation functions)
  • 每條數(shù)據(jù)到來就進(jìn)行計算,保持一個簡單的狀態(tài).
  • ReduceFunction, AggregateFunction
    flink流,大數(shù)據(jù),flink
    AggregateFunction 參數(shù)1 入?yún)㈩愋?參數(shù)2 中間累加狀態(tài) 參數(shù)3 輸出類型
8.1.2 全窗口函數(shù)(full window functions)
  • 先把窗口所有數(shù)據(jù)收集起來,等到計算的時候會遍歷所有數(shù)據(jù).
  • ProcessWindowFunction,WindowFunction
    flink流,大數(shù)據(jù),flink
8.1.3 窗口統(tǒng)計函數(shù)
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        // socketText  nc -lk 7777
        DataStreamSource<String> streamSource = env.socketTextStream("localhost", 7777);

        // 轉(zhuǎn)換成SensorReading 類型
        DataStream<SensorReadingEntity> dataStream = streamSource.map(item -> {
            String[] split = item.split(",");
            return new SensorReadingEntity(split[0], Long.parseLong(split[1]), Double.parseDouble(split[2]));
        });

        // 開窗計數(shù)窗口
        DataStream<Double> aggregate = dataStream.keyBy(SensorReadingEntity::getId)
                .countWindow(10, 2)
                .aggregate(new AvgTemp());
        aggregate.print();
        env.execute();
    }

    public static class AvgTemp implements AggregateFunction<SensorReadingEntity, Tuple2<Double,Integer>,Double> {
        @Override
        public Tuple2<Double, Integer> createAccumulator() {
            return new Tuple2<>(0.0,0);
        }

        @Override
        public Tuple2<Double, Integer> add(SensorReadingEntity sensorReadingEntity, Tuple2<Double, Integer> objects) {
            objects.f0 += sensorReadingEntity.getTemperature();
            objects.f1 += 1;
            return objects;
        }

        @Override
        public Double getResult(Tuple2<Double, Integer> objects) {
            return objects.f0/objects.f1;
        }

        @Override
        public Tuple2<Double, Integer> merge(Tuple2<Double, Integer> objects, Tuple2<Double, Integer> acc1) {
            return new Tuple2<>(objects.f0+acc1.f0,objects.f1+acc1.f1);
        }
    }
8.1.4 其他api函數(shù)

.trigger()——觸發(fā)器定義 window 什么時候關(guān)閉,觸發(fā)計算并輸出結(jié)果
.evictor()——移除器定義移除某些數(shù)據(jù)的邏輯
.allowedLateness()——允許處理遲到的數(shù)據(jù)
.sideOutputLateData()——將遲到的數(shù)據(jù)放入側(cè)輸出流
.getSideOutput()——獲取側(cè)輸出流

8.1.5 windows api總結(jié)

flink流,大數(shù)據(jù),flink

九.時間定義

  • Event Time 事件創(chuàng)建時間
  • Ingestion Time: 數(shù)據(jù)進(jìn)入Flink的時間
  • Processing Time: 執(zhí)行本地算子的本地系統(tǒng)時間,和操作系統(tǒng)相關(guān)

一般的更關(guān)心事件時間.而不是系統(tǒng)處理時間. 以用戶產(chǎn)生時間為準(zhǔn),所見即所得.
flink默認(rèn)時間語義在1.12已改為是事件時間

在Flink 1.12中,默認(rèn)的流時間特性已更改為 TimeCharacteristic.EventTime,因此您不再需要調(diào)用此方法來啟用事件時間支持。顯式地使用處理時間窗口和計時器在事件時間模式下工作。如果您需要禁用水印,請使用 ExecutionConfig.setAutoWatermarkInterval(長). 如果你正在使用 TimeCharacteristic。IngestionTime,請手動設(shè)置合適的 WatermarkStrategy. 如果您正在使用通用的“時間窗口”操作(例如 org.apache.flink.streaming.api.datastream.KeyedStream.timeWindow (org.apache.flink.streaming.api.windowing.time.Time) 根據(jù)時間特征改變行為,請使用顯式指定處理時間或事件時間的等效操作。

當(dāng)然還要將事件時間關(guān)聯(lián)數(shù)據(jù)的時間,當(dāng)Flink 以Event Time模式處理數(shù)據(jù)流時,它會根據(jù)數(shù)據(jù)里的時間戳來處理基于時間的算子. 由于網(wǎng)絡(luò)、分布式等原因,會導(dǎo)致亂序數(shù)據(jù)的產(chǎn)生
flink流,大數(shù)據(jù),flink

9.1 WaterMark定義

怎樣避免亂序數(shù)據(jù)帶來計算不正確?
遇到一個時間戳達(dá)到了窗口關(guān)閉時間,不應(yīng)該立刻觸發(fā)窗口計算,而是等待一段時間,等遲到的數(shù)據(jù)來了再關(guān)閉窗口
Watermark是一種衡量Event Time進(jìn)展的機(jī)制,可以設(shè)定延遲觸發(fā)
Watermark是用于處理亂序事件的,而正確的處理亂序事件,通常用Watermark機(jī)制結(jié)合window來實現(xiàn);
數(shù)據(jù)流中的Watermark用于表示timestamp小于Watermark 的數(shù)據(jù),都已經(jīng)到達(dá)了,因此,window的執(zhí)行也是由Watermark觸發(fā)的。
watermark用來讓程序自己平衡延遲和結(jié)果正確性

watermark是一條特殊的數(shù)據(jù)記錄
watermark必須單調(diào)遞增,以確保任務(wù)的事件時間時鐘在向前推進(jìn),而不是在后退
watermark 與數(shù)據(jù)的時間戳相關(guān)

基于上述條件. 當(dāng)數(shù)據(jù)亂序來臨時 —>
watermark = Max(當(dāng)前窗口最大事件時間,新來的事件時間) - 設(shè)置的延遲時間
即為watermark 標(biāo)準(zhǔn)時間.當(dāng)watermark 標(biāo)準(zhǔn)時間到達(dá)關(guān)閉要求,則直接關(guān)閉舊窗口

9.2 WaterMark上下游算子傳遞

WaterMark是慢鐘標(biāo)準(zhǔn)時間, 因此廣播到下游算子.
由于flink多是并行計算,那并行的上游任務(wù)WaterMark可能不相同.那下游獲得的WaterMark以哪個上游任務(wù)的WaterMark為準(zhǔn)呢?
下游WaterMark = Min(上游并行算子)
文章來源地址http://www.zghlxwxcb.cn/news/detail-674860.html

到了這里,關(guān)于Flink1.17.0數(shù)據(jù)流的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 數(shù)據(jù)流處理框架Flink與Kafka

    在大數(shù)據(jù)時代,數(shù)據(jù)流處理技術(shù)已經(jīng)成為了一種重要的技術(shù)手段,用于處理和分析大量實時數(shù)據(jù)。Apache Flink和Apache Kafka是兩個非常重要的開源項目,它們在數(shù)據(jù)流處理領(lǐng)域具有廣泛的應(yīng)用。本文將深入探討Flink和Kafka的關(guān)系以及它們在數(shù)據(jù)流處理中的應(yīng)用,并提供一些最佳實踐

    2024年04月23日
    瀏覽(27)
  • 實時Flink數(shù)據(jù)流與ApacheHadoop集成

    在大數(shù)據(jù)時代,實時數(shù)據(jù)處理和批處理數(shù)據(jù)分析都是非常重要的。Apache Flink 和 Apache Hadoop 是兩個非常受歡迎的大數(shù)據(jù)處理框架。Flink 是一個流處理框架,專注于實時數(shù)據(jù)處理,而 Hadoop 是一個批處理框架,專注于大規(guī)模數(shù)據(jù)存儲和分析。在某些場景下,我們需要將 Flink 和 H

    2024年02月19日
    瀏覽(25)
  • 掌握實時數(shù)據(jù)流:使用Apache Flink消費Kafka數(shù)據(jù)

    掌握實時數(shù)據(jù)流:使用Apache Flink消費Kafka數(shù)據(jù)

    ? ? ? ? 導(dǎo)讀:使用Flink實時消費Kafka數(shù)據(jù)的案例是探索實時數(shù)據(jù)處理領(lǐng)域的絕佳方式。不僅非常實用,而且對于理解現(xiàn)代數(shù)據(jù)架構(gòu)和流處理技術(shù)具有重要意義。 ????????Apache Flink ?是一個在 有界 數(shù)據(jù)流和 無界 數(shù)據(jù)流上進(jìn)行有狀態(tài)計算分布式處理引擎和框架。Flink 設(shè)計旨

    2024年02月03日
    瀏覽(31)
  • 使用Flink實現(xiàn)Kafka到MySQL的數(shù)據(jù)流轉(zhuǎn)換:一個基于Flink的實踐指南

    使用Flink實現(xiàn)Kafka到MySQL的數(shù)據(jù)流轉(zhuǎn)換:一個基于Flink的實踐指南

    在現(xiàn)代數(shù)據(jù)處理架構(gòu)中,Kafka和MySQL是兩種非常流行的技術(shù)。Kafka作為一個高吞吐量的分布式消息系統(tǒng),常用于構(gòu)建實時數(shù)據(jù)流管道。而MySQL則是廣泛使用的關(guān)系型數(shù)據(jù)庫,適用于存儲和查詢數(shù)據(jù)。在某些場景下,我們需要將Kafka中的數(shù)據(jù)實時地寫入到MySQL數(shù)據(jù)庫中,本文將介紹

    2024年04月15日
    瀏覽(25)
  • 實時大數(shù)據(jù)流處理技術(shù):Spark Streaming與Flink的深度對比

    引言 在當(dāng)前的大數(shù)據(jù)時代,企業(yè)和組織越來越多地依賴于實時數(shù)據(jù)流處理技術(shù)來洞察和響應(yīng)業(yè)務(wù)事件。實時數(shù)據(jù)流處理不僅能夠加快數(shù)據(jù)分析的速度,還能提高決策的效率和準(zhǔn)確性。Apache Spark Streaming和Apache Flink是目前兩個主要的實時數(shù)據(jù)流處理框架,它們各自擁有獨特的特

    2024年03月10日
    瀏覽(26)
  • 【天衍系列 04】深入理解Flink的ElasticsearchSink組件:實時數(shù)據(jù)流如何無縫地流向Elasticsearch

    【天衍系列 04】深入理解Flink的ElasticsearchSink組件:實時數(shù)據(jù)流如何無縫地流向Elasticsearch

    Flink的Elasticsearch Sink是用于將Flink數(shù)據(jù)流(DataStream)中的數(shù)據(jù)發(fā)送到Elasticsearch的組件。它是Flink的一個連接器(Connector),用于實現(xiàn)將實時處理的結(jié)果或數(shù)據(jù)持續(xù)地寫入Elasticsearch集群中的索引中。 下面是一些關(guān)于Flink的Elasticsearch Sink的基礎(chǔ)概念: 數(shù)據(jù)源(Source) :Flink數(shù)據(jù)流

    2024年02月20日
    瀏覽(23)
  • 大數(shù)據(jù)流處理與實時分析:Spark Streaming和Flink Stream SQL的對比與選擇

    作者:禪與計算機(jī)程序設(shè)計藝術(shù)

    2024年02月07日
    瀏覽(26)
  • 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】

    尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程從入門到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實

    2024年02月11日
    瀏覽(31)
  • 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】

    尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程從入門到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實

    2024年02月09日
    瀏覽(51)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包