国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)

這篇具有很好參考價值的文章主要介紹了16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

Flink 系列文章

一、Flink 專欄

Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。

  • 1、Flink 部署系列
    本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。

  • 2、Flink基礎(chǔ)系列
    本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。

  • 3、Flik Table API和SQL基礎(chǔ)系列
    本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。

  • 4、Flik Table API和SQL提高與應(yīng)用系列
    本部分是table api 和sql的應(yīng)用部分,和實際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。

  • 5、Flink 監(jiān)控系列
    本部分和實際的運維、監(jiān)控工作相關(guān)。

二、Flink 示例專欄

Flink 示例專欄是 Flink 專欄的輔助說明,一般不會介紹知識點的信息,更多的是提供一個一個可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。

兩專欄的所有文章入口點擊:Flink 系列文章匯總索引



本文介紹了Apache Hive連接器的使用,以具體的示例演示了通過java和flink sql cli創(chuàng)建catalog。
本文依賴環(huán)境是hadoop、zookeeper、hive、flink環(huán)境好用,本文內(nèi)容以flink1.17版本進行介紹的,具體示例是在1.13版本中運行的(因為hadoop集群環(huán)境是基于jdk8的,flink1.17版本需要jdk11)。
更多的內(nèi)容詳見后續(xù)關(guān)于hive的介紹。

一、Table & SQL Connectors 示例: Apache Hive

Apache Hive 已經(jīng)成為了數(shù)據(jù)倉庫生態(tài)系統(tǒng)中的核心。 它不僅僅是一個用于大數(shù)據(jù)分析和ETL場景的SQL引擎,同樣它也是一個數(shù)據(jù)管理平臺,可用于發(fā)現(xiàn),定義,和演化數(shù)據(jù)。

Flink 與 Hive 的集成包含兩個層面。

一是利用了 Hive 的 MetaStore 作為持久化的 Catalog,用戶可通過HiveCatalog將不同會話中的 Flink 元數(shù)據(jù)存儲到 Hive Metastore 中。 例如,用戶可以使用HiveCatalog將其 Kafka 表或 Elasticsearch 表存儲在 Hive Metastore 中,并后續(xù)在 SQL 查詢中重新使用它們。

二是利用 Flink 來讀寫 Hive 的表。

HiveCatalog的設(shè)計提供了與 Hive 良好的兼容性,用戶可以"開箱即用"的訪問其已有的 Hive 數(shù)倉。 您不需要修改現(xiàn)有的 Hive Metastore,也不需要更改表的數(shù)據(jù)位置或分區(qū)。

1、支持的Hive版本

Flink 支持以下的 Hive 版本。

  • 2.3
    2.3.0
    2.3.1
    2.3.2
    2.3.3
    2.3.4
    2.3.5
    2.3.6
    2.3.7
    2.3.8
    2.3.9
  • 3.1
    3.1.0
    3.1.1
    3.1.2
    3.1.3

某些功能是否可用取決于您使用的 Hive 版本,這些限制不是由 Flink 所引起的:

  • Hive 內(nèi)置函數(shù)在使用 Hive-2.3.0 及更高版本時支持。
  • 列約束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本時支持。
  • 更改表的統(tǒng)計信息,在使用 Hive-2.3.0 及更高版本時支持。
  • DATE列統(tǒng)計信息,在使用 Hive-2.3.0 及更高版時支持。

2、依賴項

要與 Hive 集成,您需要在 Flink 下的/lib/目錄中添加一些額外的依賴包, 以便通過 Table API 或 SQL Client 與 Hive 進行交互。 或者,您可以將這些依賴項放在專用文件夾中,并分別使用 Table API 程序或 SQL Client 的-C或-l選項將它們添加到 classpath 中。

Apache Hive 是基于 Hadoop 之上構(gòu)建的, 首先您需要 Hadoop 的依賴,請參考 Providing Hadoop classes:

export HADOOP_CLASSPATH=`hadoop classpath`

