国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

24、Flink 的table api與sql之Catalogs(java api操作視圖)-3

這篇具有很好參考價(jià)值的文章主要介紹了24、Flink 的table api與sql之Catalogs(java api操作視圖)-3。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實(shí)際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實(shí)際的運(yùn)維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會(huì)介紹知識點(diǎn)的信息,更多的是提供一個(gè)一個(gè)可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點(diǎn)擊:Flink 系列文章匯總索引



本文簡單介紹了通過java api操作視圖,提供了三個(gè)示例,即sql實(shí)現(xiàn)和java api的兩種實(shí)現(xiàn)方式。
本文依賴flink和hive、hadoop集群能正常使用。
本文示例java api的實(shí)現(xiàn)是通過Flink 1.13.5版本做的示例,SQL 如果沒有特別說明則是Flink 1.17版本。

五、Catalog API

3、視圖操作

1)、官方示例

// create view
catalog.createTable(new ObjectPath("mydb", "myview"), new CatalogViewImpl(...), false);

// drop view
catalog.dropTable(new ObjectPath("mydb", "myview"), false);

// alter view
catalog.alterTable(new ObjectPath("mydb", "mytable"), new CatalogViewImpl(...), false);

// rename view
catalog.renameTable(new ObjectPath("mydb", "myview"), "my_new_view", false);

// get view
catalog.getTable("myview");

// check if a view exist or not
catalog.tableExists("mytable");

// list views in a database
catalog.listViews("mydb");

2)、SQL創(chuàng)建HIVE 視圖示例

1、maven依賴
properties>
		<encoding>UTF-8</encoding>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<maven.compiler.source>1.8</maven.compiler.source>
		<maven.compiler.target>1.8</maven.compiler.target>
		<java.version>1.8</java.version>
		<scala.version>2.12</scala.version>
		<flink.version>1.13.6</flink.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-clients_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-scala_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-java</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope> 
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-scala_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-streaming-java_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-scala-bridge_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-api-java-bridge_2.11</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- blink執(zhí)行計(jì)劃,1.11+默認(rèn)的 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-planner-blink_2.11</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope> 
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-table-common</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<!-- flink連接器 -->
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-kafka_2.12</artifactId>
			<version>${flink.version}</version>
			<!-- <scope>provided</scope> -->
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-sql-connector-kafka_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-jdbc_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-csv</artifactId>
			<version>${flink.version}</version>
		</dependency>
		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-json</artifactId>
			<version>${flink.version}</version>
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-connector-hive_2.12</artifactId>
			<version>${flink.version}</version>
			<scope>provided</scope> 
		</dependency>
		<dependency>
			<groupId>org.apache.hive</groupId>
			<artifactId>hive-metastore</artifactId>
			<version>2.1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hive</groupId>
			<artifactId>hive-exec</artifactId>
			<version>3.1.2</version>
			<scope>provided</scope> 
		</dependency>

		<dependency>
			<groupId>org.apache.flink</groupId>
			<artifactId>flink-shaded-hadoop-2-uber</artifactId>
			<version>2.7.5-10.0</version>
			<!-- <scope>provided</scope> -->
		</dependency>

		<dependency>
			<groupId>mysql</groupId>
			<artifactId>mysql-connector-java</artifactId>
			<version>5.1.38</version>
			<scope>provided</scope>
			<!--<version>8.0.20</version> -->
		</dependency>

		<!-- 日志 -->
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-log4j12</artifactId>
			<version>1.7.7</version>
			<scope>runtime</scope>
		</dependency>
		<dependency>
			<groupId>log4j</groupId>
			<artifactId>log4j</artifactId>
			<version>1.2.17</version>
			<scope>runtime</scope>
		</dependency>

		<dependency>
			<groupId>com.alibaba</groupId>
			<artifactId>fastjson</artifactId>
			<version>1.2.44</version>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<version>1.18.2</version>
			<!-- <scope>provided</scope> -->
		</dependency>

	</dependencies>

	<build>
		<sourceDirectory>src/main/java</sourceDirectory>
		<plugins>
			<!-- 編譯插件 -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-compiler-plugin</artifactId>
				<version>3.5.1</version>
				<configuration>
					<source>1.8</source>
					<target>1.8</target>
					<!--<encoding>${project.build.sourceEncoding}</encoding> -->
				</configuration>
			</plugin>
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-surefire-plugin</artifactId>
				<version>2.18.1</version>
				<configuration>
					<useFile>false</useFile>
					<disableXmlReport>true</disableXmlReport>
					<includes>
						<include>**/*Test.*</include>
						<include>**/*Suite.*</include>
					</includes>
				</configuration>
			</plugin>
			<!-- 打包插件(會(huì)包含所有依賴) -->
			<plugin>
				<groupId>org.apache.maven.plugins</groupId>
				<artifactId>maven-shade-plugin</artifactId>
				<version>2.3</version>
				<executions>
					<execution>
						<phase>package</phase>
						<goals>
							<goal>shade</goal>
						</goals>
						<configuration>
							<filters>
								<filter>
									<artifact>*:*</artifact>
									<excludes>
										<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
										<exclude>META-INF/*.SF</exclude>
										<exclude>META-INF/*.DSA</exclude>
										<exclude>META-INF/*.RSA</exclude>
									</excludes>
								</filter>
							</filters>
							<transformers>
								<transformer
									implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
									<!-- 設(shè)置jar包的入口類(可選) -->
									<mainClass> org.table_sql.TestHiveViewBySQLDemo</mainClass>
								</transformer>
							</transformers>
						</configuration>
					</execution>
				</executions>
			</plugin>
		</plugins>
	</build>
2、代碼
package org.table_sql;

import java.util.HashMap;
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;

/**
 * @author alanchan
 *
 */
