国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【flink番外篇】21、Flink 通過(guò)SQL client 和 table api注冊(cè)catalog示例

這篇具有很好參考價(jià)值的文章主要介紹了【flink番外篇】21、Flink 通過(guò)SQL client 和 table api注冊(cè)catalog示例。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫(kù)、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開(kāi)發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說(shuō)明,一般不會(huì)介紹知識(shí)點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過(guò)鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文演示了Flink 將表注冊(cè)到catalog中,其中用sql client展示了連接mysql,通過(guò)table api 和sql 演示了將表注冊(cè)到hivecatalog中。

如果需要了解更多內(nèi)容,可以在本人Flink 專欄中了解更新系統(tǒng)的內(nèi)容。

本文除了maven依賴外,其他依賴如下:

hadoop的版本是3.1.4
hive的版本是3.1.2
flink的環(huán)境版本是1.3.6

一、創(chuàng)建 Flink 表并將其注冊(cè)到 Catalog

1、使用 SQL DDL

用戶可以使用 DDL 通過(guò) Table API 或者 SQL Client 在 Catalog 中創(chuàng)建表。

JdbcCatalog不能創(chuàng)建庫(kù)或表,官方示例寫(xiě)的不明確;hivecatalog可以創(chuàng)建表。

本示例是以mysql為基礎(chǔ),flink 版本為1.17。

// the catalog should have been registered via yaml file
Flink SQL> CREATE DATABASE mydb WITH (...);
-----Jdbccatalog不能創(chuàng)建表,hivecatalog可以創(chuàng)建表----
Flink SQL> CREATE TABLE mytable (name STRING, age INT) WITH (...);

Flink SQL> SHOW TABLES;
mytable