有兩種添加 Hive 依賴項的方法。第一種是使用 Flink 提供的 Hive Jar包。您可以根據(jù)使用的 Metastore 的版本來選擇對應(yīng)的 Hive jar。第二個方式是分別添加每個所需的 jar 包。如果您使用的 Hive 版本尚未在此處列出,則第二種方法會更適合。

注意:建議您優(yōu)先使用 Flink 提供的 Hive jar 包。僅在 Flink 提供的 Hive jar 不滿足您的需求時,再考慮使用分開添加 jar 包的方式。

1)、使用 Flink 提供的 Hive jar

下表列出了所有可用的 Hive jar。您可以選擇一個并放在 Flink 發(fā)行版的/lib/ 目錄中。
16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6),# Flink專欄,flink,sql,apache,flink 流批一體化,flink sql,flink hive,flink hadoop

2)、用戶定義的依賴項

您可以在下方找到不同Hive主版本所需要的依賴項。

  • Hive 2.3.4
/flink-1.17.1
   /lib

       // Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars
       flink-connector-hive_2.12-1.17.1.jar

       // Hive dependencies
       hive-exec-2.3.4.jar

       // add antlr-runtime if you need to use hive dialect
       antlr-runtime-3.5.2.jar
  • Hive 3.1.0
/flink-1.17.1
   /lib

       // Flink's Hive connector
       flink-connector-hive_2.12-1.17.1.jar

       // Hive dependencies
       hive-exec-3.1.0.jar
       libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately

       // add antlr-runtime if you need to use hive dialect
       antlr-runtime-3.5.2.jar

3)、移動 planner jar 包

把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移動到 FLINK_HOME/lib 下,并且將 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具體原因請參見 FLINK-25128。你可以使用如下命令來完成移動 planner jar 包的工作:

mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar

只有當(dāng)要使用 Hive 語法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移動。 但是在集成 Hive 的時候,推薦進行上述的操作。

3、Maven 依賴

如果您在構(gòu)建自己的應(yīng)用程序,則需要在 mvn 文件中添加以下依賴項。 您應(yīng)該在運行時添加以上的這些依賴項,而不要在已生成的 jar 文件中去包含它們。

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.12</artifactId>
  <version>1.17.1</version>
  <scope>provided</scope>
</dependency>

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.12</artifactId>
  <version>1.17.1</version>
  <scope>provided</scope>
</dependency>

<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

4、連接到Hive

通過 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog連接到現(xiàn)有的 Hive 集群。

以下是如何連接到 Hive 的示例:

  • java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);

String name            = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir     = "/opt/hive-conf";

HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);

// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");

----------------------示例----------------------------
import java.util.List;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;

/**
 * @author alanchan
 *
 */
public class TestHiveCatalogDemo {

	/**
	 * @param args
	 * @throws DatabaseNotExistException 
	 * @throws CatalogException 
	 */
	public static void main(String[] args) throws CatalogException, DatabaseNotExistException {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

		String name = "alan_hive";
		// testhive 數(shù)據(jù)庫名稱
		String defaultDatabase = "testhive";
		String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";

		HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
		tenv.registerCatalog("alan_hive", hiveCatalog);
		// 使用注冊的catalog
		tenv.useCatalog("alan_hive");

		List<String> tables = hiveCatalog.listTables(defaultDatabase); 
		for (String table : tables) {
			System.out.println("Database:testhive  tables:" + table);
		}
	}

}
  • sql