public class TestHiveViewBySQLDemo {
	public static final String tableName = "viewtest";
	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName +  " (\n" + 
			  "  id INT,\n" + 
			  "  name STRING,\n" + 
			  "  age INT" + ") " + 
			  "TBLPROPERTIES (\n" + 
			  "  'sink.partition-commit.delay'='5 s',\n" + 
			  "  'sink.partition-commit.trigger'='partition-time',\n" + 
			  "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String moduleName = "myhive";
		String hiveVersion = "3.1.2";
		tenv.loadModule(moduleName, new HiveModule(hiveVersion));

		String name = "alan_hive";
		String defaultDatabase = "default";
		String databaseName = "viewtest_db";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog(name, hiveCatalog);
		tenv.useCatalog(name);
		tenv.listDatabases();
		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
		}, true);

//		tenv.executeSql("create database "+databaseName);
		tenv.useDatabase(databaseName);

		// 創(chuàng)建第一個(gè)視圖viewName_byTable
		String selectSQL = "select * from " + tableName;
		String viewName_byTable = "test_view_table_V";
		String createViewSQL = "create view " + viewName_byTable + " as " + selectSQL;

		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tenv.executeSql(hive_create_table_sql);

//		tenv.getConfig().setSqlDialect(SqlDialect.DEFAULT);

		String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
		tenv.executeSql(insertSQL);

		tenv.executeSql(createViewSQL);
		tenv.listViews();

		CatalogView catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath(databaseName, viewName_byTable));

		List<Row> results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + viewName_byTable).collect());

		for (Row row : results) {
			System.out.println("test_view_table_V: " + row.toString());
		}

		// 創(chuàng)建第二個(gè)視圖
		String viewName_byView = "test_view_view_V";
		tenv.executeSql("create view " + viewName_byView + " (v2_id,v2_name,v2_age) comment 'test_view_view_V comment' as select * from " + viewName_byTable);
		catalogView = (CatalogView) hiveCatalog.getTable(new ObjectPath(databaseName, viewName_byView));

		results = CollectionUtil.iteratorToList(tenv.executeSql("select * from " + viewName_byView).collect());
		System.out.println("test_view_view_V comment : " + catalogView.getComment());

		for (Row row : results) {
			System.out.println("test_view_view_V : " + row.toString());
		}
		tenv.executeSql("drop database " + databaseName + " cascade");
	}

}

3、運(yùn)行結(jié)果

前提是flink的集群可用。使用maven打包成jar。

[alanchan@server2 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.2-SNAPSHOT.jar

Hive Session ID = ed6d5c9b-e00f-4881-840d-24c72aba6db7
Hive Session ID = 14445dc8-1f08-4f0f-bb45-aba8c6f52174
Job has been submitted with JobID bff7b59367bd5de6e778b442c4cc4404
Hive Session ID = 4c16f4fc-4c10-4353-b322-e6633e3ebe3d
Hive Session ID = 57949f09-bdcb-497f-a85c-ed9766fc4ce3
2023-10-13 02:42:24,891 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 0
Job has been submitted with JobID 80e48bb76e3d580412fdcdc434a8a979
test_view_table_V: +I[1, alan, 18]
Hive Session ID = a73d5b93-2129-4159-ad5e-0814df77e987
Hive Session ID = e4ae1a79-4d5e-4835-81de-ebc2041eedf9
2023-10-13 02:42:33,648 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 1
Job has been submitted with JobID c228d9ce3bdce91dc68bff75d14db1e5
test_view_view_V comment : test_view_view_V comment
test_view_view_V : +I[1, alan, 18]
Hive Session ID = e4a38393-d760-4bd3-8d8b-864cbe0daba7

3)、API創(chuàng)建Hive 視圖示例

