該需求為實時接收對手Topic,并進行消費落盤至Hive。
在具體的實施中,基于華為MRS 3.2.0安全模式帶kerberos認證的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,調(diào)度平臺為開源dolphinscheduler。
本需求的完成全部參考華為官方MRS3.2.0開發(fā)文檔,相關章節(jié)是普通版的安全模式。
華為官方文檔:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_1031.html
1 Kafka
1.1 Kerberos安全模式的認證與環(huán)境準備
著手開發(fā)前,需要將FushionInsight租戶加入kafkaadmin組,保證有創(chuàng)建主題和消費主題的權限,在得到此權限時,切勿對集群中的主題進行危險操作。
保證租戶權限后,開始準備開發(fā)環(huán)境。該步驟需要安裝Idea客戶端在windows本地,同時安裝兼容的maven版本,華為MRS需要安裝至少OpenJDK 1.8.0_332的版本。
運行環(huán)境的配置則需要在FushionInsight的web管理界面下載kafka的完整客戶端,包括config配置文件也需要下載。另外windows本地的hosts文件中要和FushionInsight中的集群地址有映射,可手動添加,同時應保證本地和集群能ping通。
參考文檔:https://support.huaweicloud.com/devg3-mrs/mrs_07_130006.html
1.2 創(chuàng)建一個測試主題
在Linux環(huán)境中執(zhí)行:
bin/kafka-topics.sh --create --bootstrap-server <Kafka集群IP:21007> --command-config config/client.properties --partitions 1 --replication-factor 1 --topic testTopic
創(chuàng)建一個測試testTopic,創(chuàng)建成功后,FushionInsight的web界面會報topic只有一個分區(qū)副本的警告,請忽略它。
同時也可以開啟兩個新的終端窗口用于測試生產(chǎn)者和消費者:
bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名稱> --producer.config config/producer.properties
bin/kafka-console-consumer.sh --topic <Topic名稱> --bootstrap-server <Kafka集群IP:21007> --consumer.config config/consumer.properties
參考文檔:https://support.huaweicloud.com/devg3-mrs/mrs_07_130031.html
1.3 消費主題的接收測試
通過以下網(wǎng)站下載華為MRS所需的樣例代碼:
https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0
下載樣例代碼之后需要在華為鏡像站下載代碼所需依賴,華為MRS所需的組件依賴不同于apache的開源版本,需要單獨配置maven的setting文件華為中央倉庫進行下載,在開發(fā)時,組件相關的依賴都需要用下載華為的。
鏡像地址:
https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/
華為開源鏡像站:
https://mirrors.huaweicloud.com/home
完成依賴和樣例代碼項目創(chuàng)建即可開發(fā),在開發(fā)程序時,需要將用于安全認證的keytab文件即“user.keytab”和“krb5.conf”文件以及config所有配置文件放置在樣例工程的“kafka-examples\src\main\resources”目錄下。
在開發(fā)時,這些安全認證只需要生成一個jaas文件并設置相關環(huán)境變量即可。華為提供了LoginUtil相關接口來完成這些配置,樣例代碼中只需要配置用戶自己租戶名稱和對應的keytab文件名稱即可。
創(chuàng)建生產(chǎn)測試時,首先需要修改KafkaProperties
類中的生產(chǎn)主題名,接下來在com.huawei.bigdata.kafka.example.Producer
類中修改租戶賬號,keytab位置即可,運行成功后,會向主題推送100條測試數(shù)據(jù),此時,我們在1.2
小節(jié)中開啟的消費者窗口就能接受到生產(chǎn)的數(shù)據(jù)。
在具體的測試中,需要控制消息發(fā)送的間隔和消息次數(shù),方便后續(xù)開發(fā)Flink。一般來說,每秒發(fā)送一條,一直發(fā)送即可。
至此,Kafka的主題消費測試完成,接下來需要用Flink將主題落盤到HDFS。
如果運行代碼時報和clock相關的錯誤,是因為本地時間和FushionInsight集群時間不一致所致,請將本地時間和服務器時間差控制在5分鐘內(nèi)。
參考文檔:
https://support.huaweicloud.com/devg3-mrs/mrs_07_130012.html
2 Flink
1.1 Kerberos安全模式的認證與環(huán)境準備
用戶在提交Flink應用程序時,需要與Yarn、HDFS等之間進行通信。那么提交Flink的應用程序中需要設置安全認證,確保Flink程序能夠正常運行。
圖為Flink在華為MRS安全模式的認證體系。
對于Kafka的權限在章節(jié)1.1已經(jīng)獲取,另外要保證有yarn資源的使用權限,還需要對HDFS的/flink
、/flink-checkpoint
目錄獲取權限,保證讀
,寫
,執(zhí)行
。有了相關權限之后,再下載kerberos認證憑據(jù)文件,keytab和conf。準備運行環(huán)境同Kafka類似,需要對Flink客戶端進行配置,注意config文件應該在權限修改之后獲取。
Flink整個系統(tǒng)存在三種認證方式,使用kerberos認證、使用security cookie進行認證、使用YARN內(nèi)部的認證機制。在進行安全認證時,可以用flink自帶的wordcount樣例程序進行提交測試,根據(jù)提交結果反饋再進行適配,直到提交成功。如果報auth相關的錯誤,可能還是權限問題,可以嘗試先將租戶權限給到最大,謹慎操作,先保證代碼能通。
參考文檔:
https://support.huaweicloud.com/devg3-mrs/mrs_07_050010.html
1.2 Flink任務的開發(fā)
最終在yarn隊列運行的flink程序是從本地idea打包,通過flink run提交的。前面安全模式已經(jīng)打通,在開發(fā)時仍然是使用華為官方的flink樣例代碼進行修改調(diào)試。
在具體的flink程序開發(fā)中,由于是開啟了kerberos認證的安全模式,需要加入判斷安全模式登錄的代碼段在main方法,以下代碼來自華為官方樣例:
if (LoginUtil.isSecurityModel()) {
try {
LOG.info("Securitymode start.");
//!!注意,安全認證時,需要用戶手動修改為自己申請的機機賬號
LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
} catch (IOException e) {
LOG.error("Security prepare failure.");
LOG.error("The IOException occured : {}.", e);
return;
}
LOG.info("Security prepare success.");
}
對于具體需求的開發(fā)參照開源Flink的apache官方文檔即可,只需要保證依賴是華為官方鏡像站的。
在該需求中,是將消費的數(shù)據(jù)落盤到HDFS中。開發(fā)中要用到FlinkKafkaConsumer
方法創(chuàng)建kafka消費者,拿到流數(shù)據(jù)。該方法在Flink1.17版本被棄用,但是Flink1.15仍然可以用,具體開發(fā)方法可參考Flink1.13的官方文檔Apache Kafka 連接器。
FlinkKafkaConsumer方法參考文檔:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/
接收的Kafka數(shù)據(jù),我們不需要處理,測試時直接測試主題的數(shù)據(jù)寫入HDFS即可,需要用StreamingFileSink
方法。該方法可以設置按照日期分桶,我們設置.withBucketAssigner
為每天一個桶,保證每天消費的數(shù)據(jù)在一個文件中,同時用該方法傳入日期格式參數(shù)yyyy-MM-dd
,這樣便于使用shell調(diào)度每日增量數(shù)據(jù)時日期變量的傳遞。
FileSink方法參考文檔:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/file_sink/
另外,關于Sink到HDFS的數(shù)據(jù)文件(part file) 生命周期有幾種狀態(tài),其中當文件名為in-progress表示當前文件正在寫入中,此時的文件是不能被Hive讀到的,我們需要將該文件的狀態(tài)通過checkpoint機制轉(zhuǎn)變?yōu)镕inished。需要配置env.enableCheckpointing(60000)
開啟checkpoint,該參數(shù)是60秒開啟一次。
完成代碼開發(fā)后無法在本地測試,只能通過maven打包到華為服務器,通過flink run
提交到y(tǒng)arn,此時可以指定并行度及其他配置。
通過以上方法即可實現(xiàn)將我們測試主題中的數(shù)據(jù)存儲在按照每天一個yyyy-mm-dd
命名的文件夾中。
3 HDFS與Hive
HDFS與Hive的交互也可以使用FlinkSQL,但是考慮到未來對數(shù)據(jù)的加工過濾,在此需求中選擇將數(shù)據(jù)落盤HDFS再通過Shell命令調(diào)度至Hive。
3.1 Shell腳本的編寫思路
-
source華為的環(huán)境,認證狀態(tài)成功;
-
創(chuàng)建日期變量:
c_date=$(date '+%Y-%m-%d')
; -
在beeline -u中執(zhí)行HiveSQL代碼:
-
使用beeline的變量函數(shù)
--hivevar
將在外部注冊的c_date
變量注冊為hive beeline的變量; -
創(chuàng)建臨時外部表,映射字段一行數(shù)據(jù),建表語句中指定位置為Flink寫入的當日日期變量的HDFS數(shù)據(jù)文件夾;
-
將臨時表中的數(shù)據(jù)解析,一般是json數(shù)據(jù),可通過
get_json_object
函數(shù)解析為字段,insert into table
存入貼源層正式表; -
刪除臨時表;
-
-
有需要的話,也可以添加日志路徑,將執(zhí)行結果追加至日志。
3.2 腳本測試方法
該腳本的執(zhí)行原理是首先在刷新華為租戶環(huán)境,然后創(chuàng)建時間變量,并且是yyyy-mm-dd
格式,與flink寫入在HDFS中的每日增量文件夾名相同;
然后在beeline客戶端中注冊beeline的變量,將linux的時間變量傳入beeline;
解下來是建臨時表,將HDFS中的增量數(shù)據(jù)先寫入,再解析字段到下一層標準表,同時刪除臨時表,通過此方法即完成每天新增數(shù)據(jù)的導入。
需要注意的是,hive -e
命令似乎由于認證安全設置,無法在華為集群節(jié)點機使用。
4 DolphinScheduler
通過將腳本文件掛在DS調(diào)度中,每天在Flink完成消費落盤后,即可執(zhí)行該shell。DS的部署不在華為MRS集群,在客戶端節(jié)點中,使用開源版本即可,DS更方便查看每天的調(diào)度執(zhí)行日志。文章來源:http://www.zghlxwxcb.cn/news/detail-811516.html
需要注意的是,目前我的需求中每天的新增數(shù)據(jù)大約2000-10000條,可以在短時間內(nèi)完成調(diào)度執(zhí)行。文章來源地址http://www.zghlxwxcb.cn/news/detail-811516.html
到了這里,關于基于華為MRS3.2.0實時Flink消費Kafka落盤至HDFS的Hive外部表的調(diào)度方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!