国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink的常見的任務(wù)提交方式

這篇具有很好參考價值的文章主要介紹了flink的常見的任務(wù)提交方式。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1、以flinksql的方式直接提交任務(wù)

此方式使用起來相對比較簡單,但是無法滿足需要設(shè)置savepoint暫存點的流式任務(wù)需求。

使用此方式需要先創(chuàng)建Flink遠(yuǎn)方的執(zhí)行環(huán)境,然后按序執(zhí)行FlinkSql,流程如下:

flink任務(wù)提交方式,開發(fā),flink,大數(shù)據(jù)

java示例如下:

package com.xw.flink;

import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.TableEnvironment;

public class testSqlServer {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment("192.168.1.88",18082);
        TableEnvironment tableEnv = TableEnvironment.create(env.getConfiguration());
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任務(wù)名稱設(shè)定
        configuration.setString("pipeline.name","sqlserver");
        String sourceDDL = "CREATE TABLE Orders (f1 STRING,f2 STRING,f3 STRING) WITH ( " +
                " 'connector' = 'jdbc',  " +
                " 'driver'='com.microsoft.sqlserver.jdbc.SQLServerDriver',  " +
                " 'url'='jdbc:sqlserver://192.168.1.40:1433;databaseName=test;useLOBs=false',  " +
                " 'table-name'='test_czd1',  " +
                " 'username'='root',  " +
                " 'password'='root'" +
                ")";
        tableEnv.executeSql(sourceDDL);
        String rtLog = "CREATE TABLE logs (node_id STRING, send_count BIGINT,PRIMARY KEY(node_id) NOT ENFORCED) WITH ( " +
                " 'connector' = 'jdbc',  " +
                " 'driver'='com.mysql.cj.jdbc.Driver',  " +
                " 'url'='jdbc:mysql://192.168.0.68:3306/testDB?rewriteBatchedStatements=true&useUnicode=true&characterEncoding=utf8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&allowMultiQueries=true&serverTimezone=GMT%2B8',  " +
                " 'table-name'='rt_log',  " +
                " 'username'='root',  " +
                " 'password'='root'," +
                " 'sink.buffer-flush.max-rows' = '20000'," +
                " 'sink.buffer-flush.interval' = '3000'" +
                ")";
        tableEnv.executeSql(rtLog);
        String sql = "insert into logs(node_id) select f1 from Orders limit 5";
        tableEnv.executeSql(sql);
    }
}

2、以任務(wù)jar的方式上傳任務(wù)

此方式主要通過用java編寫一個任務(wù),然后打成jar的形式上傳到flink集群。此方式比較靈活,可以精確控制任務(wù)的算子。但是對于現(xiàn)場的運維來說是一個比較困難的問題,因為要求運維人員需要有代碼開發(fā)的能力。

java實現(xiàn)示例:

public class testSqlServer {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();
        TableEnvironment tableEnv = TableEnvironment.create(env.getConfiguration());
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任務(wù)名稱設(shè)定
        configuration.setString("pipeline.name","sqlserver");
        //todo 此部分可以是Flink-table  也可以是Flinksql
      
    }
}

然后將其打成jar包,然后上傳到flink

flink任務(wù)提交方式,開發(fā),flink,大數(shù)據(jù)

填寫class的全路徑和并行度即可執(zhí)行。

3、以Rest API方式進(jìn)行提交

此方式綜合了flinksql和flinkjar的兩種形式,你也以在遠(yuǎn)方編寫flinksql,然后通過調(diào)用API的形式將FlinkSql和參數(shù)發(fā)送到flink集群上可執(zhí)行的jar。jar拿到參數(shù)組裝flink任務(wù)并提交。

Rest API官網(wǎng)可參考REST API | Apache Flink

java編寫一個接受參數(shù)的jar,模版可參考文章來源地址http://www.zghlxwxcb.cn/news/detail-858215.html