通過api創(chuàng)建視圖相對比較麻煩,且存在版本更新的過期方法情況。
通過TableSchema和CatalogViewImpl創(chuàng)建視圖則已過期,當(dāng)前推薦使用通過CatalogView和ResolvedSchema來創(chuàng)建視圖。
另外需要注意的是下面兩個(gè)參數(shù)的區(qū)別
String originalQuery,原始的sql
String expandedQuery,帶有數(shù)據(jù)庫名稱的表,甚至包含hivecatalog

例如:如果使用default作為默認(rèn)的數(shù)據(jù)庫,查詢語句為select * from test1,則
originalQuery = ”select name,value from test1“即可,
expandedQuery = “selecttest1.name, test1.value from default.test1

修改、刪除視圖等操作比較簡單,不再贅述。

1、maven依賴

此處使用的依賴與上示例一致,mainclass變成本示例的類,不再贅述。

2、代碼
import static org.apache.flink.util.Preconditions.checkNotNull;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
import org.apache.flink.table.catalog.CatalogView;
import org.apache.flink.table.catalog.CatalogViewImpl;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.ResolvedCatalogView;
import org.apache.flink.table.catalog.ResolvedSchema;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.table.module.hive.HiveModule;
import org.apache.flink.types.Row;
import org.apache.flink.util.CollectionUtil;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.Column;

/**
 * @author alanchan
 *
 */
public class TestHiveViewByAPIDemo {
	public static final String tableName = "viewtest";
	public static final String hive_create_table_sql = "CREATE  TABLE  " + tableName +  " (\n" + 
			  "  id INT,\n" + 
			  "  name STRING,\n" + 
			  "  age INT" + ") " + 
			  "TBLPROPERTIES (\n" + 
			  "  'sink.partition-commit.delay'='5 s',\n" + 
			  "  'sink.partition-commit.trigger'='partition-time',\n" + 
			  "  'sink.partition-commit.policy.kind'='metastore,success-file'" + ")";

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
		System.setProperty("HADOOP_USER_NAME", "alanchan");
		String moduleName = "myhive";
		String hiveVersion = "3.1.2";
		tenv.loadModule(moduleName, new HiveModule(hiveVersion));

		String catalogName = "alan_hive";
		String defaultDatabase = "default";
		String databaseName = "viewtest_db";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(catalogName, defaultDatabase, hiveConfDir);
		tenv.registerCatalog(catalogName, hiveCatalog);
		tenv.useCatalog(catalogName);
		tenv.listDatabases();
		
		hiveCatalog.createDatabase(databaseName, new CatalogDatabaseImpl(new HashMap(), hiveConfDir) {
		}, true);

//		tenv.executeSql("create database "+databaseName);
		tenv.useDatabase(databaseName);

		tenv.getConfig().setSqlDialect(SqlDialect.HIVE);
		tenv.executeSql(hive_create_table_sql);
		String insertSQL = "insert into " + tableName + " values (1,'alan',18)";
		String insertSQL2 = "insert into " + tableName + " values (2,'alan2',19)";
		String insertSQL3 = "insert into " + tableName + " values (3,'alan3',20)";
		tenv.executeSql(insertSQL);
		tenv.executeSql(insertSQL2);
		tenv.executeSql(insertSQL3);
		
		tenv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
		String viewName1 = "test_view_table_V";
		String viewName2 = "test_view_table_V2";
		
		ObjectPath path1= new ObjectPath(databaseName, viewName1);
		//ObjectPath.fromString("viewtest_db.test_view_table_V2")
		ObjectPath path2= new ObjectPath(databaseName, viewName2);
		
		String originalQuery = "SELECT id, name, age FROM "+tableName+" WHERE id >=1 ";
//		String originalQuery = String.format("select * from %s",tableName+" WHERE id >=1 ");
		System.out.println("originalQuery:"+originalQuery);
		String expandedQuery = "SELECT  id, name, age FROM "+databaseName+"."+tableName+"  WHERE id >=1 ";		
//		String expandedQuery = String.format("select * from %s.%s", catalogName, path1.getFullName());
		System.out.println("expandedQuery:"+expandedQuery);
		String comment = "this is a comment";
		