CREATE CATALOG myhive WITH (
    'type' = 'hive',
    'default-database' = 'mydatabase',
    'hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;

------------------具體示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
|    catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set

Flink SQL> CREATE CATALOG alan_hivecatalog WITH (
>     'type' = 'hive',
>     'default-database' = 'testhive',
>     'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.

Flink SQL> show catalogs;
+------------------+
|     catalog name |
+------------------+
| alan_hivecatalog |
|  default_catalog |
+------------------+
2 rows in set

Flink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].

Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.

Flink SQL> show tables;
+-----------------------------------+
|                        table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
|                         apachelog |
|                          col2row1 |
|                          col2row2 |
|                       cookie_info |
|                              dual |
|                         dw_zipper |
|                               emp |
|                          employee |
|                  employee_address |
|               employee_connection |
|                 ods_zipper_update |
|                          row2col1 |
|                          row2col2 |
|                            singer |
|                           singer2 |
|                           student |
|                      student_dept |
|               student_from_insert |
|                      student_hdfs |
|                    student_hdfs_p |
|                      student_info |
|                     student_local |
|                 student_partition |
|              t_all_hero_part_msck |
|                     t_usa_covid19 |
|                   t_usa_covid19_p |
|                              tab1 |
|                         tb_dept01 |
|                    tb_dept_bucket |
|                            tb_emp |
|                          tb_emp01 |
|                     tb_emp_bucket |
|                     tb_json_test1 |
|                     tb_json_test2 |
|                          tb_login |
|                      tb_login_tmp |
|                          tb_money |
|                      tb_money_mtn |
|                            tb_url |
|              the_nba_championship |
|                             tmp_1 |
|                        tmp_zipper |
|                         user_dept |
|                     user_dept_sex |
|                             users |
|                 users_bucket_sort |
|                   website_pv_info |
|                  website_url_info |
+-----------------------------------+
49 rows in set

  • ymal
execution:
    ...
    current-catalog: alan_hivecatalog  # set the HiveCatalog as the current catalog of the session
    current-database: testhive
    
catalogs:
   - name: alan_hivecatalog  
     type: hive
     hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf

下表列出了通過 YAML 文件或 DDL 定義 HiveCatalog 時所支持的參數(shù)。

16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6),# Flink專欄,flink,sql,apache,flink 流批一體化,flink sql,flink hive,flink hadoop

5、DDL&DML

在 Flink 中執(zhí)行 DDL 操作 Hive 的表、視圖、分區(qū)、函數(shù)等元數(shù)據(jù)時,參考:33、Flink之hive
Flink 支持 DML 寫入 Hive 表,請參考:33、Flink之hive
以上,介紹了Apache Hive連接器的使用,以具體的示例演示了通過java和flink sql cli創(chuàng)建catalog。文章來源地址http://www.zghlxwxcb.cn/news/detail-681235.html

