Using Flink to Stream Data from Kafka to MySQL
In modern data-processing architectures, Kafka and MySQL are two very popular technologies. Kafka, a high-throughput distributed messaging system, is commonly used to build real-time data pipelines, while MySQL is a widely used relational database well suited to storing and querying data. In some scenarios we need to write data from Kafka into a MySQL database in real time; this article shows how to implement that with Apache Flink.
Environment Setup
Before you begin, make sure the following components are installed and configured in your development environment: Apache Flink, Apache Kafka (with ZooKeeper), and MySQL.
Maven Dependencies
Add the required dependencies to your pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>EastMoney</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.11</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala_2.11</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
</dependencies>
</project>
Kafka Message Queue
1. Start ZooKeeper
zkServer start
2. Start the Kafka broker
kafka-server-start /opt/homebrew/etc/kafka/server.properties
3. Create the topic
kafka-topics --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic east_money
4. Produce data
kafka-console-producer --broker-list localhost:9092 --topic east_money
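Each message sent to the topic is expected to be one CSV line whose fields match, in order, the 19 columns of the Kafka source table defined later in this article. A minimal sketch of producing one sample row (all values are made up for illustration):

```shell
# A made-up sample row: 19 comma-separated fields in the column order of the
# re_stock_code_price_kafka source table (id,code,name,close,change_percent,
# change,volume,amount,amplitude,turnover_rate,operation,volume_rate,high,
# low,open,previous_close,pb,create_time,rise).
SAMPLE='1,600519,Kweichow Moutai,1700.5,1.2,20.1,35000,5900000000,2.1,0.3,32.5,1.1,1710.0,1690.0,1695.0,1680.4,12.3,2024-01-01 09:30:00,1'
echo "$SAMPLE"
# Pipe it into the console producer (requires a running broker):
# echo "$SAMPLE" | kafka-console-producer --broker-list localhost:9092 --topic east_money
```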
MySQL Database
Initialize the MySQL table. Note: the JDBC sink defined later writes to a table named re_stock_code_price, while the DDL below creates t_stock_code_price; make sure the table referenced by 'table-name' actually exists in MySQL (either create re_stock_code_price with the same structure, or adjust the sink configuration accordingly).
CREATE TABLE `t_stock_code_price` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `code` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'stock code',
  `name` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_0900_ai_ci NOT NULL COMMENT 'stock name',
  `close` double DEFAULT NULL COMMENT 'latest price',
  `change_percent` double DEFAULT NULL COMMENT 'change percent',
  `change` double DEFAULT NULL COMMENT 'change amount',
  `volume` double DEFAULT NULL COMMENT 'volume (lots)',
  `amount` double DEFAULT NULL COMMENT 'turnover',
  `amplitude` double DEFAULT NULL COMMENT 'amplitude',
  `turnover_rate` double DEFAULT NULL COMMENT 'turnover rate',
  `peration` double DEFAULT NULL COMMENT 'P/E ratio',
  `volume_rate` double DEFAULT NULL COMMENT 'volume ratio',
  `hign` double DEFAULT NULL COMMENT 'high',
  `low` double DEFAULT NULL COMMENT 'low',
  `open` double DEFAULT NULL COMMENT 'open',
  `previous_close` double DEFAULT NULL COMMENT 'previous close',
  `pb` double DEFAULT NULL COMMENT 'P/B ratio',
  `create_time` varchar(64) NOT NULL COMMENT 'write time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
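The JDBC sink configured later points at a database named mydb, which the DDL above does not create. A minimal setup sketch, assuming a local MySQL instance and the root/12345678 credentials used later in the article (adjust to your environment; the actual mysql invocations are commented out so the snippet runs without a live server):

```shell
# Bootstrap SQL for the target database; the Flink job's JDBC URL
# (jdbc:mysql://localhost:3306/mydb) expects this database to exist.
SETUP_SQL='CREATE DATABASE IF NOT EXISTS mydb;'
echo "$SETUP_SQL"
# mysql -uroot -p12345678 -e "$SETUP_SQL"
# Then apply the CREATE TABLE statement above inside mydb, e.g.:
# mysql -uroot -p12345678 mydb < create_table.sql   # create_table.sql is a hypothetical file holding the DDL
```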
Step-by-Step Explanation
Get the stream execution environment: First, obtain Flink's stream execution environment via StreamExecutionEnvironment.getExecutionEnvironment and set its runtime mode to streaming.
Create the stream table environment: Next, create a stream table environment with StreamTableEnvironment.create; this environment lets us operate on data streams using SQL statements.
val senv = StreamExecutionEnvironment.getExecutionEnvironment
  .setRuntimeMode(RuntimeExecutionMode.STREAMING)
val tEnv = StreamTableEnvironment.create(senv)
Define the Kafka source table: We use a SQL statement to create a Kafka table, re_stock_code_price_kafka, which describes the structure of the data to read from Kafka along with the connection settings.
tEnv.executeSql(
"CREATE TABLE re_stock_code_price_kafka (" +
"`id` BIGINT," +
"`code` STRING," +
"`name` STRING," +
"`close` DOUBLE NULL," +
"`change_percent` DOUBLE," +
"`change` DOUBLE," +
"`volume` DOUBLE," +
"`amount` DOUBLE," +
"`amplitude` DOUBLE," +
"`turnover_rate` DOUBLE," +
"`operation` DOUBLE," +
"`volume_rate` DOUBLE," +
"`high` DOUBLE ," +
"`low` DOUBLE," +
"`open` DOUBLE," +
"`previous_close` DOUBLE," +
"`pb` DOUBLE," +
"`create_time` STRING," +
"rise int"+
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'east_money'," +
"'properties.bootstrap.servers' = '127.0.0.1:9092'," +
"'properties.group.id' = 'mysql2kafka'," +
"'scan.startup.mode' = 'earliest-offset'," +
"'format' = 'csv'," +
"'csv.field-delimiter' = ','" +
")"
)
val result = tEnv.executeSql("select * from re_stock_code_price_kafka")
Define the MySQL sink table: Next, we define a MySQL table, re_stock_code_price, specifying the MySQL connection parameters and the table schema.
val sink_table: String =
"""
|CREATE TEMPORARY TABLE re_stock_code_price (
| id BIGINT NOT NULL,
| code STRING NOT NULL,
| name STRING NOT NULL,
| `close` DOUBLE,
| change_percent DOUBLE,
| change DOUBLE,
| volume DOUBLE,
| amount DOUBLE,
| amplitude DOUBLE,
| turnover_rate DOUBLE,
| peration DOUBLE,
| volume_rate DOUBLE,
| hign DOUBLE,
| low DOUBLE,
| `open` DOUBLE,
| previous_close DOUBLE,
| pb DOUBLE,
| create_time STRING NOT NULL,
| rise int,
| PRIMARY KEY (id) NOT ENFORCED
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/mydb',
| 'driver' = 'com.mysql.cj.jdbc.Driver',
| 'table-name' = 're_stock_code_price',
| 'username' = 'root',
| 'password' = '12345678'
|)
|""".stripMargin
tEnv.executeSql(sink_table)
Transform and write the data: Finally, we execute an INSERT statement that takes the rows read from Kafka and writes them into MySQL.
tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")
result.print()
Complete Code
package org.east

import org.apache.flink.api.common.RuntimeExecutionMode
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

object Kafka2Mysql {
  def main(args: Array[String]): Unit = {
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
      .setRuntimeMode(RuntimeExecutionMode.STREAMING)
    val tEnv = StreamTableEnvironment.create(senv)

    // Kafka source table: reads CSV records from the east_money topic.
    tEnv.executeSql(
      "CREATE TABLE re_stock_code_price_kafka (" +
        "`id` BIGINT," +
        "`code` STRING," +
        "`name` STRING," +
        "`close` DOUBLE NULL," +
        "`change_percent` DOUBLE," +
        "`change` DOUBLE," +
        "`volume` DOUBLE," +
        "`amount` DOUBLE," +
        "`amplitude` DOUBLE," +
        "`turnover_rate` DOUBLE," +
        "`operation` DOUBLE," +
        "`volume_rate` DOUBLE," +
        "`high` DOUBLE," +
        "`low` DOUBLE," +
        "`open` DOUBLE," +
        "`previous_close` DOUBLE," +
        "`pb` DOUBLE," +
        "`create_time` STRING," +
        "rise int" +
      ") WITH (" +
        "'connector' = 'kafka'," +
        "'topic' = 'east_money'," +
        "'properties.bootstrap.servers' = '127.0.0.1:9092'," +
        "'properties.group.id' = 'mysql2kafka'," +
        "'scan.startup.mode' = 'earliest-offset'," +
        "'format' = 'csv'," +
        "'csv.field-delimiter' = ','" +
      ")"
    )
    val result = tEnv.executeSql("select * from re_stock_code_price_kafka")

    // JDBC sink table backed by MySQL.
    val sink_table: String =
      """
        |CREATE TEMPORARY TABLE re_stock_code_price (
        |  id BIGINT NOT NULL,
        |  code STRING NOT NULL,
        |  name STRING NOT NULL,
        |  `close` DOUBLE,
        |  change_percent DOUBLE,
        |  change DOUBLE,
        |  volume DOUBLE,
        |  amount DOUBLE,
        |  amplitude DOUBLE,
        |  turnover_rate DOUBLE,
        |  peration DOUBLE,
        |  volume_rate DOUBLE,
        |  hign DOUBLE,
        |  low DOUBLE,
        |  `open` DOUBLE,
        |  previous_close DOUBLE,
        |  pb DOUBLE,
        |  create_time STRING NOT NULL,
        |  rise int,
        |  PRIMARY KEY (id) NOT ENFORCED
        |) WITH (
        |  'connector' = 'jdbc',
        |  'url' = 'jdbc:mysql://localhost:3306/mydb',
        |  'driver' = 'com.mysql.cj.jdbc.Driver',
        |  'table-name' = 're_stock_code_price',
        |  'username' = 'root',
        |  'password' = '12345678'
        |)
        |""".stripMargin
    tEnv.executeSql(sink_table)

    // Copy everything from the Kafka source into the MySQL sink.
    tEnv.executeSql("insert into re_stock_code_price select * from re_stock_code_price_kafka")
    result.print()
    print("Data printing finished!!!")
  }
}