国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)

這篇具有很好參考價值的文章主要介紹了Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink提供了基于JDBC的方式,可以將讀取到的數(shù)據(jù)寫入到MySQL中;本文通過兩種方式將數(shù)據(jù)下入到MySQL數(shù)據(jù)庫,其他的基于JDBC的數(shù)據(jù)庫類似,另外,Table API方式的Catalog指定為Hive Catalog方式,持久化DDL操作。

另外,JDBC 連接器允許使用 JDBC 驅動程序從任何關系數(shù)據(jù)庫讀取數(shù)據(jù)并將數(shù)據(jù)寫入其中。 本文檔介紹如何設置 JDBC 連接器以針對關系數(shù)據(jù)庫運行 SQL 查詢。

如果 DDL 上定義了主鍵,則 JDBC sink 以 upsert 模式與外部系統(tǒng)交換 UPDATE/DELETE 消息,否則,它以 append 模式運行,不支持消費 UPDATE/DELETE 消息。

默認提供 exactly-once的保證。

MySQL配置

MySQL中創(chuàng)建表Events數(shù)據(jù)表

-- flink.events definition

CREATE TABLE `events` (
  `user` varchar(100) DEFAULT NULL,
  `url` varchar(200) DEFAULT NULL,
  `timestamp` bigint(20) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

插入幾條數(shù)據(jù)

INSERT INTO flink.events
(`user`, url, `timestamp`)
VALUES('Alice', './home', 1000);

Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)

Maven依賴,包含了Hive Catalog的相關依賴
?



    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <flink.version>1.14.4</flink.version>
        <scala.binary.version>2.12</scala.binary.version>
        <hadoop.version>3.1.2</hadoop.version>
        <hive.version>3.1.2</hive.version>
    </properties>


    <dependencies>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.12</artifactId>
            <version>${flink.version}</version>
            <!--            <scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.49</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>


        <!-- Flink 的 Hive 連接器-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-hive_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!-- Hive 依賴 -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>${hive.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.calcite</groupId>
                    <artifactId>*</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!-- Hive 依賴 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Hive 依賴 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>${hadoop.version}</version>
        </dependency>
        <!-- Hive 依賴 -->
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>${hadoop.version}</version>
        </dependency>


    </dependencies>

DataStream方式讀寫MySQL數(shù)據(jù)

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;
import org.apache.flink.connector.jdbc.JdbcStatementBuilder;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.sql.PreparedStatement;
import java.sql.SQLException;

public class JdbcSinkExample {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> eventStream = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L),
                new Event("Bob", "./prod?id=3", 90 * 1000L),
                new Event("Alice", "./prod?id=1", 105 * 1000L)
        );


        eventStream.addSink(JdbcSink.sink(
                "insert into events (user, url, timestamp) values (?, ?, ?)",
                new JdbcStatementBuilder<Event>() {
                    @Override
                    public void accept(PreparedStatement preparedStatement, Event event) throws SQLException {
                        preparedStatement.setString(1, event.user);
                        preparedStatement.setString(2, event.url);
                        preparedStatement.setLong(3, event.timestamp);
                    }
                },
                JdbcExecutionOptions.builder()
                        .withBatchSize(1000)
                        .withBatchIntervalMs(200)
                        .withMaxRetries(5)
                        .build(),
                new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                        .withUrl("jdbc:mysql://127.0.0.1:3306/flink")
                        .withDriverName("com.mysql.jdbc.Driver")
                        .withUsername("root")
                        .withPassword("xxx")
                        .build()
        ));

        env.execute();
    }
}

Table API的方式讀寫MySQL,其中Flink的Catalog使用Hive Catalog的方式。熟悉 Flink 或者 Spark 等大數(shù)據(jù)引擎的同學應該都知道這兩個計算引擎都有一個共同的組件叫 Catalog。下面是 Flink 的 Catalog 的官方定義。

Catalog 提供了元數(shù)據(jù)信息,例如數(shù)據(jù)庫、表、分區(qū)、視圖以及數(shù)據(jù)庫或其他外部系統(tǒng)中存儲的函數(shù)和信息。

Table API與SQL API實現(xiàn)了Apache Flink的批流統(tǒng)一的實現(xiàn)方式。Table API與SQL API的核心概念就是TableEnviroment。TableEnviroment對象提供方法注冊數(shù)據(jù)源與數(shù)據(jù)表信息。那么數(shù)據(jù)源與數(shù)據(jù)表的信息則存儲在CataLog中。所以,CataLog是TableEnviroment的重要組成部分。