到了這里,關(guān)于16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 【flink sql】kafka連接器

    Kafka 連接器提供從 Kafka topic 中消費和寫入數(shù)據(jù)的能力。 前面已經(jīng)介紹了flink sql創(chuàng)建表的語法及說明:【flink sql】創(chuàng)建表 這篇博客聊聊怎么通過flink sql連接kafka 以下的連接器元數(shù)據(jù)可以在表定義中通過元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個元數(shù)據(jù)是可讀的(R)還是可寫的(

    2024年02月08日
    瀏覽(22)
  • Flink系列之:Elasticsearch SQL 連接器

    Sink: Batch Sink: Streaming Append Upsert Mode Elasticsearch 連接器允許將數(shù)據(jù)寫入到 Elasticsearch 引擎的索引中。本文檔描述運行 SQL 查詢時如何設(shè)置 Elasticsearch 連接器。 連接器可以工作在 upsert 模式,使用 DDL 中定義的主鍵與外部系統(tǒng)交換 UPDATE/DELETE 消息。 如果 DDL 中沒有定義主鍵,那么

    2024年02月04日
    瀏覽(22)
  • Flink系列之:JDBC SQL 連接器

    Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Append Upsert Mode JDBC 連接器允許使用 JDBC 驅(qū)動向任意類型的關(guān)系型數(shù)據(jù)庫讀取或者寫入數(shù)據(jù)。本文檔描述了針對關(guān)系型數(shù)據(jù)庫如何通過建立 JDBC 連接器來執(zhí)行 SQL 查詢。 如果在 DDL 中定義了主鍵,JDBC sink 將以 upsert 模式與外

    2024年02月02日
    瀏覽(24)
  • Flink系列之:Apache Kafka SQL 連接器

    Scan Source: Unbounded Sink: Streaming Append Mode Kafka 連接器提供從 Kafka topic 中消費和寫入數(shù)據(jù)的能力。 以下示例展示了如何創(chuàng)建 Kafka 表: 以下的連接器元數(shù)據(jù)可以在表定義中通過元數(shù)據(jù)列的形式獲取。 R/W 列定義了一個元數(shù)據(jù)是可讀的(R)還是可寫的(W)。 只讀列必須聲明為 VI

    2024年02月01日
    瀏覽(29)
  • Flink系列之:Upsert Kafka SQL 連接器

    Scan Source: Unbounded 、 Sink: Streaming Upsert Mode Upsert Kafka 連接器支持以 upsert 方式從 Kafka topic 中讀取數(shù)據(jù)并將數(shù)據(jù)寫入 Kafka topic。 作為 source,upsert-kafka 連接器生產(chǎn) changelog 流,其中每條數(shù)據(jù)記錄代表一個更新或刪除事件。更準(zhǔn)確地說,數(shù)據(jù)記錄中的 value 被解釋為同一 key 的最后一

    2024年01月16日
    瀏覽(26)
  • 【Flink實戰(zhàn)】Flink hint更靈活、更細(xì)粒度的設(shè)置Flink sql行為與簡化hive連接器參數(shù)設(shè)置

    SQL 提示(SQL Hints)是和 SQL 語句一起使用來改變執(zhí)行計劃的。本章介紹如何使用 SQL 提示來實現(xiàn)各種干預(yù)。 SQL 提示一般可以用于以下: 增強 planner:沒有完美的 planner, SQL 提示讓用戶更好地控制執(zhí)行; 增加元數(shù)據(jù)(或者統(tǒng)計信息):如\\\"已掃描的表索引\\\"和\\\"一些混洗鍵(shu

    2024年04月25日
    瀏覽(25)
  • Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)

    Flink 讀寫MySQL數(shù)據(jù)(DataStream和Table API)

    Flink提供了基于JDBC的方式,可以將讀取到的數(shù)據(jù)寫入到MySQL中;本文通過兩種方式將數(shù)據(jù)下入到MySQL數(shù)據(jù)庫,其他的基于JDBC的數(shù)據(jù)庫類似,另外,Table API方式的Catalog指定為Hive Catalog方式,持久化DDL操作。 另外,JDBC 連接器允許使用 JDBC 驅(qū)動程序從任何關(guān)系數(shù)據(jù)庫讀取數(shù)據(jù)并將

    2023年04月09日
    瀏覽(33)
  • flink-sql讀寫hive-1.16

    本文檔內(nèi)容基于 flink-1.16.x ,其他版本的整理,請查看本人博客的 flink 專欄其他文章。 Apache Hive 已經(jīng)成為了數(shù)據(jù)倉庫生態(tài)系統(tǒng)中的核心。它不僅僅是一個用于大數(shù)據(jù)分析和ETL場景的SQL引擎,同樣也是一個數(shù)據(jù)管理平臺,可用于發(fā)現(xiàn),定義,和演化數(shù)據(jù)。 Flink 與 Hive 的集成包

    2024年02月16日
    瀏覽(33)
  • 【flink番外篇】9、Flink Table API 支持的操作示例(1)-通過Table API和SQL創(chuàng)建表

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年01月17日
    瀏覽(24)
  • Flink Table API 與 SQL 編程整理

    Flink Table API 與 SQL 編程整理

    Flink API 總共分為 4 層這里主要整理 Table API 的使用 Table API 是流處理和批處理通用的關(guān)系型 API , Table API 可以基于流輸入或者批輸入來運行而不需要進行任何修改。 Table API 是 SQL 語言的超集并專門為 Apache Flink 設(shè)計的, Table API 是 Scala 和 Java 語言集成式的 API 。與常規(guī) SQL 語言

    2024年02月04日
    瀏覽(26)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包