国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink源碼分析 - yaml解析

這篇具有很好參考價值的文章主要介紹了flink源碼分析 - yaml解析。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

flink版本: flink-1.12.1? ? ?

代碼位置:??org.apache.flink.configuration.GlobalConfiguration

主要看下解析yaml文件的方法:??org.apache.flink.configuration.GlobalConfiguration#loadYAMLResource文章來源地址http://www.zghlxwxcb.cn/news/detail-800430.html

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 **/

package org.apache.flink.configuration;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.Preconditions;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;

/*
 * Global configuration object for Flink. Similar to Java properties configuration objects it
 * includes key-value pairs which represent the framework's configuration.
 **/
@Internal
public final class GlobalConfiguration {

    private static final Logger LOG = LoggerFactory.getLogger(GlobalConfiguration.class);

    public static final String FLINK_CONF_FILENAME = "flink-conf.yaml";

    // the keys whose values should be hidden
    private static final String[] SENSITIVE_KEYS = new String[]{"password", "secret", "fs.azure.account.key", "apikey"};

    // the hidden content to be displayed
    public static final String HIDDEN_CONTENT = "******";

    // --------------------------------------------------------------------------------------------

    private GlobalConfiguration() {
    }

    // --------------------------------------------------------------------------------------------

    /**
     * Loads the global configuration from the environment. Fails if an error occurs during loading.
     * Returns an empty configuration object if the environment variable is not set. In production
     * this variable is set but tests and local execution/debugging don't have this environment
     * variable set. That's why we should fail if it is not set.
     *
     * @return Returns the Configuration
     */
    public static Configuration loadConfiguration() {
        return loadConfiguration(new Configuration());
    }

    /**
     * Loads the global configuration and adds the given dynamic properties configuration.
     *
     * @param dynamicProperties The given dynamic properties
     * @return Returns the loaded global configuration with dynamic properties
     */
    public static Configuration loadConfiguration(Configuration dynamicProperties) {
        final String configDir = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
        if(configDir == null) {
            return new Configuration(dynamicProperties);
        }

        return loadConfiguration(configDir, dynamicProperties);
    }

    /**
     * Loads the configuration files from the specified directory.
     *
     * <p>YAML files are supported as configuration files.
     *
     * @param configDir the directory which contains the configuration files
     */
    public static Configuration loadConfiguration(final String configDir) {
        return loadConfiguration(configDir, null);
    }

    /**
     * Loads the configuration files from the specified directory. If the dynamic properties
     * configuration is not null, then it is added to the loaded configuration.
     *
     * @param configDir         directory to load the configuration from
     * @param dynamicProperties configuration file containing the dynamic properties. Null if none.
     * @return The configuration loaded from the given configuration directory
     */
    public static Configuration loadConfiguration(final String configDir, @Nullable final Configuration dynamicProperties) {

        if(configDir == null) {
            throw new IllegalArgumentException("Given configuration directory is null, cannot load configuration");
        }

        final File confDirFile = new File(configDir);
        if(!(confDirFile.exists())) {
            throw new IllegalConfigurationException("The given configuration directory name '" + configDir + "' (" + confDirFile
                    .getAbsolutePath() + ") does not describe an existing directory.");
        }

        /*************************************************
         * TODO_MA 馬中華 https://blog.csdn.net/zhongqi2513
         *  注釋: flink-conf.yaml
         */
        // get Flink yaml configuration file
        final File yamlConfigFile = new File(confDirFile, FLINK_CONF_FILENAME);

        if(!yamlConfigFile.exists()) {
            throw new IllegalConfigurationException("The Flink config file '" + yamlConfigFile + "' (" + confDirFile.getAbsolutePath() + ") does not exist.");
        }

        /*************************************************
         * TODO_MA 馬中華 https://blog.csdn.net/zhongqi2513
         *  注釋: 解析配置
         */
        Configuration configuration = loadYAMLResource(yamlConfigFile);

        if(dynamicProperties != null) {
            configuration.addAll(dynamicProperties);
        }

        return configuration;
    }