-----------------------具體示例如下-----------------------------------
Flink SQL> CREATE CATALOG alan_catalog WITH(
>     'type' = 'jdbc',
>     'default-database' = 'test?useSSL=false',
>     'username' = 'root',
>     'password' = 'root',
>     'base-url' = 'jdbc:mysql://192.168.10.44:3306'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
|    alan_catalog |
| default_catalog |
+-----------------+
2 rows in set

Flink SQL> use catalog alan_catalog;
[INFO] Execute statement succeed.

Flink SQL> show databases;
+------------------+
|    database name |
+------------------+
|          azkaban |
|          cdhhive |
|           cdhhue |
......
| spring_boot_plus |
|   springbootmall |
|             test |
|           zipkin |
+------------------+
29 rows in set

Flink SQL> use test;
[INFO] Execute statement succeed.

Flink SQL> show tables;

+------------------------------+
|                   table name |
+------------------------------+
|                  permissions |
|                       person |
|                   personinfo |
|                         role |
|                         user |
+------------------------------+
34 rows in set

Flink SQL> select * from person;

Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Execute statement succeed.

Flink SQL> select * from person;

+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |          11 |                 測(cè)試修改go語(yǔ)言 |          30 |
| +I |          13 |                     NameUpdate |          22 |
| +I |          14 |                     updatejson |          23 |
| +I |         189 |                       再試一試 |          12 |
| +I |         191 |               test-full-update |        3333 |
| +I |         889 |               zhangsanswagger2 |          88 |
| +I |         892 |                         update |         189 |
| +I |        1001 |                     testupdate |          19 |
| +I |        1002 |                     測(cè)試go語(yǔ)言 |          23 |
| +I |        1013 |                          slene |           0 |
| +I |        1014 |                        testing |           0 |
| +I |        1015 |                        testing |          18 |
| +I |        1016 |                        astaxie |          19 |
| +I |        1017 |                           alan |          18 |
| +I |        1018 |                           chan |          19 |
+----+-------------+--------------------------------+-------------+
Received a total of 15 rows

2、maven依賴

<properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.13.6</flink.version>
</properties>

<dependencies>
	<dependency>
		<groupId>jdk.tools</groupId>
		<artifactId>jdk.tools</artifactId>
		<version>1.8</version>
		<scope>system</scope>
		<systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-clients_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-scala_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-streaming-java_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-api-java-bridge_2.12</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<!-- blink執(zhí)行計(jì)劃,1.11+默認(rèn)的 -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-planner-blink_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-table-common</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<!-- flink連接器 -->
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-sql-connector-kafka_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-jdbc_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-csv</artifactId>
		<version>${flink.version}</version>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-json</artifactId>
		<version>${flink.version}</version>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-connector-hive_2.12</artifactId>
		<version>${flink.version}</version>
		<scope>provided</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-metastore</artifactId>
		<version>2.1.0</version>
	</dependency>
	<dependency>
		<groupId>org.apache.hive</groupId>
		<artifactId>hive-exec</artifactId>
		<version>3.1.2</version>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-shaded-hadoop-2-uber</artifactId>
		<version>2.7.5-10.0</version>
		<scope>provided</scope>
	</dependency>

	<dependency>
		<groupId>mysql</groupId>
		<artifactId>mysql-connector-java</artifactId>
		<version>5.1.38</version>
		<scope>provided</scope>
		<!--<version>8.0.20</version> -->
	</dependency>

	<!-- 日志 -->
	<dependency>
		<groupId>org.slf4j</groupId>
		<artifactId>slf4j-log4j12</artifactId>
		<version>1.7.7</version>
		<scope>runtime</scope>
	</dependency>
	<dependency>
		<groupId>log4j</groupId>
		<artifactId>log4j</artifactId>
		<version>1.2.17</version>
		<scope>runtime</scope>
	</dependency>

	<dependency>
		<groupId>com.alibaba</groupId>
		<artifactId>fastjson</artifactId>
		<version>1.2.44</version>
	</dependency>

	<dependency>
		<groupId>org.projectlombok</groupId>
		<artifactId>lombok</artifactId>
		<version>1.18.2</version>
		<scope>provided</scope>
	</dependency>

</dependencies>

3、使用 Table API 創(chuàng)建hive表并注冊(cè)到hivecatalog示例

用戶可以用編程的方式使用Java 或者 Scala 來(lái)創(chuàng)建 Catalog 表。

下文示例是以hivecatalog為例,關(guān)于更多的hivecatalog將在其他的專題中介紹。

需要說(shuō)明的是本示例運(yùn)行時(shí)需要將hadoop環(huán)境中的/usr/local/bigdata/hadoop-3.1.4/share/hadoop/common/hadoop-lzo-0.4.21-SNAPSHOT.jar復(fù)制一份到flink的lib目錄(/usr/local/bigdata/flink-1.13.5/lib),此處做法的原因是本人的hadoop環(huán)境中配置了lzo的壓縮方式。

hadoop的版本是3.1.4
hive的版本是3.1.2
flink的環(huán)境版本是1.3.6

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.catalog.hive.util.HiveTableUtil;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 *
 */
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException
	 * @throws CatalogException
	 * @throws DatabaseAlreadyExistException
	 * @throws TableAlreadyExistException
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// testhive 數(shù)據(jù)庫(kù)名稱
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// 使用注冊(cè)的catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase);
		for (String table : tables) {
			System.out.println("Database:testhive  tables:" + table);
		}
//創(chuàng)建數(shù)據(jù)庫(kù)
//	    public CatalogDatabaseImpl(Map<String, String> properties, @Nullable String comment) {
//	        this.properties = checkNotNull(properties, "properties cannot be null");
//	        this.comment = comment;
//	    }
		Map<String, String> properties = new HashMap();
//		properties.put(CatalogConfig.IS_GENERIC, String.valueOf(true));
//		properties.put("connector", "COLLECTION");
		CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
		String newDatabaseName = "alan_hivecatalog_hivedb";

		hiveCatalog.createDatabase(newDatabaseName, cd, true);
//創(chuàng)建表
		String tableName = "alan_hivecatalog_hivedb_testTable";
		// public ObjectPath(String databaseName, String objectName)
		ObjectPath path = new ObjectPath(newDatabaseName, tableName);

//		public CatalogTableImpl( TableSchema tableSchema, Map<String, String> properties, String comment) 
		TableSchema schema = TableSchema.builder()
				.field("id", DataTypes.INT())
				.field("name", DataTypes.STRING())
				.field("age", DataTypes.INT())
				.build();
		
//		public CatalogTableImpl(TableSchema tableSchema, Map<String, String> properties, String comment) 
		CatalogTable catalogTable = new CatalogTableImpl(schema, properties, "this is table comment");
		hiveCatalog.createTable(path, catalogTable, true);

		List<String> newTables = hiveCatalog.listTables(newDatabaseName);
		for (String table : newTables) {
			System.out.println("Database:alan_hivecatalog_hivedb  tables:" + table);
		}
		
//插入數(shù)據(jù)
		String insertSQL = "insert into " + newDatabaseName + "." + tableName + " values (1,'alan',18)";
		tenv.executeSql(insertSQL);
// 查詢數(shù)據(jù)
		String selectSQL = "select * from " + newDatabaseName + "." + tableName;
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();
	}

}

4、使用 SQL語(yǔ)句 創(chuàng)建hive表并注冊(cè)到hivecatalog示例

本示例功能與上述的示例功能一樣,其區(qū)別是使用的實(shí)現(xiàn)方式不同,即一個(gè)是通過(guò)api建表,一個(gè)是通過(guò)sql建表。


import java.util.HashMap;
import java.util.Map;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

