国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink:通過table api把文件中讀取的數(shù)據(jù)寫入MySQL

這篇具有很好參考價(jià)值的文章主要介紹了flink:通過table api把文件中讀取的數(shù)據(jù)寫入MySQL。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

當(dāng)寫入數(shù)據(jù)到外部數(shù)據(jù)庫時(shí),F(xiàn)link 會(huì)使用 DDL 中定義的主鍵。如果定義了主鍵,則連接器將以 upsert 模式工作,否則連接器將以 append 模式工作

package cn.edu.tju.demo2;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;

public class Test41 {
    //demo 是MySQL中已經(jīng)創(chuàng)建好的表
    //create table demo (userId varchar(50) not null,total bigint,avgVal double);
    private static String FILE_PATH = "info.txt";
    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);



        tableEnv.connect(new FileSystem().path(FILE_PATH))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("userId", DataTypes.VARCHAR(50))
                        .field("ts", DataTypes.INT())
                        .field("val", DataTypes.DOUBLE()))
                .createTemporaryTable("input");



        Table dataTable = tableEnv.from("input");
        Table aggregateTable = dataTable
                .groupBy("userId")
                .select("userId, userId.count as total, val.avg as avgVal");


        String sql=

                "create table jdbcOutputTable (" +

                        " userId varchar(50) not null,total bigint,avgVal double " +

                        ") with (" +

                        " 'connector.type' = 'jdbc', " +

                        " 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +

                        " 'connector.table' = 'demo', " +

                        " 'connector.driver' = 'com.mysql.jdbc.Driver', " +

                        " 'connector.username' = 'root', " +

                        " 'connector.password' = 123456' )";

        tableEnv.sqlUpdate(sql);

        aggregateTable.insertInto("jdbcOutputTable");




        tableEnv.execute("my job");

    }
}

文件info.txt文章來源地址http://www.zghlxwxcb.cn/news/detail-840347.html

user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9

到了這里,關(guān)于flink:通過table api把文件中讀取的數(shù)據(jù)寫入MySQL的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)

    Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以將讀取到的數(shù)據(jù)寫入到MySQL中;本文通過兩種方式將數(shù)據(jù)下入到MySQL數(shù)據(jù)庫,其他的基于JDBC的數(shù)據(jù)庫類似,另外,Table API方式的Catalog指定為Hive Catalog方式,持久化DDL操作。 另外,JDBC 連接器允許使用 JDBC 驅(qū)動(dòng)程序從任何關(guān)系數(shù)據(jù)庫讀取數(shù)據(jù)并將

    2023年04月09日
    瀏覽(32)
  • Python讀取寫入數(shù)據(jù)到Excel文件

    Python讀取寫入數(shù)據(jù)到Excel文件

    【Linux干貨教程】Ubuntu Linux 換源詳細(xì)教程 大家好,我是洲洲,歡迎關(guān)注,一個(gè)愛聽周杰倫的程序員。關(guān)注公眾號(hào)【程序員洲洲】即可獲得10G學(xué)習(xí)資料、面試筆記、大廠獨(dú)家學(xué)習(xí)體系路線等…還可以加入技術(shù)交流群歡迎大家在CSDN后臺(tái)私信我! Hello,各位看官老爺們好,洲洲已

    2024年02月12日
    瀏覽(97)
  • qt學(xué)習(xí):json數(shù)據(jù)文件讀取寫入

    目錄 什么是json 基本格式 例子? 解析json文件數(shù)據(jù)到界面上 組合json數(shù)據(jù)文檔對(duì)象 json是一種輕量級(jí)的數(shù)據(jù)交互格式,簡(jiǎn)單來說,json就是一種在各個(gè)編程語言中流通的數(shù)據(jù)格式,負(fù)責(zé)不同編程語言中的數(shù)據(jù)傳遞和交互 以鍵值對(duì)的形式存放 鍵-----字符串 值------基本數(shù)據(jù)類型,字

    2024年01月24日
    瀏覽(23)
  • Flink將數(shù)據(jù)寫入CSV文件后文件中沒有數(shù)據(jù)

    Flink將數(shù)據(jù)寫入CSV文件后文件中沒有數(shù)據(jù)

    Flink中有一個(gè)過時(shí)的 sink 方法: writeAsCsv ,這個(gè)方法是將數(shù)據(jù)寫入 CSV 文件中,有時(shí)候我們會(huì)發(fā)現(xiàn)程序啟動(dòng)后,打開文件查看沒有任何數(shù)據(jù),日志信息中也沒有任何報(bào)錯(cuò),這里我們結(jié)合源碼分析一下這個(gè)原因. 這里先看一下數(shù)據(jù)處理的代碼 代碼中我是使用的自定義數(shù)據(jù)源生產(chǎn)數(shù)據(jù)的方式

    2024年02月16日
    瀏覽(14)
  • Flink之FileSink將數(shù)據(jù)寫入parquet文件

    Flink之FileSink將數(shù)據(jù)寫入parquet文件

    在使用FileSink將數(shù)據(jù)寫入列式存儲(chǔ)文件中時(shí)必須使用 forBulkFormat ,列式存儲(chǔ)文件如 ORCFile 、 ParquetFile ,這里就以 ParquetFile 為例結(jié)合代碼進(jìn)行說明. 在Flink 1.15.3 中是通過構(gòu)造 ParquetWriterFactory 然后調(diào)用 forBulkFormat 方法將構(gòu)造好的 ParquetWriterFactory 傳入,這里先講一下構(gòu)造 ParquetWriterF

    2024年02月03日
    瀏覽(18)
  • 24、Flink 的table api與sql之Catalogs(java api操作數(shù)據(jù)庫、表)-2

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月04日
    瀏覽(26)
  • Python處理xlsx文件(讀取、轉(zhuǎn)為列表、新建、寫入數(shù)據(jù)、保存)

    xlsxwriter**庫對(duì)于xslx表的列數(shù)不做限制, xlrd 庫不能寫入超過65535行,256列的數(shù)據(jù)。 由于需要處理的數(shù)據(jù)行列數(shù)較多,遇到報(bào)錯(cuò)才發(fā)現(xiàn)庫的限制問題,記錄一下。

    2024年02月12日
    瀏覽(89)
  • 【flink番外篇】15、Flink維表實(shí)戰(zhàn)之6種實(shí)現(xiàn)方式-通過Temporal table實(shí)現(xiàn)維表數(shù)據(jù)join

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月20日
    瀏覽(28)
  • R語言【utils】——write.table(),write.csv(),write.csv2():將數(shù)據(jù)寫入文件

    Package? utils ?version 4.2.0 參數(shù)【x】 :要寫入的對(duì)象,最好是矩陣或數(shù)據(jù)幀。如果不是,則嘗試將其強(qiáng)制轉(zhuǎn)換為數(shù)據(jù)幀。 參數(shù)【file】 :命名文件的字符串或打開用于寫入的連接?!啊北硎鞠蚩刂婆_(tái)輸出。 參數(shù)【append】 :邏輯值。只有當(dāng) 參數(shù)【file】 是一個(gè)字符串時(shí)才相關(guān)。

    2024年01月22日
    瀏覽(19)
  • 17、Flink 之Table API: Table API 支持的操作(1)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(16)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包