
5 Ways for Flink to Read Data (File, Socket, Kafka, MySQL, Custom Source)

This article introduces five ways Flink can read data: from a file, a Socket, Kafka, MySQL, and a custom data source. I hope it is helpful; if anything is wrong or incomplete, corrections are welcome.

Reading Data from a File

This is the simplest way to read data. For functional testing, you can put sample data in a file, read it back, and check that the stream-processing logic behaves as expected.

Code:

package cn.jihui.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object readFile {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val file_name = "C:\\Users\\32985\\IdeaProjects\\flink_demo1\\resources\\wc.txt"

    // Read the file as a bounded stream, one record per line
    val streamData = env.readTextFile(file_name)

    streamData.print()

    env.execute("read data from file")
  }
}

Output:

"C:\Program Files\Java\jdk-11\bin\java.exe" -Dfile.encoding=UTF-8 -classpath ...... cn.jihui.flink.readFile
log4j:WARN No appenders could be found for logger (org.apache.flink.api.java.typeutils.TypeExtractor).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to field java.lang.String.value
WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
2> hello world
3> how are you
5> I am fine
7> how old are you

Process finished with exit code 0
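Reading the file is usually only the first step; to exercise the stream-processing logic mentioned above, the file stream can be fed into a simple word count. A minimal sketch, assuming the same wc.txt path and the Flink 1.10 Scala API (the object name readFileWordCount is made up for illustration):

```scala
package cn.jihui.flink

import org.apache.flink.streaming.api.scala._

object readFileWordCount {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.readTextFile("C:\\Users\\32985\\IdeaProjects\\flink_demo1\\resources\\wc.txt")
      .flatMap(_.split("\\s+")) // split each line into words
      .filter(_.nonEmpty)       // drop empty tokens
      .map((_, 1))              // pair each word with an initial count of 1
      .keyBy(0)                 // partition by word
      .sum(1)                   // running count per word
      .print()

    env.execute("word count from file")
  }
}
```

With the sample file above, repeated words should print with an increasing count as duplicates arrive.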

Reading Data from a Socket

This is very convenient for verifying scenarios where data is delivered over a Socket.

Code:

package cn.jihui.flink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

object readSocket {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Host and port where nc will be listening
    val ip = "172.16.3.6"
    val port = 9999

    val streamData = env.socketTextStream(ip, port)

    streamData.print

    env.execute("read data from socket")
  }
}

For testing, first start nc on the 172.16.3.6 server, then start the Flink job to read the data. For example:

[bigdata@vm6 ~]$ nc -lk 9999
hello world
how are you  
happy new year

Every line entered in nc is received by the Flink job.

Flink output:

"C:\Program Files\Java\jdk-11\bin\java.exe" -Dfile.encoding=UTF-8 -classpath ...... cn.jihui.flink.readSocket
3> hello world
4> how are you
5> happy new year

Reading Data from Kafka

Reading from Kafka is probably the most widely used way to feed data into Flink; my current customers also process their stream data this way.

To read from Kafka, you can use the flink-connector-kafka connector.

Create a Maven project and add the following dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.1</version>
</dependency>

Code:

package cn.jihui.flink

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object readKafka {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Broker address and consumer group for the Kafka consumer
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "172.16.3.6:9092")
    properties.setProperty("group.id", "data-group")

    // Consume the "sensor" topic, deserializing each record as a plain string
    val streamData = env.addSource(new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema, properties))

    streamData.print()
    env.execute("read data from kafka")
  }
}
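By default, FlinkKafkaConsumer011 resumes from the offsets committed for the consumer group. The consumer also lets you pick the start position explicitly; a brief sketch of the options, reusing the topic and properties from the code above:

```scala
// Same topic, schema, and properties as in readKafka above
val consumer = new FlinkKafkaConsumer011[String]("sensor", new SimpleStringSchema, properties)

consumer.setStartFromGroupOffsets() // default: resume from offsets committed for group.id
// consumer.setStartFromEarliest()  // alternatively: re-read the topic from the beginning
// consumer.setStartFromLatest()    // alternatively: only records produced from now on

val streamData = env.addSource(consumer)
```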

Start Kafka:

[bigdata@vm6 kafka-2.6.0]$ ./bin/kafka-server-start.sh -daemon config/server.properties

Verify that it started:

[bigdata@vm6 ~]$ jps
8690 QuorumPeerMain
21068 Jps
10366 Kafka
[bigdata@vm6 ~]$

Start the Kafka console producer for testing:

[bigdata@vm6 bin]$ ./kafka-console-producer.sh --broker-list 172.16.3.6:9092 --topic sensor
>hello world
>happy new year
>what are you doing here?   
>

With the Flink job running, every line entered into the console producer is received and printed.

Output:

"C:\Program Files\Java\jdk-11\bin\java.exe" -Dfile.encoding=UTF-8 -classpath ...... cn.jihui.flink.readKafka
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
7> hello world
7> happy new year
7> what are you doing here?

Process finished with exit code -1

Reading Data from MySQL

When you need to read from a relational database, plain JDBC is the simplest option.

Code:

package cn.jihui.flink

import java.sql.{DriverManager, ResultSet}

import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

//case class SensorReading(id: String, timestamp: Long, temperature: Double)

object readMySQL {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val streamData = env.addSource(new MySQLSource)
    streamData.print()
    env.execute("read data from mysql")
  }
}

class MySQLSource extends SourceFunction[SensorReading] {
  @volatile var running: Boolean = true // cancel() flips this from a different thread

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val strConn = "jdbc:mysql://172.16.3.6:3306/flink"
    val conn = DriverManager.getConnection(strConn, "jihui", "111111")
    val selectStmt = conn.prepareStatement("select * from t_sensor;")

    val resultRs: ResultSet = selectStmt.executeQuery()

    // Emit one record per row; stop promptly if the source is cancelled
    while (running && resultRs.next()) {
      val id = resultRs.getString(1)
      val timestamp = resultRs.getLong(2)
      val temperature = resultRs.getDouble(3)

      sourceContext.collect(SensorReading(id, timestamp, temperature))
    }

    resultRs.close()
    selectStmt.close()
    conn.close()
  }

  override def cancel(): Unit = {
    running = false
  }
}
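The import of RichSourceFunction above hints at an alternative: by extending RichSourceFunction instead of SourceFunction, the JDBC connection can be opened and closed in the open()/close() lifecycle methods rather than inside run(). A sketch under the same table and credentials (the class name MySQLRichSource is made up for illustration):

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}

class MySQLRichSource extends RichSourceFunction[SensorReading] {
  @volatile private var running = true
  private var conn: Connection = _
  private var stmt: PreparedStatement = _

  // Called once per parallel instance, before run()
  override def open(parameters: Configuration): Unit = {
    conn = DriverManager.getConnection("jdbc:mysql://172.16.3.6:3306/flink", "jihui", "111111")
    stmt = conn.prepareStatement("select * from t_sensor")
  }

  override def run(ctx: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rs = stmt.executeQuery()
    while (running && rs.next()) {
      ctx.collect(SensorReading(rs.getString(1), rs.getLong(2), rs.getDouble(3)))
    }
    rs.close()
  }

  override def cancel(): Unit = running = false

  // Called on shutdown, even if run() fails, so resources are always released
  override def close(): Unit = {
    if (stmt != null) stmt.close()
    if (conn != null) conn.close()
  }
}
```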

Output:

"C:\Program Files\Java\jdk-11\bin\java.exe" -Dfile.encoding=UTF-8 -classpath ...... cn.jihui.flink.readMySQL
7> SensorReading(Sensor_4,1687944878278,42.6205)
1> SensorReading(Sensor_10,1687944879294,31.3545)
7> SensorReading(Sensor_7,1687944878278,16.3773)
1> SensorReading(Sensor_4,1687944879294,42.6205)
7> SensorReading(Sensor_8,1687944879294,7.3655)
7> SensorReading(Sensor_2,1687944878278,32.3747)
8> SensorReading(Sensor_1,1687944879294,20.0778)
8> SensorReading(Sensor_9,1687944878278,7.65736)
5> SensorReading(Sensor_3,1687944878278,32.9043)
6> SensorReading(Sensor_1,1687944880296,20.0778)
5> SensorReading(Sensor_10,1687944880296,31.3545)
2> SensorReading(Sensor_8,1687944880296,7.3655)
6> SensorReading(Sensor_6,1687944878278,4.37994)
2> SensorReading(Sensor_5,1687944880296,42.1028)
2> SensorReading(Sensor_2,1687944880296,32.3747)
4> SensorReading(Sensor_10,1687944878278,31.3545)
4> SensorReading(Sensor_9,1687944879294,7.65736)
4> SensorReading(Sensor_9,1687944880296,7.65736)
5> SensorReading(Sensor_2,1687944879294,32.3747)
5> SensorReading(Sensor_6,1687944879294,4.37994)
3> SensorReading(Sensor_5,1687944879294,42.1028)
4> SensorReading(Sensor_7,1687944879294,16.3773)
6> SensorReading(Sensor_5,1687944878278,42.1028)
8> SensorReading(Sensor_8,1687944878278,7.3655)
1> SensorReading(Sensor_4,1687944880296,42.6205)
6> SensorReading(Sensor_3,1687944879294,32.9043)
3> SensorReading(Sensor_7,1687944880296,16.3773)
8> SensorReading(Sensor_6,1687944880296,4.37994)
3> SensorReading(Sensor_3,1687944880296,32.9043)
3> SensorReading(Sensor_1,1687944878278,20.0778)

Process finished with exit code 0

Reading Data from a Custom Source

When you need to performance-test Flink, a custom data source can simplify the test setup.

Flink supports custom data sources: you can generate test data in whatever format you need in a loop, which is very flexible.

Code:

package cn.jihui.flink

import java.util.Random

import org.apache.flink.streaming.api.functions.source.SourceFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala._

case class SensorReading(id: String, timestamp: Long, temperature: Double)

object readCustom {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    val dataCustom = env.addSource(new MySensorSource())

    dataCustom.print()

    env.execute("custom data source")
  }
}

class MySensorSource extends SourceFunction[SensorReading] {
  @volatile var running: Boolean = true // cancel() flips this from a different thread

  override def run(sourceContext: SourceFunction.SourceContext[SensorReading]): Unit = {
    val rand = new Random()

    // Initialize 10 sensors with random base temperatures in [0, 50)
    var currTemp = 1.to(10).map(i => ("Sensor_" + i, rand.nextDouble() * 50))

    while (running) {
      val currTime = System.currentTimeMillis()

      // Reassign so each sensor's temperature drifts by a Gaussian step
      currTemp = currTemp.map(
        data => (data._1, data._2 + rand.nextGaussian())
      )

      currTemp.foreach(
        data => sourceContext.collect(SensorReading(data._1, currTime, data._2))
      )

      Thread.sleep(1000) // emit one batch of readings per second
    }
  }

  override def cancel(): Unit = running = false
}


Once started, the program produces a continuous stream of data.

Output:

"C:\Program Files\Java\jdk-11\bin\java.exe" -Dfile.encoding=UTF-8 -classpath ...... cn.jihui.flink.readCustom
7> SensorReading(Sensor_4,1687944878278,42.62052120270371)
1> SensorReading(Sensor_6,1687944878278,4.3799399141547735)
3> SensorReading(Sensor_8,1687944878278,7.365503979569099)
5> SensorReading(Sensor_2,1687944878278,32.374658798648426)
6> SensorReading(Sensor_3,1687944878278,32.9042516066123)
2> SensorReading(Sensor_7,1687944878278,16.37730093567341)
8> SensorReading(Sensor_5,1687944878278,42.1027668775983)
4> SensorReading(Sensor_1,1687944878278,20.077801500835886)
5> SensorReading(Sensor_10,1687944878278,31.35446091105863)
4> SensorReading(Sensor_9,1687944878278,7.657356771011931)
5> SensorReading(Sensor_8,1687944879294,7.365503979569099)
3> SensorReading(Sensor_6,1687944879294,4.3799399141547735)
6> SensorReading(Sensor_1,1687944879294,20.077801500835886)
1> SensorReading(Sensor_4,1687944879294,42.62052120270371)
4> SensorReading(Sensor_7,1687944879294,16.37730093567341)
2> SensorReading(Sensor_5,1687944879294,42.1027668775983)
6> SensorReading(Sensor_9,1687944879294,7.657356771011931)
7> SensorReading(Sensor_2,1687944879294,32.374658798648426)
8> SensorReading(Sensor_3,1687944879294,32.9042516066123)
7> SensorReading(Sensor_10,1687944879294,31.35446091105863)
4> SensorReading(Sensor_5,1687944880296,42.1027668775983)
1> SensorReading(Sensor_2,1687944880296,32.374658798648426)
5> SensorReading(Sensor_6,1687944880296,4.3799399141547735)
7> SensorReading(Sensor_8,1687944880296,7.365503979569099)
6> SensorReading(Sensor_7,1687944880296,16.37730093567341)
2> SensorReading(Sensor_3,1687944880296,32.9042516066123)
3> SensorReading(Sensor_4,1687944880296,42.62052120270371)
8> SensorReading(Sensor_1,1687944880296,20.077801500835886)
1> SensorReading(Sensor_10,1687944880296,31.35446091105863)
8> SensorReading(Sensor_9,1687944880296,7.657356771011931)

Process finished with exit code -1

This concludes the introduction to the five ways Flink can read data (file, Socket, Kafka, MySQL, custom data source). Thanks for reading!
