Flink 系列文章
一、Flink 專欄
Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進行說明。
-
1、Flink 部署系列
本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 -
2、Flink基礎(chǔ)系列
本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 -
3、Flik Table API和SQL基礎(chǔ)系列
本部分介紹Flink Table Api和SQL的基本用法,比如Table API和SQL創(chuàng)建庫、表用法、查詢、窗口函數(shù)、catalog等等內(nèi)容。 -
4、Flik Table API和SQL提高與應(yīng)用系列
本部分是table api 和sql的應(yīng)用部分,和實際的生產(chǎn)應(yīng)用聯(lián)系更為密切,以及有一定開發(fā)難度的內(nèi)容。 -
5、Flink 監(jiān)控系列
本部分和實際的運維、監(jiān)控工作相關(guān)。
二、Flink 示例專欄
Flink 示例專欄是 Flink 專欄的輔助說明,一般不會介紹知識點的信息,更多的是提供一個一個可以具體使用的示例。本專欄不再分目錄,通過鏈接即可看出介紹的內(nèi)容。
兩專欄的所有文章入口點擊:Flink 系列文章匯總索引
本文介紹了Apache Hive連接器的使用,以具體的示例演示了通過java和flink sql cli創(chuàng)建catalog。
本文依賴環(huán)境是hadoop、zookeeper、hive、flink環(huán)境好用,本文內(nèi)容以flink1.17版本進行介紹的,具體示例是在1.13版本中運行的(因為hadoop集群環(huán)境是基于jdk8的,flink1.17版本需要jdk11)。
更多的內(nèi)容詳見后續(xù)關(guān)于hive的介紹。
一、Table & SQL Connectors 示例: Apache Hive
Apache Hive 已經(jīng)成為了數(shù)據(jù)倉庫生態(tài)系統(tǒng)中的核心。 它不僅僅是一個用于大數(shù)據(jù)分析和ETL場景的SQL引擎,同樣它也是一個數(shù)據(jù)管理平臺,可用于發(fā)現(xiàn),定義,和演化數(shù)據(jù)。
Flink 與 Hive 的集成包含兩個層面。
一是利用了 Hive 的 MetaStore 作為持久化的 Catalog,用戶可通過HiveCatalog將不同會話中的 Flink 元數(shù)據(jù)存儲到 Hive Metastore 中。 例如,用戶可以使用HiveCatalog將其 Kafka 表或 Elasticsearch 表存儲在 Hive Metastore 中,并后續(xù)在 SQL 查詢中重新使用它們。
二是利用 Flink 來讀寫 Hive 的表。
HiveCatalog的設(shè)計提供了與 Hive 良好的兼容性,用戶可以"開箱即用"的訪問其已有的 Hive 數(shù)倉。 您不需要修改現(xiàn)有的 Hive Metastore,也不需要更改表的數(shù)據(jù)位置或分區(qū)。
1、支持的Hive版本
Flink 支持以下的 Hive 版本。
- 2.3
2.3.0
2.3.1
2.3.2
2.3.3
2.3.4
2.3.5
2.3.6
2.3.7
2.3.8
2.3.9 - 3.1
3.1.0
3.1.1
3.1.2
3.1.3
某些功能是否可用取決于您使用的 Hive 版本,這些限制不是由 Flink 所引起的:
- Hive 內(nèi)置函數(shù)在使用 Hive-2.3.0 及更高版本時支持。
- 列約束,也就是 PRIMARY KEY 和 NOT NULL,在使用 Hive-3.1.0 及更高版本時支持。
- 更改表的統(tǒng)計信息,在使用 Hive-2.3.0 及更高版本時支持。
- DATE列統(tǒng)計信息,在使用 Hive-2.3.0 及更高版時支持。
2、依賴項
要與 Hive 集成,您需要在 Flink 下的/lib/目錄中添加一些額外的依賴包, 以便通過 Table API 或 SQL Client 與 Hive 進行交互。 或者,您可以將這些依賴項放在專用文件夾中,并分別使用 Table API 程序或 SQL Client 的-C或-l選項將它們添加到 classpath 中。
Apache Hive 是基于 Hadoop 之上構(gòu)建的, 首先您需要 Hadoop 的依賴,請參考 Providing Hadoop classes:
export HADOOP_CLASSPATH=`hadoop classpath`
有兩種添加 Hive 依賴項的方法。第一種是使用 Flink 提供的 Hive Jar包。您可以根據(jù)使用的 Metastore 的版本來選擇對應(yīng)的 Hive jar。第二個方式是分別添加每個所需的 jar 包。如果您使用的 Hive 版本尚未在此處列出,則第二種方法會更適合。
注意:建議您優(yōu)先使用 Flink 提供的 Hive jar 包。僅在 Flink 提供的 Hive jar 不滿足您的需求時,再考慮使用分開添加 jar 包的方式。
1)、使用 Flink 提供的 Hive jar
下表列出了所有可用的 Hive jar。您可以選擇一個并放在 Flink 發(fā)行版的/lib/ 目錄中。
2)、用戶定義的依賴項
您可以在下方找到不同Hive主版本所需要的依賴項。
- Hive 2.3.4
/flink-1.17.1
/lib
// Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars
flink-connector-hive_2.12-1.17.1.jar
// Hive dependencies
hive-exec-2.3.4.jar
// add antlr-runtime if you need to use hive dialect
antlr-runtime-3.5.2.jar
- Hive 3.1.0
/flink-1.17.1
/lib
// Flink's Hive connector
flink-connector-hive_2.12-1.17.1.jar
// Hive dependencies
hive-exec-3.1.0.jar
libfb303-0.9.3.jar // libfb303 is not packed into hive-exec in some versions, need to add it separately
// add antlr-runtime if you need to use hive dialect
antlr-runtime-3.5.2.jar
3)、移動 planner jar 包
把 FLINK_HOME/opt 下的 jar 包 flink-table-planner_2.12-1.17.1.jar 移動到 FLINK_HOME/lib 下,并且將 FLINK_HOME/lib 下的 jar 包 flink-table-planner-loader-1.17.1.jar 移出去。 具體原因請參見 FLINK-25128。你可以使用如下命令來完成移動 planner jar 包的工作:
mv $FLINK_HOME/opt/flink-table-planner_2.12-1.17.1.jar $FLINK_HOME/lib/flink-table-planner_2.12-1.17.1.jar
mv $FLINK_HOME/lib/flink-table-planner-loader-1.17.1.jar $FLINK_HOME/opt/flink-table-planner-loader-1.17.1.jar
只有當(dāng)要使用 Hive 語法 或者 HiveServer2 endpoint, 你才需要做上述的 jar 包移動。 但是在集成 Hive 的時候,推薦進行上述的操作。
3、Maven 依賴
如果您在構(gòu)建自己的應(yīng)用程序,則需要在 mvn 文件中添加以下依賴項。 您應(yīng)該在運行時添加以上的這些依賴項,而不要在已生成的 jar 文件中去包含它們。
<!-- Flink Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-hive_2.12</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge_2.12</artifactId>
<version>1.17.1</version>
<scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<scope>provided</scope>
</dependency>
4、連接到Hive
通過 TableEnvironment 或者 YAML 配置,使用 Catalog 接口 和 HiveCatalog連接到現(xiàn)有的 Hive 集群。
以下是如何連接到 Hive 的示例:
- java
EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
TableEnvironment tableEnv = TableEnvironment.create(settings);
String name = "myhive";
String defaultDatabase = "mydatabase";
String hiveConfDir = "/opt/hive-conf";
HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tableEnv.registerCatalog("myhive", hive);
// set the HiveCatalog as the current catalog of the session
tableEnv.useCatalog("myhive");
----------------------示例----------------------------
import java.util.List;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.hive.HiveCatalog;
/**
* @author alanchan
*
*/
public class TestHiveCatalogDemo {
/**
* @param args
* @throws DatabaseNotExistException
* @throws CatalogException
*/
public static void main(String[] args) throws CatalogException, DatabaseNotExistException {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
String name = "alan_hive";
// testhive 數(shù)據(jù)庫名稱
String defaultDatabase = "testhive";
String hiveConfDir = "/usr/local/bigdata/apache-hive-3.1.2-bin/conf";
HiveCatalog hiveCatalog = new HiveCatalog(name, defaultDatabase, hiveConfDir);
tenv.registerCatalog("alan_hive", hiveCatalog);
// 使用注冊的catalog
tenv.useCatalog("alan_hive");
List<String> tables = hiveCatalog.listTables(defaultDatabase);
for (String table : tables) {
System.out.println("Database:testhive tables:" + table);
}
}
}
- sql
CREATE CATALOG myhive WITH (
'type' = 'hive',
'default-database' = 'mydatabase',
'hive-conf-dir' = '/opt/hive-conf'
);
-- set the HiveCatalog as the current catalog of the session
USE CATALOG myhive;
------------------具體示例如下----------------------------
Flink SQL> show catalogs;
+-----------------+
| catalog name |
+-----------------+
| default_catalog |
+-----------------+
1 row in set
Flink SQL> CREATE CATALOG alan_hivecatalog WITH (
> 'type' = 'hive',
> 'default-database' = 'testhive',
> 'hive-conf-dir' = '/usr/local/bigdata/apache-hive-3.1.2-bin/conf'
> );
[INFO] Execute statement succeed.
Flink SQL> show catalogs;
+------------------+
| catalog name |
+------------------+
| alan_hivecatalog |
| default_catalog |
+------------------+
2 rows in set
Flink SQL> use alan_hivecatalog;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.catalog.exceptions.CatalogException: A database with name [alan_hivecatalog] does not exist in the catalog: [default_catalog].
Flink SQL> use catalog alan_hivecatalog;
[INFO] Execute statement succeed.
Flink SQL> show tables;
+-----------------------------------+
| table name |
+-----------------------------------+
| alan_hivecatalog_hivedb_testtable |
| apachelog |
| col2row1 |
| col2row2 |
| cookie_info |
| dual |
| dw_zipper |
| emp |
| employee |
| employee_address |
| employee_connection |
| ods_zipper_update |
| row2col1 |
| row2col2 |
| singer |
| singer2 |
| student |
| student_dept |
| student_from_insert |
| student_hdfs |
| student_hdfs_p |
| student_info |
| student_local |
| student_partition |
| t_all_hero_part_msck |
| t_usa_covid19 |
| t_usa_covid19_p |
| tab1 |
| tb_dept01 |
| tb_dept_bucket |
| tb_emp |
| tb_emp01 |
| tb_emp_bucket |
| tb_json_test1 |
| tb_json_test2 |
| tb_login |
| tb_login_tmp |
| tb_money |
| tb_money_mtn |
| tb_url |
| the_nba_championship |
| tmp_1 |
| tmp_zipper |
| user_dept |
| user_dept_sex |
| users |
| users_bucket_sort |
| website_pv_info |
| website_url_info |
+-----------------------------------+
49 rows in set
- ymal
execution:
...
current-catalog: alan_hivecatalog # set the HiveCatalog as the current catalog of the session
current-database: testhive
catalogs:
- name: alan_hivecatalog
type: hive
hive-conf-dir: /usr/local/bigdata/apache-hive-3.1.2-bin/conf
下表列出了通過 YAML 文件或 DDL 定義 HiveCatalog 時所支持的參數(shù)。
文章來源:http://www.zghlxwxcb.cn/news/detail-681235.html
5、DDL&DML
在 Flink 中執(zhí)行 DDL 操作 Hive 的表、視圖、分區(qū)、函數(shù)等元數(shù)據(jù)時,參考:33、Flink之hive
Flink 支持 DML 寫入 Hive 表,請參考:33、Flink之hive
以上,介紹了Apache Hive連接器的使用,以具體的示例演示了通過java和flink sql cli創(chuàng)建catalog。文章來源地址http://www.zghlxwxcb.cn/news/detail-681235.html
到了這里,關(guān)于16、Flink 的table api與sql之連接外部系統(tǒng): 讀寫外部系統(tǒng)的連接器和格式以及Apache Hive示例(6)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!