當寫入數據到外部數據庫時,Flink 會使用 DDL 中定義的主鍵。如果定義了主鍵,則連接器將以 upsert 模式工作,否則連接器將以 append 模式工作。文章來源:http://www.zghlxwxcb.cn/news/detail-840347.html
package cn.edu.tju.demo2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.*;
import org.apache.flink.types.Row;
/**
 * Demo job: reads CSV records from a file through the Flink Table API, aggregates them
 * per user, and writes the aggregate to a MySQL table through the JDBC connector.
 *
 * <p>{@code demo} is a table that must already exist in MySQL:
 * <pre>
 * create table demo (userId varchar(50) not null, total bigint, avgVal double);
 * </pre>
 */
public class Test41 {
    /** Path of the CSV input file; each record is {@code userId,ts,val}. */
    private static final String FILE_PATH = "info.txt";

    /**
     * Builds and runs the file-to-MySQL pipeline.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Register the CSV file as the temporary source table "input".
        tableEnv.connect(new FileSystem().path(FILE_PATH))
                .withFormat(new Csv())
                .withSchema(new Schema()
                        .field("userId", DataTypes.VARCHAR(50))
                        .field("ts", DataTypes.INT())
                        .field("val", DataTypes.DOUBLE()))
                .createTemporaryTable("input");

        // Per user: number of records and average of "val".
        Table dataTable = tableEnv.from("input");
        Table aggregateTable = dataTable
                .groupBy("userId")
                .select("userId, userId.count as total, val.avg as avgVal");

        // Sink table backed by the MySQL table "demo"; its columns mirror that table.
        // The password value must be a quoted SQL string literal: the original text
        // lacked the opening quote, which left the statement with an unbalanced quote.
        String sql =
                "create table jdbcOutputTable (" +
                " userId varchar(50) not null,total bigint,avgVal double " +
                ") with (" +
                " 'connector.type' = 'jdbc', " +
                " 'connector.url' = 'jdbc:mysql://xx.xx.xx.xx:3306/test', " +
                " 'connector.table' = 'demo', " +
                " 'connector.driver' = 'com.mysql.jdbc.Driver', " +
                " 'connector.username' = 'root', " +
                " 'connector.password' = '123456' )";
        tableEnv.sqlUpdate(sql);

        aggregateTable.insertInto("jdbcOutputTable");
        tableEnv.execute("my job");
    }
}
文件 info.txt 的內容如下(文章來源地址:http://www.zghlxwxcb.cn/news/detail-840347.html):
user1,1680000890,31.6
user2,1681111900,38.3
user1,1680000890,34.9
到了這裡,關於 flink:通過 table api 把文件中讀取的數據寫入 MySQL 的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索 TOY模板網以前的文章或繼續瀏覽下面的相關文章,希望大家以後多多支持 TOY模板網!