国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用Flink實現(xiàn)Kafka到MySQL的數(shù)據(jù)流轉(zhuǎn)換:一個基于Flink的實踐指南

這篇具有很好參考價值的文章主要介紹了使用Flink實現(xiàn)Kafka到MySQL的數(shù)據(jù)流轉(zhuǎn)換:一個基于Flink的實踐指南。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

使用Flink實現(xiàn)Kafka到MySQL的數(shù)據(jù)流轉(zhuǎn)換

在現(xiàn)代數(shù)據(jù)處理架構(gòu)中,Kafka和MySQL是兩種非常流行的技術(shù)。Kafka作為一個高吞吐量的分布式消息系統(tǒng),常用于構(gòu)建實時數(shù)據(jù)流管道。而MySQL則是廣泛使用的關(guān)系型數(shù)據(jù)庫,適用于存儲和查詢數(shù)據(jù)。在某些場景下,我們需要將Kafka中的數(shù)據(jù)實時地寫入到MySQL數(shù)據(jù)庫中,本文將介紹如何使用Apache Flink來實現(xiàn)這一過程。

mysql kafka數(shù)據(jù)轉(zhuǎn)換,flink,kafka,mysql,etl

環(huán)境準(zhǔn)備

在開始之前,請確保你的開發(fā)環(huán)境中已經(jīng)安裝并配置了以下組件:
Apache Flink 準(zhǔn)備相關(guān)pom依賴

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>EastMoney</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-scala_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-csv</artifactId>
            <version>1.14.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.14.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>
    </dependencies>

</project>

Kafka消息隊列

1. 啟動zookeeper
 zkServer start
2. 啟動kafka服務(wù)
 kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. 創(chuàng)建topic
 kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
6. 生產(chǎn)數(shù)據(jù)
 kafka-console-producer --broker-list localhost:9092 --topic east_money

MySQL數(shù)據(jù)庫
初始化mysql表

CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票代碼',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT '股票名稱',
  `close` double DEFAULT NULL COMMENT '最新價',
  `change_percent` double DEFAULT NULL COMMENT '漲跌幅',
  `change` double DEFAULT NULL COMMENT '漲跌額',
  `volume` double DEFAULT NULL COMMENT '成交量(手)',
  `amount` double DEFAULT NULL COMMENT '成交額',
  `amplitude` double DEFAULT NULL COMMENT '振幅',
  `turnover_rate` double DEFAULT NULL COMMENT '換手率',
  `peration` double DEFAULT NULL COMMENT '市盈率',
  `volume_rate` double DEFAULT NULL COMMENT '量比',
  `hign` double DEFAULT NULL COMMENT '最高',
  `low` double DEFAULT NULL COMMENT '最低',
  `open` double DEFAULT NULL COMMENT '今開',
  `previous_close` double DEFAULT NULL COMMENT '昨收',
  `pb` double DEFAULT NULL COMMENT '市凈率',
  `create_time` varchar(64) NOT NULL COMMENT '寫入時間',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5605 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci

步驟解釋

獲取流執(zhí)行環(huán)境:首先,我們通過StreamExecutionEnvironment.getExecutionEnvironment獲取Flink的流執(zhí)行環(huán)境,并設(shè)置其運(yùn)行模式為流處理模式。

創(chuàng)建流表環(huán)境:接著,我們通過StreamTableEnvironment.create創(chuàng)建一個流表環(huán)境,這個環(huán)境允許我們使用SQL語句來操作數(shù)據(jù)流。

val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

定義Kafka數(shù)據(jù)源表:我們使用一個SQL語句創(chuàng)建了一個Kafka表re_stock_code_price_kafka,這個表代表了我們要從Kafka讀取的數(shù)據(jù)結(jié)構(gòu)和連接信息。

tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE ," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")

定義MySQL目標(biāo)表:然后,我們定義了一個MySQL表re_stock_code_price,指定了與MySQL的連接參數(shù)和表結(jié)構(gòu)。

