gitee碼云地址
直接下載解壓可用 https://gitee.com/shawsongyue/aurora.git
模塊:aurora_flink
主類(lèi):GetParamsStreamingJob
簡(jiǎn)介概述
? 1.幾乎所有的批和流的 Flink 應(yīng)用程序,都依賴(lài)于外部配置參數(shù)。這些配置參數(shù)可以用于指定輸入和輸出源(如路徑或地址)、系統(tǒng)參數(shù)(并行度,運(yùn)行時(shí)配置)和特定的應(yīng)用程序參數(shù)(通常使用在用戶(hù)自定義函數(shù))。
? 2.為解決以上問(wèn)題,F(xiàn)link 提供一個(gè)名為 Parametertool
的簡(jiǎn)單公共類(lèi),其中包含了一些基本的工具。請(qǐng)注意,這里說(shuō)的 Parametertool
并不是必須使用的。Commons CLI 和 argparse4j 等其他框架也可以非常好地兼容 Flink。
? 3.**ParameterTool**定義了一組靜態(tài)方法,用于讀取配置信息。該工具類(lèi)內(nèi)部使用了
Map` 類(lèi)型,這樣使得它可以很容易地與你的配置集成在一起。
01 配置值來(lái)自.properties文件
1.通過(guò)路徑讀取
//定義文件路徑
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";
//方式一:直接使用內(nèi)置工具類(lèi)
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
String jobName_01 = parameter_01.get("jobName");
logger.info("方式一:讀取配置文件中指定的key值={}",jobName_01);
2.通過(guò)文件流讀取
//定義文件路徑
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";
//方式二:使用文件
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);
String jobName_02 = parameter_02.get("jobName");
logger.info("方式二:讀取配置文件中指定的key值={}",jobName_02);
3.通過(guò)IO流讀取
//定義文件路徑
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";
//方式三:使用IO流
InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));
ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
String jobName_03 = parameter_03.get("jobName");
logger.info("方式三:讀取配置文件中指定的key值={}",jobName_03);
02 配置值來(lái)自命令行
tips:在idea的命令行傳參,格式:–jobName program_job_aurora
ParameterTool parameter_04 = ParameterTool.fromArgs(args);
String jobName_04 = parameter_04.get("jobName");
logger.info("方式四:命令行傳參key值={}",jobName_04);
03 配置來(lái)自系統(tǒng)屬性
tips:在idea的的jvm系統(tǒng)參數(shù)設(shè)置,格式:-Dinput=hdfs:///mydata
//方式五:獲取jvm參數(shù)值
ParameterTool parameter_05 = ParameterTool.fromSystemProperties();
String jobName_05 = parameter_05.get("input");
logger.info("方式五:獲取jvm參數(shù)key值={}",jobName_05);
04 注冊(cè)以及使用全局變量
注意:Flink全局變量?jī)H支持在富函數(shù)中使用,即Rich開(kāi)頭的類(lèi)使用
//定義文件路徑
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";
//直接使用內(nèi)置工具類(lèi)獲取參數(shù)
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
//方式六:注冊(cè)全局參數(shù)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameter_01);
//在任意富函數(shù)中均可以獲取,注意!注意!注意!只有富文本函數(shù)才可以使用
//1.創(chuàng)建富函數(shù)
RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
//獲取運(yùn)行環(huán)境
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
//獲取對(duì)應(yīng)的值
String jobName = parameters.getRequired("jobName");
logger.info("方式六:獲取全局注冊(cè)參數(shù)key值={}",jobName_05);
}
};
//2.創(chuàng)建數(shù)據(jù)集
ArrayList<String> list = new ArrayList<>();
list.add("001");
list.add("002");
list.add("003");
//3.把有限數(shù)據(jù)集轉(zhuǎn)換為數(shù)據(jù)源
DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);
//4.執(zhí)行富文本處理
dataStreamSource.flatMap(richFlatMap);
//5.啟動(dòng)程序
env.execute();
05 Flink獲取參數(shù)值Demo
1.項(xiàng)目結(jié)構(gòu)
2.pom.xml文件如下
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.xsy</groupId>
<artifactId>aurora_flink</artifactId>
<version>1.0-SNAPSHOT</version>
<!--屬性設(shè)置-->
<properties>
<!--java_JDK版本-->
<java.version>11</java.version>
<!--maven打包插件-->
<maven.plugin.version>3.8.1</maven.plugin.version>
<!--編譯編碼UTF-8-->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<!--輸出報(bào)告編碼UTF-8-->
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<!--json數(shù)據(jù)格式處理工具-->
<fastjson.version>1.2.75</fastjson.version>
<!--log4j版本-->
<log4j.version>2.17.1</log4j.version>
<!--flink版本-->
<flink.version>1.18.0</flink.version>
<!--scala版本-->
<scala.binary.version>2.11</scala.binary.version>
<!--log4j依賴(lài)-->
<log4j.version>2.17.1</log4j.version>
</properties>
<!--通用依賴(lài)-->
<dependencies>
<!-- json -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>${fastjson.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<!--================================集成外部依賴(lài)==========================================-->
<!--集成日志框架 start-->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
</dependency>
<!--集成日志框架 end-->
</dependencies>
<!--編譯打包-->
<build>
<finalName>${project.name}</finalName>
<!--資源文件打包-->
<resources>
<resource>
<directory>src/main/resources</directory>
</resource>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.1.1</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<excludes>
<exclude>org.apache.flink:force-shading</exclude>
<exclude>org.google.code.flindbugs:jar305</exclude>
<exclude>org.slf4j:*</exclude>
<excluder>org.apache.logging.log4j:*</excluder>
</excludes>
</artifactSet>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.xsy.sevenhee.flink.TestStreamJob</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<!--插件統(tǒng)一管理-->
<pluginManagement>
<plugins>
<!--maven打包插件-->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring.boot.version}</version>
<configuration>
<fork>true</fork>
<finalName>${project.build.finalName}</finalName>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<!--編譯打包插件-->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven.plugin.version}</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
<compilerArgs>
<arg>-parameters</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
<!--配置Maven項(xiàng)目中需要使用的遠(yuǎn)程倉(cāng)庫(kù)-->
<repositories>
<repository>
<id>aliyun-repos</id>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>
<!--用來(lái)配置maven插件的遠(yuǎn)程倉(cāng)庫(kù)-->
<pluginRepositories>
<pluginRepository>
<id>aliyun-plugin</id>
<url>https://maven.aliyun.com/nexus/content/groups/public/</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>
3.配置文件
(1)application.properties
jobName=job_aurora
jobMemory=1024
taskName=task_aurora
(2)log4j2.properties文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-829829.html
rootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmprootLogger.level=INFO
rootLogger.appenderRef.console.ref=ConsoleAppender
appender.console.name=ConsoleAppender
appender.console.type=CONSOLE
appender.console.layout.type=PatternLayout
appender.console.layout.pattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
log.file=D:\\tmp
4.項(xiàng)目主類(lèi)
package com.aurora;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
/**
* @description flink獲取外部參數(shù)作業(yè)
*
* @author 淺夏的貓
* @datetime 15:54 2024/1/28
*/
public class GetParamsStreamingJob {
private static final Logger logger = LoggerFactory.getLogger(GetParamsStreamingJob.class);
public static void main(String[] args) throws Exception {
//定義文件路徑
String propertiesFilePath = "E:\\project\\aurora_dev\\aurora_flink\\src\\main\\resources\\application.properties";
//方式一:直接使用內(nèi)置工具類(lèi)
ParameterTool parameter_01 = ParameterTool.fromPropertiesFile(propertiesFilePath);
String jobName_01 = parameter_01.get("jobName");
logger.info("方式一:讀取配置文件中指定的key值={}",jobName_01);
//方式二:使用文件
File propertiesFile = new File(propertiesFilePath);
ParameterTool parameter_02 = ParameterTool.fromPropertiesFile(propertiesFile);
String jobName_02 = parameter_02.get("jobName");
logger.info("方式二:讀取配置文件中指定的key值={}",jobName_02);
//方式三:使用IO流
InputStream propertiesFileInputStream = new FileInputStream(new File(propertiesFilePath));
ParameterTool parameter_03 = ParameterTool.fromPropertiesFile(propertiesFileInputStream);
String jobName_03 = parameter_03.get("jobName");
logger.info("方式三:讀取配置文件中指定的key值={}",jobName_03);
//方式四:命令行傳參格式:--jobName program_job_aurora
ParameterTool parameter_04 = ParameterTool.fromArgs(args);
String jobName_04 = parameter_04.get("jobName");
logger.info("方式四:命令行傳參key值={}",jobName_04);
//方式五:獲取jvm參數(shù)值
ParameterTool parameter_05 = ParameterTool.fromSystemProperties();
String jobName_05 = parameter_05.get("input");
logger.info("方式五:獲取jvm參數(shù)key值={}",jobName_05);
//方式六:注冊(cè)全局參數(shù)
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameter_01);
//在任意富函數(shù)中均可以獲取,注意!注意!注意!只有富文本函數(shù)才可以使用
//1.創(chuàng)建富函數(shù)
RichFlatMapFunction<String, String> richFlatMap = new RichFlatMapFunction<>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
//獲取運(yùn)行環(huán)境
ParameterTool parameters = (ParameterTool) getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
//獲取對(duì)應(yīng)的值
String jobName = parameters.getRequired("jobName");
logger.info("方式六:獲取全局注冊(cè)參數(shù)key值={}",jobName_05);
}
};
//2.創(chuàng)建數(shù)據(jù)集
ArrayList<String> list = new ArrayList<>();
list.add("001");
list.add("002");
list.add("003");
//3.把有限數(shù)據(jù)集轉(zhuǎn)換為數(shù)據(jù)源
DataStreamSource<String> dataStreamSource = env.fromCollection(list).setParallelism(1);
//4.執(zhí)行富文本處理
dataStreamSource.flatMap(richFlatMap);
//5.啟動(dòng)程序
env.execute();
}
}
5.運(yùn)行查看相關(guān)日志
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-829829.html
到了這里,關(guān)于【極數(shù)系列】Flink配置參數(shù)如何獲???(06)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!