1. Submitting the job directly as Flink SQL
This approach is relatively simple to use, but it cannot satisfy streaming jobs that need savepoints. To use it, first create a remote Flink execution environment, then execute the Flink SQL statements in order. A Java example:
package com.xw.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class testSqlServer {
    public static void main(String[] args) throws Exception {
        // Connect to the remote cluster (JobManager REST address and port).
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createRemoteEnvironment("192.168.1.88", 18082);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Set the job name.
        configuration.setString("pipeline.name", "sqlserver");

        // Source table: SQL Server through the JDBC connector.
        String sourceDDL = "CREATE TABLE Orders (f1 STRING, f2 STRING, f3 STRING) WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver', " +
                " 'url'='jdbc:sqlserver://192.168.1.40:1433;databaseName=test;useLOBs=false', " +
                " 'table-name'='test_czd1', " +
                " 'username'='root', " +
                " 'password'='root'" +
                ")";
        tableEnv.executeSql(sourceDDL);

        // Sink table: MySQL through the JDBC connector, with batched flushing.
        String rtLog = "CREATE TABLE logs (node_id STRING, send_count BIGINT, PRIMARY KEY(node_id) NOT ENFORCED) WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'driver'='com.mysql.cj.jdbc.Driver', " +
                " 'url'='jdbc:mysql://192.168.0.68:3306/testDB?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT%2B8', " +
                " 'table-name'='rt_log', " +
                " 'username'='root', " +
                " 'password'='root'," +
                " 'sink.buffer-flush.max-rows' = '20000'," +
                " 'sink.buffer-flush.interval' = '3000'" +
                ")";
        tableEnv.executeSql(rtLog);

        // Submit the INSERT as a job on the remote cluster.
        String sql = "insert into logs(node_id) select f1 from Orders limit 5";
        tableEnv.executeSql(sql);
    }
}
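One practical note on this example: the cluster that runs the job must be able to load the JDBC connector and the SQL Server and MySQL driver classes. createRemoteEnvironment also accepts a trailing String... jarFiles argument, which ships local jars (for example the connector jars) to the cluster together with the job.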
2. Uploading the job as a jar
With this approach you write the job in Java, package it as a jar, and upload it to the Flink cluster. It is flexible and gives precise control over the job's operators, but it is a burden for on-site operations, because the operations staff need development skills. Java example:
package com.xw.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class testSqlServer {
    public static void main(String[] args) throws Exception {
        // When the jar runs on the cluster, this picks up the cluster's context environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Set the job name.
        configuration.setString("pipeline.name", "sqlserver");
        // TODO: build the job here with the Table API or with Flink SQL.
    }
}
Then package the class as a jar and upload it to Flink through the web UI. Fill in the fully qualified name of the entry class and the parallelism, and the job can be executed.
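Besides the web UI, the same jar can be submitted from the command line with the flink CLI. A minimal sketch, assuming the jar is packaged as flink-job.jar (a placeholder name) and uses the entry class from the example above:

# submit with an explicit entry class and a parallelism of 2
./bin/flink run -c com.xw.flink.testSqlServer -p 2 flink-job.jar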
3. Submitting via the REST API
This approach combines the Flink SQL and jar forms: you write the Flink SQL remotely, then call the REST API to send the SQL and its parameters to an executable jar already on the Flink cluster. The jar assembles the Flink job from those parameters and submits it.
The available endpoints are documented in the official REST API | Apache Flink reference.
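As an illustration of the flow, the sketch below assumes a parameter-driven jar like the template shown after it has already been uploaded with POST /jars/upload (which returns a jar id), and then triggers POST /jars/:jarid/run from Java 11's HttpClient. The host, port, jar id, and argument string are placeholders:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class RestSubmitSketch {
    public static void main(String[] args) throws Exception {
        // Jar id as returned by POST /jars/upload (placeholder value).
        String jarId = "<jar-id>_SqlTemplate.jar";
        // Body for POST /jars/:jarid/run; programArgs carries the URL-encoded,
        // "||"-separated string that the template below expects in --arg.
        String body = "{\"entryClass\":\"SqlTemplate\"," +
                "\"parallelism\":1," +
                "\"programArgs\":\"--arg <url-encoded-job-arguments>\"}";
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://192.168.1.88:18082/jars/" + jarId + "/run"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();
        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // On success the cluster answers with {"jobid":"..."}.
        System.out.println(response.body());
    }
}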
In Java, a jar that receives the parameters can follow a template like this:
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.StringUtils;

import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SqlTemplate {
    public static void main(String[] args) throws Exception {
        // Read the arguments passed in by the REST call.
        ParameterTool parameters = ParameterTool.fromArgs(args);
        String arg = parameters.get("arg", null);
        if (arg == null) {
            return;
        }
        // URL-decode the argument string.
        arg = URLDecoder.decode(arg, StandardCharsets.UTF_8.toString());
        String[] programArgs = arg.split("\\|\\|");

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart strategy: at most three restarts, five seconds apart.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        // Set the job name.
        configuration.setString("pipeline.name", programArgs[0]);
        // Set the job parallelism.
        env.setParallelism(Integer.parseInt(programArgs[1]));

        // Job type: streaming jobs have checkpointing switched on.
        if ("stream".equals(programArgs[2])) {
            if (!StringUtils.isNullOrWhitespaceOnly(programArgs[3])) {
                // CheckPoint is a user-defined POJO holding the checkpoint settings.
                CheckPoint cp = JSON.parseObject(programArgs[3], CheckPoint.class);
                if (cp.getEnable()) {
                    // Enable checkpointing at the configured interval.
                    env.enableCheckpointing(cp.getCheckpointInterval());
                    // Checkpointing mode: 1 = EXACTLY_ONCE, otherwise AT_LEAST_ONCE.
                    env.getCheckpointConfig().setCheckpointingMode(
                            cp.getCheckPointingMode() == 1 ? CheckpointingMode.EXACTLY_ONCE : CheckpointingMode.AT_LEAST_ONCE);
                    // A checkpoint must finish within the timeout, or it is discarded.
                    env.getCheckpointConfig().setCheckpointTimeout(cp.getCheckpointTimeout());
                    // Limit how many checkpoints may run at the same time.
                    env.getCheckpointConfig().setMaxConcurrentCheckpoints(cp.getMaxConcurrentCheckpoints());
                    // Where the checkpoints are stored.
                    env.getCheckpointConfig().setCheckpointStorage(cp.getCheckpointDirectory());
                    // Optionally enable the experimental unaligned checkpoints.
                    if (cp.getUnalignedCheckpointsEnabled()) {
                        env.getCheckpointConfig().enableUnalignedCheckpoints();
                    }
                }
            } else {
                // Default configuration: checkpoint every five seconds.
                env.enableCheckpointing(5000);
                // Exactly-once checkpointing.
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                // A checkpoint must finish within five minutes, or it is discarded.
                env.getCheckpointConfig().setCheckpointTimeout(300000);
                // Only one checkpoint may run at a time.
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                // Enable the experimental unaligned checkpoints.
                env.getCheckpointConfig().enableUnalignedCheckpoints();
            }
        }

        // The SQL script; ";;" separates statements, because single special
        // characters are all used inside connector options.
        String sql = programArgs[4];
        String[] sqlExecu = sql.split(";;");
        List<String> create = new ArrayList<>();
        List<String> insert = new ArrayList<>();
        for (String script : sqlExecu) {
            if (!script.startsWith("insert") && !script.startsWith("INSERT")) {
                create.add(script);
            } else {
                insert.add(script);
            }
        }
        // DDL and other non-INSERT statements execute one by one.
        create.forEach(tableEnv::executeSql);
        // Run the INSERT statements as one statement set, so a single job can
        // write the source data to several result tables.
        StatementSet stmtSet = tableEnv.createStatementSet();
        insert.forEach(stmtSet::addInsertSql);
        // Submit the job.
        TableResult execute = stmtSet.execute();
    }
}
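The template reads its five positional values from one --arg string, split on the double pipe: job name, parallelism, job type, checkpoint JSON, SQL script. Before URL encoding, an argument might look like the following sketch (the checkpoint field names are inferred from the CheckPoint getters above, so the exact spelling depends on that class):

myJob||2||stream||{"enable":true,"checkpointInterval":5000}||CREATE TABLE ...;;INSERT INTO ...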
That concludes this overview of the common ways to submit Flink tasks.