val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)

數(shù)據(jù)轉(zhuǎn)換和寫入:最后,我們執(zhí)行了一個插入操作,將從Kafka讀取的數(shù)據(jù)轉(zhuǎn)換并寫入到MySQL中。

tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")

result.print()

全部代碼

package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Kafka2Mysql {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE ," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int"+
        ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
        ")"
    )

    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")


    val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://localhost:3306/mydb',
        |   'driver' = 'com.mysql.cj.jdbc.Driver',
        |   'table-name' = 're_stock_code_price',
        |   'username' = 'root',
        |   'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)
    tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")


    result.print()
    print("數(shù)據(jù)打印完成?。?!")
  }
}

如有遇到問題可以找小編溝通交流哦。另外小編幫忙輔導(dǎo)大課作業(yè),學(xué)生畢設(shè)等。不限于python,java,大數(shù)據(jù),模型訓(xùn)練等。
mysql kafka數(shù)據(jù)轉(zhuǎn)換,flink,kafka,mysql,etl文章來源地址http://www.zghlxwxcb.cn/news/detail-851655.html

到了這里,關(guān)于使用Flink實現(xiàn)Kafka到MySQL的數(shù)據(jù)流轉(zhuǎn)換:一個基于Flink的實踐指南的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink數(shù)據(jù)流

    Flink數(shù)據(jù)流

    官網(wǎng)介紹 Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計算。Flink 被設(shè)計為在所有常見的集群環(huán)境中運(yùn)行,以內(nèi)存中的速度和任何規(guī)模執(zhí)行計算。 1.無限流有一個開始,但沒有定義的結(jié)束。它們不會在生成數(shù)據(jù)時終止并提供數(shù)據(jù)。必須連續(xù)處

    2024年02月17日
    瀏覽(20)
  • 大數(shù)據(jù)Flink(六十):Flink 數(shù)據(jù)流和分層 API介紹

    大數(shù)據(jù)Flink(六十):Flink 數(shù)據(jù)流和分層 API介紹

    文章目錄 Flink 數(shù)據(jù)流和分層 API介紹 一、??????????????Flink 數(shù)據(jù)流

    2024年02月12日
    瀏覽(26)
  • Flink1.17.0數(shù)據(jù)流

    Flink1.17.0數(shù)據(jù)流

    官網(wǎng)介紹 Apache Flink 是一個框架和分布式處理引擎,用于對無界和有界數(shù)據(jù)流進(jìn)行有狀態(tài)計算。Flink 被設(shè)計為在所有常見的集群環(huán)境中運(yùn)行,以內(nèi)存中的速度和任何規(guī)模執(zhí)行計算。 1.無限流有一個開始,但沒有定義的結(jié)束。它們不會在生成數(shù)據(jù)時終止并提供數(shù)據(jù)。必須連續(xù)處

    2024年02月11日
    瀏覽(26)
  • 實時Flink數(shù)據(jù)流與ApacheHive集成

    在大數(shù)據(jù)時代,實時數(shù)據(jù)處理和批處理數(shù)據(jù)處理都是非常重要的。Apache Flink 是一個流處理框架,可以處理大規(guī)模的實時數(shù)據(jù)流,而 Apache Hive 是一個基于 Hadoop 的數(shù)據(jù)倉庫工具,主要用于批處理數(shù)據(jù)處理。在實際應(yīng)用中,我們可能需要將 Flink 與 Hive 集成,以實現(xiàn)流處理和批處

    2024年02月22日
    瀏覽(38)
  • 實時Flink數(shù)據(jù)流與ApacheHadoop集成

    在大數(shù)據(jù)時代,實時數(shù)據(jù)處理和批處理數(shù)據(jù)分析都是非常重要的。Apache Flink 和 Apache Hadoop 是兩個非常受歡迎的大數(shù)據(jù)處理框架。Flink 是一個流處理框架,專注于實時數(shù)據(jù)處理,而 Hadoop 是一個批處理框架,專注于大規(guī)模數(shù)據(jù)存儲和分析。在某些場景下,我們需要將 Flink 和 H

    2024年02月19日
    瀏覽(25)
  • ELK 將數(shù)據(jù)流轉(zhuǎn)換回常規(guī)索引

    ELK 將數(shù)據(jù)流轉(zhuǎn)換回常規(guī)索引

    ELK 將數(shù)據(jù)流轉(zhuǎn)換回常規(guī)索引 現(xiàn)象:創(chuàng)建索引模板是打開了數(shù)據(jù)流,導(dǎo)致不能創(chuàng)建常規(guī)索引,并且手動修改、刪除索引模板失敗 解決方法: 1、停止logstash不允許重新創(chuàng)建數(shù)據(jù)流的索引 2、kibana上刪除數(shù)據(jù)流 3、修改索引模板將數(shù)據(jù)流轉(zhuǎn)換回常規(guī)索引 4、重新啟動logstash

    2024年02月14日
    瀏覽(21)
  • Spark Streaming + Kafka構(gòu)建實時數(shù)據(jù)流

    Spark Streaming + Kafka構(gòu)建實時數(shù)據(jù)流

    1. 使用Apache Kafka構(gòu)建實時數(shù)據(jù)流 參考文檔鏈接:https://cloud.tencent.com/developer/article/1814030 2. 數(shù)據(jù)見UserBehavior.csv 數(shù)據(jù)解釋:本次實戰(zhàn)用到的數(shù)據(jù)集是CSV文件,里面是一百零四萬條淘寶用戶行為數(shù)據(jù),該數(shù)據(jù)來源是阿里云天池公開數(shù)據(jù)集 根據(jù)這一csv文檔運(yùn)用Kafka模擬實時數(shù)據(jù)流,

    2024年02月12日
    瀏覽(33)
  • Kafka數(shù)據(jù)流的實時采集與統(tǒng)計機(jī)制

    隨著大數(shù)據(jù)時代的到來,實時數(shù)據(jù)處理成為了眾多企業(yè)和組織的關(guān)注焦點。為了滿足這一需求,Apache Kafka成為了一個廣泛采用的分布式流處理平臺。Kafka以其高吞吐量、可擴(kuò)展性和容錯性而聞名,被廣泛應(yīng)用于日志收集、事件驅(qū)動架構(gòu)和實時分析等場景。 在本文中,我們將探

    2024年02月07日
    瀏覽(28)
  • 后端返回數(shù)據(jù)流,前端進(jìn)行轉(zhuǎn)換blob文件流

    后端返回數(shù)據(jù)流,前端進(jìn)行轉(zhuǎn)換blob文件流

    1. 首先相應(yīng)的頭里面請求改為 responseType: \\\'blob\\\' 2.? ?????????let res = await getPhotoVideoUrl() --此處為模擬的獲取一個視頻流的地址; ? ? ? ? const img = new Blob([res], { type: \\\'image/png\\\' }); ? ? ? ? let imgUrl = window.URL.createObjectURL(img); 3.拿到流的地址后,先進(jìn)行new Blob進(jìn)行創(chuàng)建一個對象。

    2024年02月13日
    瀏覽(22)
  • 轉(zhuǎn)換流-數(shù)據(jù)流-對象流-打印流-標(biāo)準(zhǔn)輸入輸出流

    把字節(jié)流轉(zhuǎn)換為字符流,轉(zhuǎn)換流是一種處理流。字節(jié)流有亂碼的可能。 假設(shè)input.txt文件中存放了字符串 “abc中國” ,使用字節(jié)流讀取會亂碼,使用字符流讀取是使用平臺默認(rèn)的編碼格式讀取的,如果文本存儲是不是平臺的編碼格式,也會出現(xiàn)亂碼。轉(zhuǎn)換流本質(zhì)上就是加了編

    2024年02月12日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包