		// 創(chuàng)建視圖,第一種方式(通過TableSchema和CatalogViewImpl),已聲明過期	
		createView1(originalQuery,expandedQuery,comment,hiveCatalog,path1);
		// 查詢視圖
		List<Row> results = CollectionUtil.iteratorToList( tenv.executeSql("select * from " + viewName1).collect());
		for (Row row : results) {
			System.out.println("test_view_table_V: " + row.toString());
		}
		
		// 創(chuàng)建視圖,第二種方式(通過Schema和ResolvedSchema)
		createView2(originalQuery,expandedQuery,comment,hiveCatalog,path2);
		
		List<Row> results2 = CollectionUtil.iteratorToList( tenv.executeSql("select * from viewtest_db.test_view_table_V2").collect());
		for (Row row : results2) {
			System.out.println("test_view_table_V2: " + row.toString());
		}

		tenv.executeSql("drop database " + databaseName + " cascade");
	}
	
	static void createView1(String originalQuery,String expandedQuery,String comment,HiveCatalog hiveCatalog,ObjectPath path) throws Exception {
		TableSchema viewSchema = new TableSchema(new String[]{"id", "name","age"}, new TypeInformation[]{Types.INT, Types.STRING,Types.INT});
		CatalogBaseTable viewTable = new CatalogViewImpl(
				originalQuery,
				expandedQuery,
				viewSchema, 
				new HashMap(),
				comment);
		hiveCatalog.createTable(path, viewTable, false);
	}
	
	static void createView2(String originalQuery,String expandedQuery,String comment,HiveCatalog hiveCatalog,ObjectPath path) throws Exception {
		ResolvedSchema resolvedSchema = new ResolvedSchema(
                Arrays.asList(
                        Column.physical("id", DataTypes.INT()),
                        Column.physical("name", DataTypes.STRING()),
                        Column.physical("age", DataTypes.INT())),
                Collections.emptyList(),
                null);
		
		 CatalogView origin =  CatalogView.of(
	                        Schema.newBuilder().fromResolvedSchema(resolvedSchema).build(),
	                        comment,
//	                        String.format("select * from tt"),
//	                        String.format("select * from %s.%s", TEST_CATALOG_NAME, path1.getFullName()),
	                        originalQuery,
	                        expandedQuery,
	                        Collections.emptyMap());
			CatalogView view = new ResolvedCatalogView(origin, resolvedSchema);
//			ObjectPath.fromString("viewtest_db.test_view_table_V2")
		hiveCatalog.createTable(path, view, false);
		
	}

}
3、運(yùn)行結(jié)果
[alanchan@server2 bin]$ flink run  /usr/local/bigdata/flink-1.13.5/examples/table/table_sql-0.0.3-SNAPSHOT.jar

Hive Session ID = ab4d159a-b2d3-489e-988f-eebdc43d9517
Hive Session ID = 391de19c-5d5a-4a83-a88c-c43cca71fc63
Job has been submitted with JobID a880510032165523f3f2a559c5ab4ec9
Hive Session ID = cb063c31-eaf2-44e3-8fc0-9e8d2a6a3a5d
Job has been submitted with JobID cb05286c404b561306f8eb3969c3456a
Hive Session ID = 8132b36e-c9e2-41a2-8f42-3fe842e0991f
Job has been submitted with JobID 264aef7da1b17598bda159d946827dea
Hive Session ID = 7657be14-8188-4362-84a9-4c84c596021b
2023-10-16 07:21:19,073 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 3
Job has been submitted with JobID 05c2bb7265b0430cb12e00237f18444b
test_view_table_V: +I[1, alan, 18]
test_view_table_V: +I[2, alan2, 19]
test_view_table_V: +I[3, alan3, 20]
Hive Session ID = 7bb01c0d-03c9-413a-9040-c89676cec3b9
2023-10-16 07:21:27,512 INFO  org.apache.hadoop.mapred.FileInputFormat                     [] - Total input files to process : 3
Job has been submitted with JobID 79130d1fe56d88a784980d16e7f1cfb4
test_view_table_V2: +I[1, alan, 18]
test_view_table_V2: +I[2, alan2, 19]
test_view_table_V2: +I[3, alan3, 20]
Hive Session ID = 6d44ea95-f733-4c56-8da4-e2687a4bf945

本文簡單介紹了通過java api操作視圖,提供了三個(gè)示例,即sql實(shí)現(xiàn)和java api的兩種實(shí)現(xiàn)方式。文章來源地址http://www.zghlxwxcb.cn/news/detail-723787.html