Apache Flink在獲取TableEnviroment對象后,可以通過Register實現(xiàn)對數(shù)據(jù)源與數(shù)據(jù)表進行注冊。注冊完成后數(shù)據(jù)庫與數(shù)據(jù)表的原信息則存儲在CataLog中。CataLog中保存了所有的表結構信息、數(shù)據(jù)目錄信息等。

簡單來說,Catalog 就是元數(shù)據(jù)管理中心,其中元數(shù)據(jù)包括數(shù)據(jù)庫、表、表結構等信息。

public class JDBCTable {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> eventStream = env.fromElements(
                new Event("Alice", "./home", 1000L),
                new Event("Bob", "./cart", 2000L),
                new Event("Alice", "./prod?id=1", 5 * 1000L),
                new Event("Cary", "./home", 60 * 1000L),
                new Event("Bob", "./prod?id=3", 90 * 1000L),
                new Event("Alice", "./prod?id=1", 105 * 1000L)
        );

        //獲取表環(huán)境
        //StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        String name = "myhive";
        String defaultDatabase = "default";
        String hiveConfDir = "/opt/hive";

        // 創(chuàng)建一個 HiveCatalog,并在表環(huán)境中注冊
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
        tableEnv.registerCatalog("myhive", hive);

        // 使用 HiveCatalog 作為當前會話的 catalog
        tableEnv.useCatalog("myhive");


        TableResult tableResult = tableEnv.executeSql("CREATE TABLE IF NOT EXISTS EventTable (\n" +
                "`user` STRING,\n" +
                "url STRING,\n" +
                "`timestamp` BIGINT\n" +
                ") WITH (\n" +
                "'connector' = 'jdbc',\n" +
                "'url' = 'jdbc:mysql://127.0.0.1:3306/flink',\n" +
                "'table-name' = 'events',\n" +
                "'username'='root',\n" +
                "'password'='xxx'\n" +
                ")");
        tableEnv.executeSql("insert into EventTable values('Alice','./prod?id=3',106000)");
    }

MySQL中的數(shù)據(jù)

Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)文章來源地址http://www.zghlxwxcb.cn/news/detail-406004.html

到了這里,關于Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • flink:通過table api把文件中讀取的數(shù)據(jù)寫入MySQL

    當寫入數(shù)據(jù)到外部數(shù)據(jù)庫時,F(xiàn)link 會使用 DDL 中定義的主鍵。如果定義了主鍵,則連接器將以 upsert 模式工作,否則連接器將以 append 模式工作 文件info.txt

    2024年03月15日
    瀏覽(18)
  • 大數(shù)據(jù)學習之Flink算子、了解DataStream API(基礎篇一)

    大數(shù)據(jù)學習之Flink算子、了解DataStream API(基礎篇一)

    注: 本文只涉及DataStream 原因:隨著大數(shù)據(jù)和流式計算需求的增長,處理實時數(shù)據(jù)流變得越來越重要。因此,DataStream由于其處理實時數(shù)據(jù)流的特性和能力,逐漸替代了DataSet成為了主流的數(shù)據(jù)處理方式。 目錄 DataStream API (基礎篇) 前摘: 一、執(zhí)行環(huán)境 1. 創(chuàng)建執(zhí)行環(huán)境 2. 執(zhí)

    2024年01月23日
    瀏覽(27)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及JDBC示例(4)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內(nèi)容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(27)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及FileSystem示例(1)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內(nèi)容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月12日
    瀏覽(21)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Elasticsearch示例(2)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內(nèi)容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月11日
    瀏覽(23)
  • 【flink番外篇】16、DataStream 和 Table 相互轉換示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內(nèi)容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(27)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內(nèi)容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月11日
    瀏覽(26)
  • 16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)

    16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Kafka示例(3)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關基礎內(nèi)容。 2、Flink基礎系列 本部分介紹Flink 的基礎部分,比如術語、架構、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月10日
    瀏覽(30)
  • Flink DataStream API詳解

    參考:https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/datastream_api.html Data Sources Source是程序讀取其輸入的位置,您可以使用 env.addSource(sourceFunction) 將Source附加到程序中。Flink內(nèi)置了許多預先實現(xiàn)的SourceFunction,但是您始終可以通過實現(xiàn)SourceFunction(non-parallel sources)來編寫自定

    2024年02月14日
    瀏覽(51)
  • Flink學習——DataStream API

    Flink學習——DataStream API

    ? ? ? ? 一個flink程序,其實就是對DataStream的各種轉換。具體可以分成以下幾個部分: 獲取執(zhí)行環(huán)境(Execution Environment) 讀取數(shù)據(jù)源(Source) 定義基于數(shù)據(jù)的轉換操作(Transformations) 定義計算結果的輸出位置(Sink) 觸發(fā)程序執(zhí)行(Execute) ? ? ? ? flink 程序可以在各種上

    2024年02月05日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包