国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink1.17 eventWindow不要配置processTrigger

這篇具有很好參考價值的文章主要介紹了flink1.17 eventWindow不要配置processTrigger。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

理論上可以eventtime processtime混用,但是下面代碼測試發(fā)現(xiàn)bug,輸入一條數(shù)據(jù)會一直輸出.

flink github無法提bug/問題. apache jira賬戶新建后竟然flink又需要一個賬戶,放棄

bug復(fù)現(xiàn)操作

idea運(yùn)行代碼后 往source kafka發(fā)送一條數(shù)據(jù)??

a,1,1690304400000

可以看到無限輸出:

flink1.17 eventWindow不要配置processTrigger,free,flink

理論上時間語義不建議混用,但是在rich函數(shù)中的確可以做到混用且正常使用

問題復(fù)現(xiàn)代碼

package com.yy.flinkWindowAndTrigger

import com.yy.flinkWindow.M1
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.configuration.{Configuration, RestOptions}
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.time.Time.seconds
import org.apache.flink.streaming.api.windowing.triggers.{ContinuousProcessingTimeTrigger, CountTrigger, ProcessingTimeTrigger}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.joda.time.Seconds


object flinkEventWindowAndProcessTriggerBUGLearn {
  def main(args: Array[String]): Unit = {


    // flink 啟動本地webui
    val conf = new Configuration
    conf.setInteger(RestOptions.PORT, 28080)

    //    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf)
    //    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.configure(conf)


    /*
    kafka輸入:
        a,1,1690304400000        //對應(yīng) 2023-07-26 01:00:00 (無限輸出)       //如果傳入 a,1,1693037756000 對應(yīng):2023-08-26 16:15:56 (1條/s)
        a,1,7200000               // 1970-01-1 10:00:00
     */
    val brokers = "172.18.105.147:9092"
    val source = KafkaSource.builder[String].setBootstrapServers(brokers)
      .setTopics("t1")
      .setGroupId("my-group-23asdf46")
      .setStartingOffsets(OffsetsInitializer.latest())
      // .setDeserializer() // 參數(shù): KafkaRecordDeserializationSchema
      .setDeserializer(new M1())
      .build()


    val ds1 = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")


    val s1 = ds1
      .map(_.split(","))
      .map(x => C1(x(0), x(1).toInt, x(2).toLong)) // key number 時間戳
      .assignTimestampsAndWatermarks(new OTAWatermarks(Time.seconds(0)))
      .keyBy(_.f1)
      .window(TumblingEventTimeWindows.of(seconds(10)))
      .trigger(ContinuousProcessingTimeTrigger.of[TimeWindow](seconds(10L)))
      .reduce((x, y) => C1(x.f1, x.f2 + y.f2, 100L))


    s1.print()


    env.execute("KafkaNewSourceAPi")
  }

  // 亂序流
  class OTAWatermarks(time: Time) extends BoundedOutOfOrdernessTimestampExtractor[C1](time) {
    override def extractTimestamp(element: C1): Long = {
      element.f3
    }
  }


  // key num timestamp
  case class C1(f1: String, f2: Int, f3: Long)
}

-文章來源地址http://www.zghlxwxcb.cn/news/detail-627504.html

flink1.17 eventWindow不要配置processTrigger,free,flink

-

maven pom

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>FlinkLocalDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>FlinkLocalDemo</name>
    <url>http://maven.apache.org</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.1</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <scala.version>2.12.8</scala.version>
    </properties>



    <dependencies>
        <!-- https://mvnrepository.com/artifact/joda-time/joda-time -->
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.12.5</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.alibaba.fastjson2/fastjson2 -->
        <dependency>
            <groupId>com.alibaba.fastjson2</groupId>
            <artifactId>fastjson2</artifactId>
            <version>2.0.33</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
<!--            <version>1.2.17</version>-->
        </dependency>


        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.1</version>
            <scope>test</scope>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-common -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- 引入flink1.13.0 scala2.12.12   -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-json</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Either... -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- or... -->

<!--        下面幾個是代碼中寫sql需要的包 四個中一個都不能少 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-planner-loader -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
<!--            <scope>test</scope>-->
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-table-runtime -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <!--  https://mvnrepository.com/artifact/org.apache.flink/flink-connector-files -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-files</artifactId>
            <version>${flink.version}</version>
        </dependency>