    /**
     * Loads a YAML-file of key-value pairs.
     *
     * <p>Colon and whitespace ": " separate key and value (one per line). The hash tag "#" starts a
     * single-line comment.
     *
     * <p>Example:
     *
     * <pre>
     * jobmanager.rpc.address: localhost # network address for communication with the job manager
     * jobmanager.rpc.port   : 6123      # network port to connect to for communication with the job manager
     * taskmanager.rpc.port  : 6122      # network port the task manager expects incoming IPC connections
     * </pre>
     *
     * <p>This does not span the whole YAML specification, but only the *syntax* of simple YAML
     * key-value pairs (see issue #113 on GitHub). If at any point in time, there is a need to go
     * beyond simple key-value pairs syntax compatibility will allow to introduce a YAML parser
     * library.
     *
     * @param file the YAML file to read from
     * @see <a >YAML 1.2 specification</a>
     */
    private static Configuration loadYAMLResource(File file) {
        final Configuration config = new Configuration();

        try(BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))) {

            String line;
            int lineNo = 0;
            while((line = reader.readLine()) != null) {
                lineNo++;
                // 1. check for comments
                String[] comments = line.split("#", 2);
                String conf = comments[0].trim();

                // 2. get key and value
                if(conf.length() > 0) {
                    String[] kv = conf.split(": ", 2);

                    // skip line with no valid key-value pair
                    if(kv.length == 1) {
                        LOG.warn("Error while trying to split key and value in configuration file " + file + ":" + lineNo + ": \"" + line + "\"");
                        continue;
                    }

                    String key = kv[0].trim();
                    String value = kv[1].trim();

                    // sanity check
                    if(key.length() == 0 || value.length() == 0) {
                        LOG.warn("Error after splitting key and value in configuration file " + file + ":" + lineNo + ": \"" + line + "\"");
                        continue;
                    }

                    LOG.info("Loading configuration property: {}, {}", key, isSensitive(key) ? HIDDEN_CONTENT : value);
                    config.setString(key, value);
                }
            }
        } catch(IOException e) {
            throw new RuntimeException("Error parsing YAML configuration.", e);
        }

        return config;
    }

    /**
     * Check whether the key is a hidden key.
     *
     * @param key the config key
     */
    public static boolean isSensitive(String key) {
        Preconditions.checkNotNull(key, "key is null");
        final String keyInLower = key.toLowerCase();
        for(String hideKey : SENSITIVE_KEYS) {
            if(keyInLower.length() >= hideKey.length() && keyInLower.contains(hideKey)) {
                return true;
            }
        }
        return false;
    }
}