/**
 * @author alanchan
 *
 */
public class TestCreateHiveTable {
	public static final String tableName = "alan_hivecatalog_hivedb_testTable";
	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName +  " (\n" + 
																					  "  id INT,\n" + 
																					  "  name STRING,\n" + 
																					  "  age INT" + ") " + 
																					  "TBLPROPERTIES (\n" + 
																					  "  'sink.partition-commit.delay'='5 s',\n" + 
																					  "  'sink.partition-commit.trigger'='partition-time',\n" + 
																					  "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";

	/**
	 * @param args
	 * @throws DatabaseAlreadyExistException
	 * @throws CatalogException
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
		String name = "alan_hive";
		// default 數(shù)據(jù)庫(kù)名稱
		String defaultDatabase = "default";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		tenv.useCatalog("alan_hive");

//		Map<String, String> properties = new HashMap();
//		CatalogDatabase cd = new CatalogDatabaseImpl(properties, "this is new database,the name is alan_hivecatalog_hivedb");
		String newDatabaseName = "alan_hivecatalog_hivedb";
//		if (hiveCatalog.databaseExists(newDatabaseName)) {
//			hiveCatalog.dropDatabase(newDatabaseName, true);
//		}
//		hiveCatalog.createDatabase(newDatabaseName, cd, true);
		tenv.useDatabase(newDatabaseName);

		// 創(chuàng)建表
		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
//		if(hiveCatalog.tableExists( new ObjectPath(newDatabaseName, tableName))) {
//			hiveCatalog.dropTable( new ObjectPath(newDatabaseName, tableName), true);
//		}
		tenv.executeSql(hive_create_table_sql);

		// 插入數(shù)據(jù)
//		String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
		String insertSQL = "insert into alan_hivecatalog_hivedb_testTable values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		// 查詢數(shù)據(jù)
//		String selectSQL = "select * from " + tableName;
		String selectSQL = "select * from alan_hivecatalog_hivedb_testTable" ;
		Table table = tenv.sqlQuery(selectSQL);
		table.printSchema();
		DataStream<Tuple2<Boolean, Row>> result = tenv.toRetractStream(table, Row.class);
		result.print();
		env.execute();
	}

}

5、驗(yàn)證

本示例是在flink集群中以命令形式提交的任務(wù),其實(shí)通過(guò)web ui頁(yè)面提交任務(wù)一樣,不再贅述。

前提:
1、hadoop環(huán)境好用
2、hive環(huán)境好用
3、flink與hive集成環(huán)境完成且好用
4、啟動(dòng)flink集群,本文是以yarn-session形式啟動(dòng)的

1)、打包、上傳

pom.xml文件中配置打包插件

<build>
	<sourceDirectory>src/main/java</sourceDirectory>
	<plugins>
		<!-- 編譯插件 -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-compiler-plugin</artifactId>
			<version>3.5.1</version>
			<configuration>
				<source>1.8</source>
				<target>1.8</target>
				<!--<encoding>${project.build.sourceEncoding}</encoding> -->
			</configuration>
		</plugin>
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-surefire-plugin</artifactId>
			<version>2.18.1</version>
			<configuration>
				<useFile>false</useFile>
				<disableXmlReport>true</disableXmlReport>
				<includes>
					<include>**/*Test.*</include>
					<include>**/*Suite.*</include>
				</includes>
			</configuration>
		</plugin>
		<!-- 打包插件(會(huì)包含所有依賴) -->
		<plugin>
			<groupId>org.apache.maven.plugins</groupId>
			<artifactId>maven-shade-plugin</artifactId>
			<version>2.3</version>
			<executions>
				<execution>
					<phase>package</phase>
					<goals>
						<goal>shade</goal>
					</goals>
					<configuration>
						<filters>
							<filter>
								<artifact>*:*</artifact>
								<excludes>
									<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
									<exclude>META-INF/*.SF</exclude>
									<exclude>META-INF/*.DSA</exclude>
									<exclude>META-INF/*.RSA</exclude>
								</excludes>
							</filter>
						</filters>
						<transformers>
							<transformer
								implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
								<!-- 設(shè)置jar包的入口類(可選) -->
								<mainClass> org.table_sql.TestCreateHiveTable</mainClass>
							</transformer>
						</transformers>
					</configuration>
				</execution>
			</executions>
		</plugin>
	</plugins>
</build>

在cmd中打包或在開(kāi)發(fā)工具中打包,本處是以cmd命令行打包


mvn package  -Dmaven.test.skip=true

# 直到看到
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  18.304 s

將打包后的jar文件上傳至flink集群中并運(yùn)行即可。

2)、提交任務(wù)

#文件位置 /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar
#如果配置了flink的環(huán)境變量直接運(yùn)行下面的命令;如果沒(méi)有配置flink的環(huán)境變量則需要切換到flink的bin目錄運(yùn)行下面命令
flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar  org.table_sql.TestHiveCatalogDemo

3)、驗(yàn)證

# 1、提交任務(wù)后運(yùn)行情況
[alanchan@server1 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.1-SNAPSHOT.jar  org.table_sql.TestHiveCatalogDemo
2023-08-31 00:18:01,185 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
2023-08-31 00:18:01,185 INFO  org.apache.flink.yarn.cli.FlinkYarnSessionCli                [] - Found Yarn properties file under /tmp/.yarn-properties-alanchan.
Hive Session ID = 4c3ab8b5-d99e-4e2f-9362-fcbcae8047fa
Hive Session ID = d3fc6679-9b60-47a9-b9e7-d125e3240196
2023-08-31 00:18:07,578 WARN  org.apache.flink.yarn.configuration.YarnLogConfigUtil        [] - The configuration directory ('/usr/local/bigdata/flink-1.13.5/conf') already contains a LOG4J config file.If you want to use logback, then please delete or rename the log configuration file.
2023-08-31 00:18:07,778 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:07,787 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:07,860 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 2161b431ad0310df06417a3232ca5e60
Hive Session ID = 90444eb0-7fc9-4ac9-adb1-44df145739c7
(
  `id` INT,
  `name` STRING,
  `age` INT
)
2023-08-31 00:18:17,806 WARN  org.apache.hadoop.hdfs.shortcircuit.DomainSocketFactory      [] - The short-circuit local reads feature cannot be used because libhadoop cannot be loaded.
2023-08-31 00:18:17,871 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 0
2023-08-31 00:18:18,115 INFO  org.apache.hadoop.yarn.client.AHSProxy                       [] - Connecting to Application History server at server1/192.168.10.41:10200
2023-08-31 00:18:18,116 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - No path for the flink jar passed. Using the location of class org.apache.flink.yarn.YarnClusterDescriptor to locate the jar
2023-08-31 00:18:18,119 INFO  org.apache.flink.yarn.YarnClusterDescriptor                  [] - Found Web Interface server3:43480 of application 'application_1693286353898_0021'.
Job has been submitted with JobID 16a85c80862dac9035c62563b39a9fb7
Program execution finished
Job with JobID 16a85c80862dac9035c62563b39a9fb7 has finished.
Job Runtime: 6652 ms

# 2、在flink sql cli中查詢表及其數(shù)據(jù)
Flink SQL> SET sql-client.execution.result-mode = tableau;
[INFO] Session property has been set.

Flink SQL> select * from alan_hivecatalog_hivedb_testtable;
+----+-------------+--------------------------------+-------------+
| op |          id |                           name |         age |
+----+-------------+--------------------------------+-------------+
| +I |           1 |                           alan |          18 |
+----+-------------+--------------------------------+-------------+
Received a total of 1 row

#以上,驗(yàn)證完畢

以上,本文演示了Flink 將表注冊(cè)到catalog中,其中用sql client展示了連接mysql,通過(guò)table api 和sql 演示了將表注冊(cè)到hivecatalog中。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-832679.html

到了這里,關(guān)于【flink番外篇】21、Flink 通過(guò)SQL client 和 table api注冊(cè)catalog示例的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink番外篇】9、Flink Table API 支持的操作示例(5)- 表的列操作

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月24日
    瀏覽(19)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 時(shí)態(tài)表的join(java版本)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月02日
    瀏覽(20)
  • 【flink番外篇】15、Flink維表實(shí)戰(zhàn)之6種實(shí)現(xiàn)方式-通過(guò)Temporal table實(shí)現(xiàn)維表數(shù)據(jù)join

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月20日
    瀏覽(28)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(11)- Group Windows(tumbling、sliding和session)操作

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月01日
    瀏覽(31)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(7)- 表的join操作(內(nèi)聯(lián)接、外聯(lián)接以及聯(lián)接自定義函數(shù)等)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月19日
    瀏覽(26)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(12)- Over Windows(有界和無(wú)界的over window)操作

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月02日
    瀏覽(27)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(6)- 表的聚合(group by、Distinct、GroupBy/Over Window Aggregation)操作

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月02日
    瀏覽(25)
  • 【flink番外篇】16、DataStream 和 Table 相互轉(zhuǎn)換示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(27)
  • 【flink番外篇】19、Datastream數(shù)據(jù)類型到Table schema映射示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月21日
    瀏覽(30)
  • 【flink番外篇】20、DataStream 和 Table集成-Changelog Streams變化流示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識(shí)點(diǎn),并輔以具體的示例進(jìn)行說(shuō)明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語(yǔ)、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月25日
    瀏覽(27)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包