public class SqlTemplate {
    public static void main(String[] args) throws Exception {
        ParameterTool parameters = ParameterTool.fromArgs(args);//獲取傳遞的參數(shù)
        String arg = parameters.get("arg",null);
        if(arg == null){
            return ;
        }
        arg = URLDecoder.decode(arg, StandardCharsets.UTF_8.toString());//URLDecoder解碼
        String[] programArgs =  arg.split("\\|\\|");
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //設(shè)置重啟策略,最多重啟三次,每次間隔5秒鐘
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,                    Time.seconds(5)
        ));
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        Configuration configuration = tableEnv.getConfig().getConfiguration();
        //任務(wù)名稱設(shè)定
        configuration.setString("pipeline.name",programArgs[0]);
        // 任務(wù)并行度設(shè)定
        env.setParallelism(Integer.parseInt(programArgs[1]));
        //任務(wù)類型,流式任務(wù)強制開啟checkpoint
        if("stream".equals(programArgs[2])){
            //檢查點設(shè)定
            if(!StringUtils.isNullOrWhitespaceOnly(programArgs[3])){
                CheckPoint cp = JSON.parseObject(programArgs[3],CheckPoint.class);
                //開啟檢查點
                if(cp.getEnable()){
                    //開啟檢查點,1S一次
                    env.enableCheckpointing(cp.getCheckpointInterval());
                    //檢查點策略 EXACTLY_ONCE 精準(zhǔn)一次  AT_LEAST_ONCE至少一次
                    env.getCheckpointConfig().setCheckpointingMode(cp.getCheckPointingMode()==1?CheckpointingMode.EXACTLY_ONCE:CheckpointingMode.AT_LEAST_ONCE);
                     Checkpoint 必須在一分鐘內(nèi)完成,否則就會被拋棄
                    env.getCheckpointConfig().setCheckpointTimeout(cp.getCheckpointTimeout());
                     同一時間只允許一個 checkpoint 進(jìn)行
                    env.getCheckpointConfig().setMaxConcurrentCheckpoints(cp.getMaxConcurrentCheckpoints());
                    //設(shè)置檢查點保存位置
                    env.getCheckpointConfig().setCheckpointStorage(cp.getCheckpointDirectory());
                    //開啟實驗性的 unaligned checkpoints
                    if(cp.getUnalignedCheckpointsEnabled()){
                        env.getCheckpointConfig().enableUnalignedCheckpoints();
                    }
                }
            }else{//開啟默認(rèn)配置
                //開啟檢查點,5S一次
                env.enableCheckpointing(5000);
                //檢查點策略 EXACTLY_ONCE 精準(zhǔn)一次  AT_LEAST_ONCE至少一次
                env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
                 Checkpoint 必須在五分鐘內(nèi)完成,否則就會被拋棄
                env.getCheckpointConfig().setCheckpointTimeout(300000);
                 同一時間只允許一個 checkpoint 進(jìn)行
                env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
                //開啟實驗性的 unaligned checkpoints
                env.getCheckpointConfig().enableUnalignedCheckpoints();
            }
        }
        //可執(zhí)行的SQL單節(jié)點執(zhí)行
        String sql = programArgs[4];
        //特殊符號在連接器里都會被使用,采用雙特殊符號進(jìn)行分割
        String[] sqlExecu = sql.split(";;");
        List<String> create = new ArrayList<>();
        List<String> insert = new ArrayList<>();
        for (String script : sqlExecu) {
            if(!script.startsWith("insert") && !script.startsWith("INSERT")){
                create.add(script);
            }else{
                insert.add(script);
            }
        }
        //可執(zhí)行的SQL單節(jié)點執(zhí)行
        create.forEach(tableEnv::executeSql);
        // 運行多條 INSERT 語句,將原表數(shù)據(jù)輸出到多個結(jié)果表中
        StatementSet stmtSet = tableEnv.createStatementSet();
        insert.forEach(stmtSet::addInsertSql);
        //開始執(zhí)行任務(wù)
        TableResult execute = stmtSet.execute();
     }

