Flink Series Articles
一、Flink column
The Flink column introduces individual topics systematically, illustrated with concrete examples.
1、Flink deployment series
Covers Flink deployment and configuration basics.
2、Flink fundamentals series
Covers Flink fundamentals such as terminology, architecture, the programming model, programming guides, basic DataStream API usage, and the four cornerstones.
3、Flink Table API and SQL basics series
Covers basic usage of the Flink Table API and SQL, such as creating databases and tables, queries, window functions, catalogs, and so on.
4、Flink Table API and SQL advanced and application series
Covers applied Table API and SQL content that is closer to real production use and involves somewhat more development effort.
5、Flink monitoring series
Covers day-to-day operations and monitoring work.
二、Flink examples column
The Flink examples column supplements the Flink column. It generally does not explain concepts; instead it provides concrete, ready-to-run examples. This column is not divided into sub-directories; the article titles indicate their content.
The entry point for all articles in both columns: Flink Series Articles Index.
This article demonstrates registering tables in a Flink catalog: the SQL Client is used to connect to MySQL via a JdbcCatalog, and the Table API and SQL are used to register tables in a HiveCatalog.
For more systematic and up-to-date coverage, see the author's Flink column.
Apart from the Maven dependencies, the environment used in this article is:
Hadoop version 3.1.4
Hive version 3.1.2
Flink version 1.13.6
一、Creating Flink tables and registering them in a catalog
1、Using SQL DDL
Users can create tables in a catalog with DDL, through either the Table API or the SQL Client.
Note that the JdbcCatalog cannot create databases or tables (the official example is not explicit about this); the HiveCatalog can create tables.
The following example is based on MySQL and uses Flink 1.17.
// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
-----The JdbcCatalog cannot create tables; the HiveCatalog can-----
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);
Flink SQL> SHOW TABLES;
mytable
-----------------------A concrete example follows-----------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
> 'type' = 'jdbc',
> 'default-database' = 'test?useSSL=false',
> 'username' = 'root',
> 'password' = 'root',
> 'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| alan_catalog |
| default_catalog |
+-----------------+
2 rows in set
Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.
Flink SQL> show databases;
+------------------+
| database name |
+------------------+
| azkaban |
| cdhhive |
| cdhhue |
......
| spring_boot_plus |
| springbootmall |
| test |
| zipkin |
+------------------+
29 rows in set
Flink SQL> use test;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+------------------------------+
| table name |
+------------------------------+
| permissions |
| person |
| personinfo |
| role |
| user |
+------------------------------+
34 rows in set
Flink SQL> select * from person;
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.
Flink SQL> select * from person;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 11 | 測(cè)試修改go語(yǔ)言 | 30 |
| +I | 13 | NameUpdate | 22 |
| +I | 14 | updatejson | 23 |
| +I | 189 | 再試一試 | 12 |
| +I | 191 | test-full-update | 3333 |
| +I | 889 | zhangsanswagger2 | 88 |
| +I | 892 | update | 189 |
| +I | 1001 | testupdate | 19 |
| +I | 1002 | 測(cè)試go語(yǔ)言 | 23 |
| +I | 1013 | slene | 0 |
| +I | 1014 | testing | 0 |
| +I | 1015 | testing | 18 |
| +I | 1016 | astaxie | 19 |
| +I | 1017 | alan | 18 |
| +I | 1018 | chan | 19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows
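The same catalog can also be registered programmatically through the Table API rather than the SQL Client. Below is a minimal sketch, not from the original article: the class name is made up, the connection settings mirror the SQL Client session above, and it assumes flink-connector-jdbc plus the MySQL driver are on the classpath and a Flink version whose JdbcCatalog supports MySQL (1.15+, e.g. the 1.17 used above). Since the JdbcCatalog cannot create databases or tables, it is only used here to browse and query the existing MySQL data.
import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class TestJdbcCatalogDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // hypothetical connection settings, mirroring the SQL Client example above
        String name = "alan_catalog";
        String defaultDatabase = "test";
        String username = "root";
        String password = "root";
        String baseUrl = "jdbc:mysql://192.168.10.44:3306";

        JdbcCatalog jdbcCatalog = new JdbcCatalog(name, defaultDatabase, username, password, baseUrl);
        tenv.registerCatalog("alan_catalog", jdbcCatalog);
        // switch to the registered catalog and query an existing MySQL table
        tenv.useCatalog("alan_catalog");
        tenv.executeSql("SHOW TABLES").print();
        tenv.executeSql("SELECT * FROM person").print();
    }
}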
2、Maven dependencies
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.13.6</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>jdk.tools</groupId>
<artifactId>jdk.tools</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Blink planner, the default since Flink 1.11 -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink connectors -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-sql-connector-kafka_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-csv</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-json</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-metastore</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2-uber</artifactId>
<version>2.7.5-10.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.38</version>
<scope>provided</scope>
<!--<version>8.0.20</version> -->
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
</dependencies>
3、Example: creating a Hive table with the Table API and registering it in a HiveCatalog
Catalog tables can also be created programmatically in Java or Scala.
The example below uses a HiveCatalog; HiveCatalog is covered in more depth in other articles in this series.
Note: to run this example, /usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar from the Hadoop installation needs to be copied into Flink's lib directory (/usr/local/bigdata/flink-1.13.5/lib), because the author's Hadoop environment is configured with LZO compression.
Hadoop version 3.1.4
Hive version 3.1.2
Flink version 1.13.6
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.types.Row;
/**
* @author alanchan
*
*/
public class TestHiveCatalogDemo {
/**
* @param args
* @throws DatabaseNotExistException
* @throws CatalogException
* @throws DatabaseAlreadyExistException
* @throws TableAlreadyExistException
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String name = "alan_hive";
// testhive: the name of the default database
String defaultDatabase = "testhive";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog("alan_hive", hiveCatalog);
// use the registered catalog
tenv.useCatalog("alan_hive");
List<String> tables = hiveCatalog.listTables(defaultDatabase);
for (String table : tables) {
System.out.println("Database:testhive tables:" + table);
}
// create a database
// public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment) {
// this.properties = checkNotNull(properties, "properties cannot be null");
// this.comment = comment;
// }
Map<String, String> properties = new HashMap<>();
// properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
// properties.put("connector", "COLLECTION");
CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
String newDatabaseName = "alan_hivecatalog_hivedb";
hiveCatalog.createDatabase(newDatabaseName, cd, true);
// create a table
String tableName = "alan_hivecatalog_hivedb_testTable";
// public ObjectPath(String databaseName, String objectName)
ObjectPath path = new ObjectPath(newDatabaseName, tableName);
// public CatalogTableImpl( TableSchema tableSchema, Map<String, String> properties, String comment)
TableSchema schema = TableSchema.builder()
.field("id", DataTypes.INT())
.field("name", DataTypes.STRING())
.field("age", DataTypes.INT())
.build();
// public CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment)
CatalogTable catalogTable = new CatalogTableImpl(schema, properties, "this is table comment");
hiveCatalog.createTable(path, catalogTable, true);
List<String> newTables = hiveCatalog.listTables(newDatabaseName);
for (String table : newTables) {
System.out.println("Database:alan_hivecatalog_hivedb tables:" + table);
}
// insert data
String insertSQL = "insert into " + newDatabaseName + "." + tableName + " values (1,'alan',18)";
tenv.executeSql(insertSQL);
// query the data
String selectSQL = "select * from " + newDatabaseName + "." + tableName;
Table table = tenv.sqlQuery(selectSQL);
table.printSchema();
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
result.print();
env.execute();
}
}
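A possible cleanup step, not part of the original example: appended at the end of the main method above, the same catalog API can drop the demo table and database again. The names below are the ones created in the example.
// optional cleanup: drop the demo table and database created above
hiveCatalog.dropTable(new ObjectPath("alan_hivecatalog_hivedb", "alan_hivecatalog_hivedb_testTable"), true);
// dropDatabase(databaseName, ignoreIfNotExists, cascade)
hiveCatalog.dropDatabase("alan_hivecatalog_hivedb", true, true);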
4、Example: creating a Hive table with SQL and registering it in a HiveCatalog
This example provides the same functionality as the previous one; the difference is the implementation: the previous example creates the table through the API, while this one creates it with a SQL statement.
import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;
/**
* @author alanchan
*
*/
public class TestCreateHiveTable {
public static final String tableName = "alan_hivecatalog_hivedb_testTable";
public static final String hive_create_table_sql = "CREATE TABLE " + tableName + " (\n" +
" id INT,\n" +
" name STRING,\n" +
" age INT" + ") " +
"TBLPROPERTIES (\n" +
" 'sink.partition-commit.delay'='5 s',\n" +
" 'sink.partition-commit.trigger'='partition-time',\n" +
" 'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";
/**
* @param args
* @throws DatabaseAlreadyExistException
* @throws CatalogException
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
String name = "alan_hive";
// default: the name of the default database
String defaultDatabase = "default";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog("alan_hive", hiveCatalog);
tenv.useCatalog("alan_hive");
// Map<String, String> properties = new HashMap();
// CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
String newDatabaseName = "alan_hivecatalog_hivedb";
// if (hiveCatalog.databaseExists(newDatabaseName)) {
// hiveCatalog.dropDatabase(newDatabaseName, true);
// }
// hiveCatalog.createDatabase(newDatabaseName, cd, true);
tenv.useDatabase(newDatabaseName);
// create the table using the Hive dialect
tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
// if(hiveCatalog.tableExists( new ObjectPath(newDatabaseName, tableName))) {
// hiveCatalog.dropTable( new ObjectPath(newDatabaseName, tableName), true);
// }
tenv.executeSql(hive_create_table_sql);
// insert data
// String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
tenv.executeSql(insertSQL);
// query the data
// String selectSQL = "select * from " + tableName;
String selectSQL = "select * from alan_hivecatalog_hivedb_testTable" ;
Table table = tenv.sqlQuery(selectSQL);
table.printSchema();
DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
result.print();
env.execute();
}
}
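A note on the dialect handling: the CREATE TABLE above is executed with the Hive dialect (SqlDialect.HIVE). If later statements in the same program should be parsed with Flink's own SQL syntax, the dialect can be switched back, for example:
// switch back to Flink's default SQL dialect after the Hive-style DDL
tenv.getConfig().setSqlDialect(SqlDialect.DEFAULT);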
5、Verification
In this example the job is submitted to the Flink cluster from the command line; submitting it through the web UI works the same way and is not described again.
Prerequisites:
1、A working Hadoop environment
2、A working Hive environment
3、Flink integrated with Hive and working
4、A running Flink cluster; in this article it is started as a yarn-session
1)、Package and upload
Configure the packaging plugins in pom.xml:
<build>
<sourceDirectory>src/main/java</sourceDirectory>
<plugins>
<!-- compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding}</encoding> -->
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<useFile>false</useFile>
<disableXmlReport>true</disableXmlReport>
<includes>
<include>**/*Test.*</include>
<include>**/*Suite.*</include>
</includes>
</configuration>
</plugin>
<!-- shade plugin (bundles all dependencies) -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- set the jar's main class (optional) -->
<mainClass>org.table_sql.TestCreateHiveTable</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
Package from the command line or from an IDE; here the command line is used.
mvn package -Dmaven.test.skip=true
# wait until you see
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 18.304 s
Upload the packaged jar to the Flink cluster and run it.
2)、Submit the job
# jar location: /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
# if Flink's environment variables are configured, run the command below directly; otherwise switch to Flink's bin directory first
flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar org.table_sql.TestHiveCatalogDemo
3)、Verify
# 1、Job output after submission
[alanchan@server1 bin]$ flink run /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar org.table_sql.TestHiveCatalogDemo
2023-08-31 00:18:01,185 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
2023-08-31 00:18:01,185 INFO org.apache.flink.yarn.cli.FlinkYarnSessionCli [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
Hive Session ID = 4c3ab8b5-d99e-4e2f-9362-fcbcae8047fa
Hive Session ID = d3fc6679-9b60-47a9-b9e7-d125e3240196
2023-08-31 00:18:07,578 WARN org.apache.flink.yarn.configuration.YarnLogConfigUtil [] - The configuration directory ('/usr/local/bigdata/flink-1.13.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-08-31 00:18:07,778 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:07,787 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:07,860 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 2161b431ad0310df06417a3232ca5e60
Hive Session ID = 90444eb0-7fc9-4ac9-adb1-44df145739c7
(
`id` INT,
`name` STRING,
`age` INT
)
2023-08-31 00:18:17,806 WARN org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2023-08-31 00:18:17,871 INFO org.apache.hadoop.mapred.FileInputFormat [] - Total input files to process : 0
2023-08-31 00:18:18,115 INFO org.apache.hadoop.yarn.client.AHSProxy [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:18,116 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:18,119 INFO org.apache.flink.yarn.YarnClusterDescriptor [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 16a85c80862dac9035c62563b39a9fb7
Program execution finished
Job with JobID 16a85c80862dac9035c62563b39a9fb7 has finished.
Job Runtime: 6652 ms
# 2、Query the table and its data in the Flink SQL CLI
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.
Flink SQL> select * from alan_hivecatalog_hivedb_testtable;
+----+-------------+--------------------------------+-------------+
| op | id | name | age |
+----+-------------+--------------------------------+-------------+
| +I | 1 | alan | 18 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row
# Verification complete
This article has demonstrated registering tables in a Flink catalog: connecting to MySQL with the SQL Client, and registering tables in a HiveCatalog through the Table API and SQL.