到了這里,關(guān)于24、Flink 的table api與sql之Catalogs(java api操作視圖)-3的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【flink番外篇】9、Flink Table API 支持的操作示例(14)- 時(shí)態(tài)表的join(java版本)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月02日
    瀏覽(20)
  • Flink Table API 與 SQL 編程整理

    Flink Table API 與 SQL 編程整理

    Flink API 總共分為 4 層這里主要整理 Table API 的使用 Table API 是流處理和批處理通用的關(guān)系型 API , Table API 可以基于流輸入或者批輸入來運(yùn)行而不需要進(jìn)行任何修改。 Table API 是 SQL 語言的超集并專門為 Apache Flink 設(shè)計(jì)的, Table API 是 Scala 和 Java 語言集成式的 API 。與常規(guī) SQL 語言

    2024年02月04日
    瀏覽(24)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(4)- Table API 對表的查詢、過濾操作

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(20)
  • Flink Table API/SQL 多分支sink

    在某個(gè)場景中,需要從Kafka中獲取數(shù)據(jù),經(jīng)過轉(zhuǎn)換處理后,需要同時(shí)sink到多個(gè)輸出源中(kafka、mysql、hologres)等。兩次調(diào)用execute, 阿里云Flink vvr引擎報(bào)錯(cuò): 使用 StreamStatementSet. 具體參考官網(wǎng): https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/data_stream_api/#converting-betwe

    2024年02月11日
    瀏覽(23)
  • 【Flink SQL】Flink SQL 基礎(chǔ)概念(一):SQL & Table 運(yùn)行環(huán)境、基本概念及常用 API

    《 Flink SQL 基礎(chǔ)概念 》系列,共包含以下 5 篇文章: Flink SQL 基礎(chǔ)概念(一):SQL Table 運(yùn)行環(huán)境、基本概念及常用 API Flink SQL 基礎(chǔ)概念(二):數(shù)據(jù)類型 Flink SQL 基礎(chǔ)概念(三):SQL 動(dòng)態(tài)表 連續(xù)查詢 Flink SQL 基礎(chǔ)概念(四):SQL 的時(shí)間屬性 Flink SQL 基礎(chǔ)概念(五):SQL 時(shí)區(qū)問

    2024年03月21日
    瀏覽(99)
  • Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    Flink(十五)【Flink SQL Connector、savepoint、CateLog、Table API】

    ? ? ? ?今天一天爭取搞完最后這一部分,學(xué)完趕緊把 Kafka 和 Flume 學(xué)完,就要開始做實(shí)時(shí)數(shù)倉了。據(jù)說是應(yīng)屆生得把實(shí)時(shí)數(shù)倉搞個(gè) 80%~90% 才能差不多找個(gè)工作,太牛馬了。 ????????之前我們已經(jīng)用過了一些簡單的內(nèi)置連接器,比如 \\\'datagen\\\' 、\\\'print\\\' ,其它的可以查看官網(wǎng):

    2024年01月24日
    瀏覽(52)
  • 《十堂課學(xué)習(xí) Flink》第五章:Table API 以及 Flink SQL 入門

    《十堂課學(xué)習(xí) Flink》第五章:Table API 以及 Flink SQL 入門

    第四章中介紹了 DataStream API 以及 DataSet API 的入門案例,本章開始介紹 Table API 以及基于此的高層應(yīng)用 Flink SQL 的基礎(chǔ)。 Flink 提供了兩個(gè)關(guān)系A(chǔ)PI——Table API 和 SQL——用于統(tǒng)一的流和批處理。Table API 是一種針對Java、Scala和Python的語言集成查詢API,它允許以非常直觀的方式組合來

    2024年02月03日
    瀏覽(48)
  • 17、Flink 之Table API: Table API 支持的操作(1)

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(16)
  • Flink系列Table API和SQL之:時(shí)間屬性

    基于時(shí)間的操作(比如時(shí)間窗口),需要定義相關(guān)的時(shí)間語義和時(shí)間數(shù)據(jù)來源的信息。在Table API和SQL中,會(huì)給表單獨(dú)提供一個(gè)邏輯上的時(shí)間字段,專門用來在表處理程序中指示時(shí)間。 所謂的時(shí)間屬性(time attributes),就是每個(gè)表模式結(jié)構(gòu)(schema)的一部分??梢栽趧?chuàng)建表的DDL里直接定

    2023年04月09日
    瀏覽(37)
  • Flink(十三)Flink 的table api與sql的基本概念、通用api介紹及入門示例

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點(diǎn),并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月15日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包