到了這里,關(guān)于flink源碼分析 - yaml解析的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Python綜合案例-小費數(shù)據(jù)集的數(shù)據(jù)分析(詳細(xì)思路+源碼解析)

    Python綜合案例-小費數(shù)據(jù)集的數(shù)據(jù)分析(詳細(xì)思路+源碼解析)

    目錄 1. 請導(dǎo)入相應(yīng)模塊并獲取數(shù)據(jù)。導(dǎo)入待處理數(shù)據(jù)tips.xls,并顯示前5行。 2、分析數(shù)據(jù) ?3.增加一列“人均消費” 4查詢抽煙男性中人均消費大于5的數(shù)據(jù) ?5.分析小費金額和消費總額的關(guān)系,小費金額與消費總額是否存在正相關(guān)關(guān)系。畫圖觀察。 6分析男女顧客哪個更慷慨,

    2024年02月02日
    瀏覽(27)
  • 【Python系列】Python中的YAML數(shù)據(jù)讀取與解析

    【Python系列】Python中的YAML數(shù)據(jù)讀取與解析

    ??????歡迎來到我的博客,很高興能夠在這里和您見面!希望您在這里可以感受到一份輕松愉快的氛圍,不僅可以獲得有趣的內(nèi)容和知識,也可以暢所欲言、分享您的想法和見解。 推薦:kwan 的首頁,持續(xù)學(xué)習(xí),不斷總結(jié),共同進(jìn)步,活到老學(xué)到老 導(dǎo)航 檀越劍指大廠系列:全面總

    2024年04月08日
    瀏覽(18)
  • Flink ExecuteGraph構(gòu)建源碼解析

    Flink ExecuteGraph構(gòu)建源碼解析

    在 JobGraph構(gòu)建過程 中分析了JobGraph的構(gòu)建過程,本文分析ExecutionGraph的構(gòu)建過程。JobManager(JobMaster) 根據(jù) JobGraph 生成 ExecutionGraph。ExecutionGraph是JobGraph 的并行化版本,是調(diào)度層最核心的數(shù)據(jù)結(jié)構(gòu)。 1、ExecutionJobVertex:和JobGraph中的JobVertex一一對應(yīng)。每一個ExecutionJobVertex都有 和并

    2024年03月11日
    瀏覽(16)
  • flink源碼分析 - flink命令啟動分析

    flink版本: flink-1.12.1 源碼位置:? flink-dist/src/main/flink-bin/bin/flink flink命令源碼: 首先講第一段: 工作中,很多人喜歡用符號鏈接(軟連接)去將原始命令鏈接到一個新的文件。 例如:? 將 /home/aaa鏈接到/opt/soft/flink-1.12.1/bin/flink,? 實際使用的時候就可以用 aaa去代替flink命令。 例如

    2024年01月18日
    瀏覽(20)
  • 【源碼解析】flink sql執(zhí)行源碼概述:flink sql執(zhí)行過程中有哪些階段,這些階段的源碼大概位置在哪里

    【源碼解析】flink sql執(zhí)行源碼概述:flink sql執(zhí)行過程中有哪些階段,這些階段的源碼大概位置在哪里

    本文大致分析了flink sql執(zhí)行過程中的各個階段的源碼邏輯,這樣可以在flink sql執(zhí)行過程中, 能夠定位到任務(wù)執(zhí)行的某個階段的代碼大概分布在哪里,為更針對性的分析此階段的細(xì)節(jié)邏輯打下基礎(chǔ),比如create 的邏輯是怎么執(zhí)行的,select的邏輯是怎么生成的,優(yōu)化邏輯都做了哪

    2024年02月04日
    瀏覽(26)
  • 【Flink精講】Flink內(nèi)核源碼分析:命令執(zhí)行入口

    【Flink精講】Flink內(nèi)核源碼分析:命令執(zhí)行入口

    官方推薦per-job模式,一個job一個集群,提交時yarn才分配集群資源; 主要的進(jìn)程:JobManager、TaskManager、Client 提交命令:bin/flink run -t yarn-per-job? /opt/module/flink-1.12.0/examples/streaming/SocketWindowWordCount.jar --port 9999 Per-job進(jìn)程: CliFronted、YarnJobClusterEncrypoint、TaskExecutorRunner=TaskManagerRunn

    2024年02月21日
    瀏覽(24)
  • Flink源碼解析八之任務(wù)調(diào)度和負(fù)載均衡

    Flink源碼解析八之任務(wù)調(diào)度和負(fù)載均衡

    jobmanager scheduler :這部分與 Flink 的任務(wù)調(diào)度有關(guān)。 CoLocationConstraint :這是一個約束類,用于確保某些算子的不同子任務(wù)在同一個 TaskManager 上運行。這通常用于狀態(tài)共享或算子鏈的情況。 CoLocationGroup CoLocationGroupImpl :這些與 CoLocationConstraint 相關(guān),定義了一組需要在同一個

    2024年02月05日
    瀏覽(17)
  • Flink源碼解析四之任務(wù)調(diào)度和負(fù)載均衡

    Flink源碼解析四之任務(wù)調(diào)度和負(fù)載均衡

    jobmanager scheduler :這部分與 Flink 的任務(wù)調(diào)度有關(guān)。 CoLocationConstraint :這是一個約束類,用于確保某些算子的不同子任務(wù)在同一個 TaskManager 上運行。這通常用于狀態(tài)共享或算子鏈的情況。 CoLocationGroup CoLocationGroupImpl :這些與 CoLocationConstraint 相關(guān),定義了一組需要在同一個

    2024年02月06日
    瀏覽(29)
  • Flink window 源碼分析4:WindowState

    本文源碼為flink 1.18.0版本。 其他相關(guān)文章: Flink window 源碼分析1:窗口整體執(zhí)行流程 Flink window 源碼分析2:Window 的主要組件 Flink window 源碼分析3:WindowOperator Flink window 源碼分析4:WindowState 主要考慮 reduce、aggregate 函數(shù)中的托管狀態(tài)是在什么時候觸發(fā)和使用的?使用時與Win

    2024年01月25日
    瀏覽(27)
  • FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6

    FlinkSQL-- sql-client及源碼解析 -- flink-1.13.6

    本文基于flink-1.13.6 SQL Client: Init scripts and Statement Sets 這個版本極大地改進(jìn)了 SQL 客戶端的功能。現(xiàn)在 SQL Client 和 SQL 腳本都支持 通過Java 應(yīng)用程序執(zhí)行的幾乎所有操作(從 TableEnvironment 以編程方式啟動查詢)。這意味著 SQL 用戶在 SQL 部署中需要的代碼少了很多。其中最核心的功能

    2023年04月27日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包