国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

springboot集成flink步驟,及demo

這篇具有很好參考價(jià)值的文章主要介紹了springboot集成flink步驟,及demo。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

springboot集成flink,寫代碼學(xué)習(xí)flink,集成步驟如下:

1、maven引入依賴:

<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-java</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
   <version>${flink.version}</version>
</dependency>
<dependency>
   <groupId>org.apache.flink</groupId>
   <artifactId>flink-clients_2.11</artifactId>
   <version>${flink.version}</version>
</dependency>

2、配置文件配置相關(guān)參數(shù):

# Flink配置
flink.jobmanager.host=localhost
flink.jobmanager.port=6123
flink.parallelism=1

3、寫測試類,代碼如下 :文章來源地址http://www.zghlxwxcb.cn/news/detail-488057.html

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;

import java.util.Random;



public class Demo {

    public static void main(String[] args) throws Exception {

        // 創(chuàng)建執(zhí)行環(huán)境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 創(chuàng)建數(shù)據(jù)源
        DataStream<String> stream = env.addSource(new SourceFunction<String>() {
            private volatile boolean isRunning = true;

            @Override
            public void run(SourceContext<String> ctx) throws Exception {
                Random random = new Random();
                while (isRunning) {
                    Thread.sleep(10);
                    long timestamp = System.currentTimeMillis() - random.nextInt(5) * 1000;
                    String str = "key" + random.nextInt(10) + "," + timestamp;
                    ctx.collectWithTimestamp(str, timestamp);
                    ctx.emitWatermark(new Watermark(timestamp));
                }
            }

            @Override
            public void cancel() {
                isRunning = false;
            }
        });


        // 將數(shù)據(jù)源解析成二元組(key, timestamp)
        DataStream<Tuple2<String, Long>> parsedStream = stream.map((String line)  -> {
            String[] parts = line.split(",");
            return new Tuple2<>((String)parts[0], Long.parseLong(parts[1]));
        }).returns(Types.TUPLE(Types.STRING, Types.LONG));

        // 設(shè)置事件時(shí)間和水位線
        DataStream<Tuple2<String, Long>> withTimestampsAndWatermarks = parsedStream
                .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                    @Override
                    public long extractAscendingTimestamp(Tuple2<String, Long> element) {
                        return element.f1;
                    }
                });

        // 按鍵值進(jìn)行分組
        KeyedStream<Tuple2<String, Long>, Tuple> keyedStream = withTimestampsAndWatermarks.keyBy(0);

        // 每5秒鐘統(tǒng)計(jì)最近一分鐘的數(shù)據(jù)(使用滾動時(shí)間窗口)
        WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowedStream = keyedStream.window(TumblingEventTimeWindows.of(Time.minutes(1)));

        // 進(jìn)行聚合計(jì)算
        DataStream<Tuple2<String, Long>> resultStream = windowedStream
                .reduce((Tuple2<String, Long> v1, Tuple2<String, Long> v2) -> new Tuple2<>(v1.f0, v1.f1 + v2.f1));

        // 輸出結(jié)果
        resultStream.print();

        // 啟動作業(yè)
        env.execute("Demo");
    }
}