<!--        注意: flink-table-planner-loader 不能和 flink-table-planner_${scala.binary.version} 共存-->
        <!--        <dependency>-->
        <!--            <groupId>org.apache.flink</groupId>-->
        <!--            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>-->
        <!--            <version>${flink.version}</version>-->
        <!--            <scope>provided</scope>-->
        <!--        </dependency>-->


        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>3.1.0-1.17</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.11</version>
        </dependency>


    </dependencies>
    <build>
            <plugins>
                <!-- 打jar插件 -->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-shade-plugin</artifactId>
                    <version>2.4.3</version>
                    <executions>
                        <execution>
                            <phase>package</phase>
                            <goals>
                                <goal>shade</goal>
                            </goals>
                            <configuration>
                                <filters>
                                    <filter>
                                        <artifact>*:*</artifact>
                                        <excludes>
                                            <exclude>META-INF/*.SF</exclude>
                                            <exclude>META-INF/*.DSA</exclude>
                                            <exclude>META-INF/*.RSA</exclude>
                                        </excludes>
                                    </filter>
                                </filters>
                            </configuration>
                        </execution>
                    </executions>
                </plugin>
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <id>scala-compile-first</id>
                        <phase>process-resources</phase>
                        <goals>
                            <goal>add-source</goal>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <scalaVersion>${scala.version}</scalaVersion>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.5.5</version>
                <configuration>
                    <!--這部分可有可無,加上的話則直接生成可運(yùn)行jar包-->
                    <!--<archive>-->
                    <!--<manifest>-->
                    <!--<mainClass>${exec.mainClass}</mainClass>-->
                    <!--</manifest>-->
                    <!--</archive>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.1</version>
                    <configuration>
                        <source>11</source>
                        <target>11</target>
                    </configuration>
                </plugin>
        </plugins>
    </build>
</project>















?

到了這里,關(guān)于flink1.17 eventWindow不要配置processTrigger的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink1.17.0數(shù)據(jù)流

    Flink1.17.0數(shù)據(jù)流

    官網(wǎng)介紹 Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計算。Flink 被設(shè)計為在所有常見的集群環(huán)境中運(yùn)行,以內(nèi)存中的速度和任何規(guī)模執(zhí)行計算。 1.無限流有一個開始,但沒有定義的結(jié)束。它們不會在生成數(shù)據(jù)時終止并提供數(shù)據(jù)。必須連續(xù)處

    2024年02月11日
    瀏覽(26)
  • Flink1.17最新版本學(xué)習(xí)記錄

    Flink1.17最新版本學(xué)習(xí)記錄

    1)Apache Flink 是一個框架和分布式處理引擎,用于在 無邊界和有邊界 數(shù)據(jù)流上進(jìn)行有狀態(tài)的計算。 2)Flink 能在所有常見集群環(huán)境中運(yùn)行,并能以內(nèi)存速度和任意規(guī)模進(jìn)行計算。 1)批流一體 任何類型的數(shù)據(jù)都可以形成一種事件流。信用卡交易、傳感器測量、機(jī)器日志、網(wǎng)站

    2024年02月08日
    瀏覽(19)
  • flink1.17 自定義trigger ContinuousEventTimeTrigger

    在?ContinuousEventTimeTrigger 的基礎(chǔ)上新增了timeout,如果超時后窗口都沒關(guān)閉,那么就硬輸出一波,避免間斷數(shù)據(jù),留存窗口太久. ContinuousEventTimeTrigger連續(xù)事件時間觸發(fā)器與ContinuousProcessingTimeTrigger連續(xù)處理時間觸發(fā)器,指定一個固定時間間隔interval,不需要等到窗口結(jié)束才能獲取結(jié)果

    2024年02月14日
    瀏覽(41)
  • flink1.17.0 集成kafka,并且計算

    flink1.17.0 集成kafka,并且計算

    flink是實時計算的重要集成組件,這里演示如何集成,并且使用一個小例子。例子是kafka輸入消息,用逗號隔開,統(tǒng)計每個相同單詞出現(xiàn)的次數(shù),這么一個功能。 這里我使用的kafka版本是3.2.0,部署的方法可以參考, kafka部署 啟動后查看java進(jìn)程是否存在,存在后執(zhí)行下一步。

    2024年02月09日
    瀏覽(18)
  • Python 編寫 Flink 應(yīng)用程序經(jīng)驗記錄(Flink1.17.1)

    Python 編寫 Flink 應(yīng)用程序經(jīng)驗記錄(Flink1.17.1)

    目錄 官方API文檔 提交作業(yè)到集群運(yùn)行 官方示例 環(huán)境 編寫一個 Flink Python Table API 程序 執(zhí)行一個 Flink Python Table API 程序 實例處理Kafka后入庫到Mysql 下載依賴 flink-kafka jar 讀取kafka數(shù)據(jù) 寫入mysql數(shù)據(jù) flink-mysql jar https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/python/overview/

    2024年02月08日
    瀏覽(22)
  • 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】

    尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程從入門到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實

    2024年02月11日
    瀏覽(31)
  • 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】

    尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程從入門到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實

    2024年02月09日
    瀏覽(51)
  • 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記03【Flink運(yùn)行時架構(gòu)】

    尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記03【Flink運(yùn)行時架構(gòu)】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程從入門到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實

    2024年02月16日
    瀏覽(44)
  • 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【部署】

    尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【部署】

    尚硅谷大數(shù)據(jù)技術(shù)-教程-學(xué)習(xí)路線-筆記匯總表【課程資料下載】 視頻地址:尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程從入門到精通_嗶哩嗶哩_bilibili 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記01【Flink概述、Flink快速上手】 尚硅谷大數(shù)據(jù)Flink1.17實戰(zhàn)教程-筆記02【Flink部署】 尚硅谷大數(shù)據(jù)Flink1.17實

    2024年02月09日
    瀏覽(19)
  • CentOS7安裝Flink1.17偽分布式

    CentOS7安裝Flink1.17偽分布式

    擁有1臺CentOS7 CentOS7安裝好jdk,官方文檔要求java 11,使用java 8也可以。可參考?CentOS7安裝jdk8 下載安裝包 ? 創(chuàng)建軟鏈接 添加如下環(huán)境變量 讓環(huán)境變量立即生效 進(jìn)入flink配置目錄,查看flink的配置文件 配置flink-conf.yaml 找到如下配置項,并按照如下修改,其中node1為機(jī)器主機(jī)名

    2024年04月14日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包