Flink can write data to MySQL over JDBC. This article demonstrates two ways of writing data into MySQL, the DataStream API and the Table API; other JDBC-backed databases work much the same way. In the Table API example, Flink's catalog is a Hive catalog, so DDL statements are persisted across sessions.
The JDBC connector lets Flink read data from and write data to any relational database that ships a JDBC driver. This article describes how to set up the JDBC connector to run SQL queries against such a database.
If the DDL defines a primary key, the JDBC sink exchanges UPDATE/DELETE messages with the external system in upsert mode; otherwise it runs in append mode and cannot consume UPDATE/DELETE messages.
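As an illustration, here is a hedged sketch of an upsert-mode DDL (the table name EventUpsertTable is made up for this example; the PRIMARY KEY ... NOT ENFORCED clause is what switches the sink into upsert mode, and the underlying MySQL table then needs a matching unique key, since the MySQL dialect writes INSERT ... ON DUPLICATE KEY UPDATE statements):
CREATE TABLE EventUpsertTable (
  `user` STRING,
  url STRING,
  `timestamp` BIGINT,
  PRIMARY KEY (`user`) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://127.0.0.1:3306/flink',
  'table-name' = 'events',
  'username' = 'root',
  'password' = 'xxx'
);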
Out of the box the sink provides at-least-once guarantees; exactly-once delivery is possible by making the writes idempotent (e.g. upsert statements keyed on a unique column) or by using the XA-based JdbcSink.exactlyOnceSink available since Flink 1.13.
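A hedged sketch of the XA-based variant, written as a drop-in replacement for the addSink call in the DataStream example further down (the option values are assumptions; it needs org.apache.flink.connector.jdbc.JdbcExactlyOnceOptions and the driver's XA data source on the classpath, and checkpointing must be enabled for the transactions to commit):
// Prerequisite: env.enableCheckpointing(10_000); XA transactions commit on checkpoints
eventStream.addSink(JdbcSink.exactlyOnceSink(
        "insert into events (`user`, url, `timestamp`) values (?, ?, ?)",
        (statement, event) -> {
            statement.setString(1, event.user);
            statement.setString(2, event.url);
            statement.setLong(3, event.timestamp);
        },
        JdbcExecutionOptions.builder()
                .withMaxRetries(0) // retries must be disabled for XA transactions
                .build(),
        JdbcExactlyOnceOptions.builder()
                .withTransactionPerConnection(true) // MySQL supports only one XA transaction per connection
                .build(),
        () -> {
            // com.mysql.jdbc.jdbc2.optional.MysqlXADataSource in mysql-connector-java 5.1
            MysqlXADataSource xaDataSource = new MysqlXADataSource();
            xaDataSource.setUrl("jdbc:mysql://127.0.0.1:3306/flink");
            xaDataSource.setUser("root");
            xaDataSource.setPassword("xxx");
            return xaDataSource;
        }
));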
MySQL setup
Create the events table in the flink database:
-- flink.events definition
CREATE TABLE `events` (
  `user` varchar(100) DEFAULT NULL,
  `url` varchar(200) DEFAULT NULL,
  `timestamp` bigint(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Insert a sample row:
INSERT INTO flink.events
(`user`, url, `timestamp`)
VALUES('Alice', './home', 1000);
Maven dependencies, including those required for the Hive catalog:
<properties>
    <maven.compiler.source>8</maven.compiler.source>
    <maven.compiler.target>8</maven.compiler.target>
    <flink.version>1.14.4</flink.version>
    <scala.binary.version>2.12</scala.binary.version>
    <hadoop.version>3.1.2</hadoop.version>
    <hive.version>3.1.2</hive.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <!-- <scope>provided</scope> -->
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.49</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Flink's Hive connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <!-- Hive dependency -->
    <dependency>
        <groupId>org.apache.hive</groupId>
        <artifactId>hive-exec</artifactId>
        <version>${hive.version}</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.calcite</groupId>
                <artifactId>*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <!-- Hadoop dependencies required by the Hive connector -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
</dependencies>
Reading and writing MySQL data with the DataStream API
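Both code examples below assume a simple Event POJO with public user, url, and timestamp fields. A minimal sketch of what it might look like (the original definition is assumed to live elsewhere in the project):
import java.sql.Timestamp;

public class Event {
    public String user;
    public String url;
    public Long timestamp;

    // Flink treats this class as a POJO only if it has a public no-arg constructor
    public Event() {}

    public Event(String user, String url, Long timestamp) {
        this.user = user;
        this.url = url;
        this.timestamp = timestamp;
    }

    @Override
    public String toString() {
        return "Event{user='" + user + "', url='" + url + "', timestamp=" + new Timestamp(timestamp) + "}";
    }
}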
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JdbcSinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        SingleOutputStreamOperator<Event> eventStream = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L),
                new Event("Bob", "./prod?id=3", 90 * 1000L),
                new Event("Alice", "./prod?id=1", 105 * 1000L)
        );

        eventStream.addSink(JdbcSink.sink(
                // `user` and `timestamp` are MySQL keywords, so quote them
                "insert into events (`user`, url, `timestamp`) values (?, ?, ?)",
                new JdbcStatementBuilder<Event>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Event event) throws SQLException {
                        preparedStatement.setString(1, event.user);
                        preparedStatement.setString(2, event.url);
                        preparedStatement.setLong(3, event.timestamp);
                    }
                },
                // flush after 1000 rows or 200 ms, retry failed batches up to 5 times
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://127.0.0.1:3306/flink")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("xxx")
                        .build()
        ));

        env.execute();
    }
}
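The snippet above only covers the writing direction. For reading on the DataStream side, a minimal sketch using JdbcInputFormat for a bounded, one-shot scan might look like this (connection details reuse the values above):
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JdbcSourceExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Bounded scan over the events table; each result row becomes a Flink Row
        JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
                .setDrivername("com.mysql.jdbc.Driver")
                .setDBUrl("jdbc:mysql://127.0.0.1:3306/flink")
                .setUsername("root")
                .setPassword("xxx")
                .setQuery("select `user`, url, `timestamp` from events")
                .setRowTypeInfo(new RowTypeInfo(
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.STRING_TYPE_INFO,
                        BasicTypeInfo.LONG_TYPE_INFO))
                .finish();

        env.createInput(inputFormat).print();
        env.execute();
    }
}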
Reading and writing MySQL with the Table API, where Flink's catalog is a Hive catalog. Anyone familiar with big-data engines such as Flink or Spark knows that both share a component called the catalog. Flink's official definition reads:
Catalogs provide metadata, such as databases, tables, partitions, views, and functions and information stored in a database or other external systems.
The Table API and SQL API are Flink's unified approach to batch and stream processing, and their core concept is the TableEnvironment. A TableEnvironment object provides the methods for registering data sources and tables, and that source and table metadata is stored in the catalog; the catalog is therefore an essential part of the TableEnvironment.
After obtaining a TableEnvironment, you register your data sources and tables through it; once registered, the metadata of those databases and tables lives in the catalog, which holds all the table schemas, data directory information, and so on.
In short, the catalog is the metadata hub, where metadata covers databases, tables, table schemas, and the like.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;

public class JDBCTable {
    public static void main(String[] args) throws Exception {
        // Pure Table API job: no StreamExecutionEnvironment is needed here
        // (the Blink planner is the default and only planner in Flink 1.14)
        EnvironmentSettings settings = EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/hive"; // directory containing hive-site.xml

        // Create a HiveCatalog and register it with the table environment
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);
        // Use the HiveCatalog as the current session's catalog,
        // so the DDL below is persisted in the Hive Metastore
        tableEnv.useCatalog("myhive");

        tableEnv.executeSql("CREATE TABLE IF NOT EXISTS EventTable (\n" +
                "`user` STRING,\n" +
                "url STRING,\n" +
                "`timestamp` BIGINT\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'url' = 'jdbc:mysql://127.0.0.1:3306/flink',\n" +
                "'table-name' = 'events',\n" +
                "'username'='root',\n" +
                "'password'='xxx'\n" +
                ")");

        tableEnv.executeSql("insert into EventTable values ('Alice', './prod?id=3', 106000)");
    }
}
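Reading back through the same connector table is then a single query; TableResult.print() triggers a bounded scan and prints the rows. This line is an addition, appended at the end of main:
// Bounded scan of the MySQL table through the registered connector table
tableEnv.executeSql("select `user`, url, `timestamp` from EventTable").print();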
Checking MySQL afterwards shows the newly inserted rows in the events table.