到了這里,關(guān)于springboot集成flink步驟,及demo的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • springboot集成kafka詳細(xì)步驟(發(fā)送及監(jiān)聽消息示例)

    1、本機(jī)的kafka環(huán)境配置,不再贅述 2、添加?pom 文件 3、配置application.yml 4、復(fù)寫kafka的相關(guān)配置類:生產(chǎn)、消費(fèi)相關(guān)配置 5、生產(chǎn)、消費(fèi)的偽代碼 6、測試消息發(fā)送 經(jīng)過測試!

    2024年02月11日
    瀏覽(20)
  • springboot集成starrocks、以及采用flink實(shí)現(xiàn)mysql與starrocks亞秒級同步

    (因采用dynamic-datasource-spring-boot-starter動態(tài)數(shù)據(jù)源,所以才是以下配置文件的樣式,像redis,druid根據(jù)自己情況導(dǎo)入依賴) 這個(gè)配置文件的場景是把starrocks當(dāng)成slave庫在用。某些大數(shù)據(jù)慢查詢就走starrocks 就這樣配置好后就可把starrocks當(dāng)mysql用了 重點(diǎn):采用這種方式有限制,插入

    2024年01月21日
    瀏覽(18)
  • SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka

    最近做的一個(gè)項(xiàng)目,使用的是pg數(shù)據(jù)庫,公司沒有成熟的DCD組件,為了實(shí)現(xiàn)數(shù)據(jù)變更消息發(fā)布的功能,我使用SpringBoot集成Flink-CDC 采集PostgreSQL變更數(shù)據(jù)發(fā)布到Kafka。 監(jiān)聽數(shù)據(jù)變化,進(jìn)行異步通知,做系統(tǒng)內(nèi)異步任務(wù)。 架構(gòu)方案(懶得寫了,看圖吧): -- 創(chuàng)建pg 高線數(shù)據(jù)同步用

    2024年02月02日
    瀏覽(31)
  • Elasticsearch 集成--Flink 框架集成

    Elasticsearch 集成--Flink 框架集成

    ? ? ? ?Apache Spark 是一種基于內(nèi)存的快速、通用、可擴(kuò)展的大數(shù)據(jù)分析計(jì)算引擎。 Apache Spark 掀開了內(nèi)存計(jì)算的先河,以內(nèi)存作為賭注,贏得了內(nèi)存計(jì)算的飛速發(fā)展。 但是在其火熱的同時(shí),開發(fā)人員發(fā)現(xiàn),在 Spark 中,計(jì)算框架普遍存在的缺點(diǎn)和不足依然沒 有完全解決,而這

    2024年02月09日
    瀏覽(27)
  • Flink與ApacheAirflow集成

    在大數(shù)據(jù)處理領(lǐng)域,流處理和批處理是兩個(gè)非常重要的領(lǐng)域。Apache Flink 是一個(gè)流處理框架,Apache Airflow 是一個(gè)工作流管理器。在實(shí)際應(yīng)用中,我們可能需要將這兩個(gè)系統(tǒng)集成在一起,以實(shí)現(xiàn)更高效的數(shù)據(jù)處理和管理。本文將詳細(xì)介紹 Flink 與 Airflow 的集成方法,并提供一些實(shí)

    2024年02月20日
    瀏覽(17)
  • Hudi集成Flink

    Hudi集成Flink

    安裝Maven 1)上傳apache-maven-3.6.3-bin.tar.gz到/opt/software目錄,并解壓更名 tar -zxvf apache-maven-3.6. 3 -bin.tar.gz -C /opt/module/ mv ? apache -maven-3.6. 3 ?maven 2)添加環(huán)境變量到/etc/profile中 sudo ?vim /etc/profile #MAVEN_HOME export MAVEN_HOME=/opt/module/maven export PATH=$PATH:$MAVEN_HOME/bin 3)測試安裝結(jié)果 sourc

    2023年04月13日
    瀏覽(23)
  • Hudi(四)集成Flink(2)

    Hudi(四)集成Flink(2)

    ????????當(dāng)前表 默認(rèn)是快照讀取 ,即讀取最新的全量快照數(shù)據(jù)并一次性返回。通過參數(shù) read.streaming.enabled 參數(shù)開啟流讀模式,通過 read.start-commit 參數(shù)指定起始消費(fèi)位置,支持指定 earliest 從最早消費(fèi)。 1、WITH參數(shù) 名稱 Required 默認(rèn)值 說明 read.streaming.enabled false false 設(shè)置

    2024年02月07日
    瀏覽(27)
  • Flink單機(jī)版安裝教程 - 步驟詳解

    Flink單機(jī)版安裝教程 - 步驟詳解

    本教程詳細(xì)介紹了如何在單機(jī)環(huán)境下安裝和啟動Apache Flink 1.16.0版本。包括下載穩(wěn)定版安裝包,使用tar命令解壓,以及通過start-cluster.sh腳本啟動Flink集群。

    2024年02月11日
    瀏覽(16)
  • 基于Hadoop搭建Flink集群詳細(xì)步驟

    基于Hadoop搭建Flink集群詳細(xì)步驟

    目錄 1.xftp上傳flink壓縮包至hadoop102的/opt/software/目錄下 2.解壓flink壓縮包至/opt/module/目錄下 3. 配置flink-conf.yaml 4.配置masters 5.配置workers 6.配置環(huán)境變量my_env.sh 7.重啟環(huán)境變量 8.分發(fā)/opt/module/flink-1.13.0和/etc/profile.d/my_env.sh 9.另外兩臺重啟環(huán)境變量 10.開啟hadoop集群和flink集群 11.瀏

    2024年02月09日
    瀏覽(28)
  • 第二章 Flink集成Iceberg的集成方式及基本SQL使用

    第二章 Flink集成Iceberg的集成方式及基本SQL使用

    注意事項(xiàng):一般都是用基于Flink的Hive Catalog,使用HMS存儲表模型數(shù)據(jù) 1、集成方式 (1)下載jar包 下載地址 (2)啟動FlinkSQL ①StandLone模式啟動 ②Flink On Yarn模式啟動 2、基本使用 2.1、創(chuàng)建catalog 核心:可創(chuàng)建hive、hadoop、自定義等目錄,創(chuàng)建模板如下 type : 必須的 iceberg 。(必需

    2024年02月08日
    瀏覽(28)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包