到了這里,關(guān)于flink的常見的任務(wù)提交方式的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 使用Java代碼遠(yuǎn)程提交flink任務(wù)

    導(dǎo)入依賴 參數(shù)格式參考: { ????\\\"jarPath\\\":\\\"C:\\\\flink-1.13.5\\\\examples\\\\streaming\\\\WordCount.jar\\\", ????\\\"parallelism\\\":1, ????\\\"entryPointClassName\\\":\\\"org.apache.flink.streaming.examples.wordcount.WordCount\\\" }

    2024年02月11日
    瀏覽(22)
  • flink客戶端提交任務(wù)報錯

    { “errors”: [ “org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.ntat org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest KaTeX parse error: Undefined control sequence: n at position 26: …ndler.java:110)?n?tat java.util.… UniHandle.tryFire(CompletableFuture.java:797)ntat j

    2024年02月15日
    瀏覽(95)
  • 采用seatunnel提交Flink和Spark任務(wù)

    seatunnel 是一個非常易用,高性能、支持實時流式和離線批處理的海量數(shù)據(jù)處理產(chǎn)品,架構(gòu)于Apache Spark 和 Apache Flink之上。 seatunnel 讓Spark和Flink的使用更簡單,更高效。 注:當(dāng)前版本用的是2.1.3版本? 如果在github下載自己編譯有問題 可在此地址下載編譯好的文件seatunnel-2.1.3-b

    2024年02月15日
    瀏覽(21)
  • 關(guān)于flink重新提交任務(wù),重復(fù)消費kafka的坑

    關(guān)于flink重新提交任務(wù),重復(fù)消費kafka的坑

    按照以下方式設(shè)置backend目錄和checkpoint目錄,fsbackend目錄有數(shù)據(jù),checkpoint目錄沒數(shù)據(jù) 我以為checkpoint和fsbackend要同時設(shè)置,其實,1.14.3版本,setCheckpointStorage和stateBackend改成了分著設(shè)置 我上邊代碼這樣設(shè)置,相當(dāng)于首先指定了以下checkpoint按照默認(rèn)的backend存儲,然后又指定了按

    2024年02月03日
    瀏覽(23)
  • flinkcdc 3.0 源碼學(xué)習(xí)之任務(wù)提交腳本flink-cdc.sh

    flinkcdc 3.0 源碼學(xué)習(xí)之任務(wù)提交腳本flink-cdc.sh

    大道至簡,用簡單的話來描述復(fù)雜的事,我是Antgeek,歡迎閱讀. 在flink 3.0版本中,我們僅通過一個簡單yaml文件就可以配置出一個復(fù)雜的數(shù)據(jù)同步任務(wù), 然后再來一句 bash bin/flink-cdc.sh mysql-to-doris.yaml 就可以將任務(wù)提交, 本文就是來探索一下這個shell腳本,主要是研究如何通過一個shell命

    2024年02月19日
    瀏覽(23)
  • Flink1.14提交任務(wù)報錯classloader.check-leaked-classloader問題解決

    我的hadoop版本是3.1.3,F(xiàn)link版本是1.14。不知道是hadoop版本的原因還是Flink版本更新的原因。當(dāng)我運行一個簡單的Flink測試時,雖然結(jié)果出來了但是后面還跟著一段報錯信息。 測試命令: flink run -m yarn-cluster -p 2 -yjm 2G -ytm 2G $FLINK_HOME/examples/batch/WordCount.jar 報錯信息: Trying to acce

    2024年02月11日
    瀏覽(26)
  • 20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上

    20、Flink SQL之SQL Client: 不用編寫代碼就可以嘗試 Flink SQL,可以直接提交 SQL 任務(wù)到集群上

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月11日
    瀏覽(23)
  • Flink(三)flink重要概念(api分層、角色、執(zhí)行流程、執(zhí)行圖和編程模型)及dataset、datastream詳細(xì)示例入門和提交任務(wù)至on yarn運行

    Flink(三)flink重要概念(api分層、角色、執(zhí)行流程、執(zhí)行圖和編程模型)及dataset、datastream詳細(xì)示例入門和提交任務(wù)至on yarn運行

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月16日
    瀏覽(27)
  • 3、flink重要概念(api分層、角色、執(zhí)行流程、執(zhí)行圖和編程模型)及dataset、datastream詳細(xì)示例入門和提交任務(wù)至on yarn運行

    3、flink重要概念(api分層、角色、執(zhí)行流程、執(zhí)行圖和編程模型)及dataset、datastream詳細(xì)示例入門和提交任務(wù)至on yarn運行

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月12日
    瀏覽(42)
  • 《Flink學(xué)習(xí)筆記》——第二章 Flink的安裝和啟動、以及應(yīng)用開發(fā)和提交

    《Flink學(xué)習(xí)筆記》——第二章 Flink的安裝和啟動、以及應(yīng)用開發(fā)和提交

    ? 介紹Flink的安裝、啟動以及如何進(jìn)行Flink程序的開發(fā),如何運行部署Flink程序等 2.1 Flink的安裝和啟動 本地安裝指的是單機(jī)模式 0、前期準(zhǔn)備 java8或者java11(官方推薦11) 下載Flink安裝包 https://flink.apache.org/zh/downloads/ hadoop(后面Flink on Yarn部署模式需要) 服務(wù)器(我是使用虛擬

    2024年02月10日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包