国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

基于華為MRS3.2.0實時Flink消費Kafka落盤至HDFS的Hive外部表的調(diào)度方案

這篇具有很好參考價值的文章主要介紹了基于華為MRS3.2.0實時Flink消費Kafka落盤至HDFS的Hive外部表的調(diào)度方案。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。


該需求為實時接收對手Topic,并進行消費落盤至Hive。

在具體的實施中,基于華為MRS 3.2.0安全模式帶kerberos認證的Kafka2.4、Flink1.15、Hadoop3.3.1、Hive3.1,調(diào)度平臺為開源dolphinscheduler。
基于華為MRS3.2.0實時Flink消費Kafka落盤至HDFS的Hive外部表的調(diào)度方案,Hadoop生態(tài),Flink,華為,kafka,flink,fusioninsight,hdfs,hive

本需求的完成全部參考華為官方MRS3.2.0開發(fā)文檔,相關章節(jié)是普通版的安全模式。

華為官方文檔:https://support.huaweicloud.com/cmpntguide-mrs/mrs_01_1031.html

1 Kafka

1.1 Kerberos安全模式的認證與環(huán)境準備

著手開發(fā)前,需要將FushionInsight租戶加入kafkaadmin組,保證有創(chuàng)建主題和消費主題的權限,在得到此權限時,切勿對集群中的主題進行危險操作。

保證租戶權限后,開始準備開發(fā)環(huán)境。該步驟需要安裝Idea客戶端在windows本地,同時安裝兼容的maven版本,華為MRS需要安裝至少OpenJDK 1.8.0_332的版本。

運行環(huán)境的配置則需要在FushionInsight的web管理界面下載kafka的完整客戶端,包括config配置文件也需要下載。另外windows本地的hosts文件中要和FushionInsight中的集群地址有映射,可手動添加,同時應保證本地和集群能ping通。

參考文檔:https://support.huaweicloud.com/devg3-mrs/mrs_07_130006.html

1.2 創(chuàng)建一個測試主題

在Linux環(huán)境中執(zhí)行:

bin/kafka-topics.sh --create --bootstrap-server <Kafka集群IP:21007> --command-config config/client.properties --partitions 1 --replication-factor 1 --topic testTopic

創(chuàng)建一個測試testTopic,創(chuàng)建成功后,FushionInsight的web界面會報topic只有一個分區(qū)副本的警告,請忽略它。

同時也可以開啟兩個新的終端窗口用于測試生產(chǎn)者和消費者:

  1. bin/kafka-console-producer.sh --broker-list <Kafka集群IP:21007> --topic <Topic名稱> --producer.config config/producer.properties
  2. bin/kafka-console-consumer.sh --topic <Topic名稱> --bootstrap-server <Kafka集群IP:21007> --consumer.config config/consumer.properties

參考文檔:https://support.huaweicloud.com/devg3-mrs/mrs_07_130031.html

1.3 消費主題的接收測試

通過以下網(wǎng)站下載華為MRS所需的樣例代碼:

https://github.com/huaweicloud/huaweicloud-mrs-example/tree/mrs-3.2.0

下載樣例代碼之后需要在華為鏡像站下載代碼所需依賴,華為MRS所需的組件依賴不同于apache的開源版本,需要單獨配置maven的setting文件華為中央倉庫進行下載,在開發(fā)時,組件相關的依賴都需要用下載華為的。

鏡像地址:

https://repo.huaweicloud.com/repository/maven/huaweicloudsdk/org/apache/

華為開源鏡像站:

https://mirrors.huaweicloud.com/home

完成依賴和樣例代碼項目創(chuàng)建即可開發(fā),在開發(fā)程序時,需要將用于安全認證的keytab文件即“user.keytab”和“krb5.conf”文件以及config所有配置文件放置在樣例工程的“kafka-examples\src\main\resources”目錄下。

在開發(fā)時,這些安全認證只需要生成一個jaas文件并設置相關環(huán)境變量即可。華為提供了LoginUtil相關接口來完成這些配置,樣例代碼中只需要配置用戶自己租戶名稱和對應的keytab文件名稱即可。

創(chuàng)建生產(chǎn)測試時,首先需要修改KafkaProperties類中的生產(chǎn)主題名,接下來在com.huawei.bigdata.kafka.example.Producer類中修改租戶賬號,keytab位置即可,運行成功后,會向主題推送100條測試數(shù)據(jù),此時,我們在1.2小節(jié)中開啟的消費者窗口就能接受到生產(chǎn)的數(shù)據(jù)。

在具體的測試中,需要控制消息發(fā)送的間隔和消息次數(shù),方便后續(xù)開發(fā)Flink。一般來說,每秒發(fā)送一條,一直發(fā)送即可。

至此,Kafka的主題消費測試完成,接下來需要用Flink將主題落盤到HDFS。

如果運行代碼時報和clock相關的錯誤,是因為本地時間和FushionInsight集群時間不一致所致,請將本地時間和服務器時間差控制在5分鐘內(nèi)。

參考文檔:
https://support.huaweicloud.com/devg3-mrs/mrs_07_130012.html

2 Flink

1.1 Kerberos安全模式的認證與環(huán)境準備

用戶在提交Flink應用程序時,需要與Yarn、HDFS等之間進行通信。那么提交Flink的應用程序中需要設置安全認證,確保Flink程序能夠正常運行。
基于華為MRS3.2.0實時Flink消費Kafka落盤至HDFS的Hive外部表的調(diào)度方案,Hadoop生態(tài),Flink,華為,kafka,flink,fusioninsight,hdfs,hive
圖為Flink在華為MRS安全模式的認證體系。

對于Kafka的權限在章節(jié)1.1已經(jīng)獲取,另外要保證有yarn資源的使用權限,還需要對HDFS的/flink、/flink-checkpoint目錄獲取權限,保證,,執(zhí)行。有了相關權限之后,再下載kerberos認證憑據(jù)文件,keytab和conf。準備運行環(huán)境同Kafka類似,需要對Flink客戶端進行配置,注意config文件應該在權限修改之后獲取。

Flink整個系統(tǒng)存在三種認證方式,使用kerberos認證、使用security cookie進行認證、使用YARN內(nèi)部的認證機制。在進行安全認證時,可以用flink自帶的wordcount樣例程序進行提交測試,根據(jù)提交結果反饋再進行適配,直到提交成功。如果報auth相關的錯誤,可能還是權限問題,可以嘗試先將租戶權限給到最大,謹慎操作,先保證代碼能通。

參考文檔:
https://support.huaweicloud.com/devg3-mrs/mrs_07_050010.html

1.2 Flink任務的開發(fā)

最終在yarn隊列運行的flink程序是從本地idea打包,通過flink run提交的。前面安全模式已經(jīng)打通,在開發(fā)時仍然是使用華為官方的flink樣例代碼進行修改調(diào)試。

在具體的flink程序開發(fā)中,由于是開啟了kerberos認證的安全模式,需要加入判斷安全模式登錄的代碼段在main方法,以下代碼來自華為官方樣例:

 if (LoginUtil.isSecurityModel()) {
            try {
                LOG.info("Securitymode start.");
                //!!注意,安全認證時,需要用戶手動修改為自己申請的機機賬號
                LoginUtil.securityPrepare(USER_PRINCIPAL, USER_KEYTAB_FILE);
            } catch (IOException e) {
                LOG.error("Security prepare failure.");
                LOG.error("The IOException occured : {}.", e);
                return;
            }
            LOG.info("Security prepare success.");
        }

對于具體需求的開發(fā)參照開源Flink的apache官方文檔即可,只需要保證依賴是華為官方鏡像站的。

在該需求中,是將消費的數(shù)據(jù)落盤到HDFS中。開發(fā)中要用到FlinkKafkaConsumer方法創(chuàng)建kafka消費者,拿到流數(shù)據(jù)。該方法在Flink1.17版本被棄用,但是Flink1.15仍然可以用,具體開發(fā)方法可參考Flink1.13的官方文檔Apache Kafka 連接器。

FlinkKafkaConsumer方法參考文檔:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/kafka/

接收的Kafka數(shù)據(jù),我們不需要處理,測試時直接測試主題的數(shù)據(jù)寫入HDFS即可,需要用StreamingFileSink方法。該方法可以設置按照日期分桶,我們設置.withBucketAssigner為每天一個桶,保證每天消費的數(shù)據(jù)在一個文件中,同時用該方法傳入日期格式參數(shù)yyyy-MM-dd,這樣便于使用shell調(diào)度每日增量數(shù)據(jù)時日期變量的傳遞。

FileSink方法參考文檔:
https://nightlies.apache.org/flink/flink-docs-release-1.13/zh/docs/connectors/datastream/file_sink/

另外,關于Sink到HDFS的數(shù)據(jù)文件(part file) 生命周期有幾種狀態(tài),其中當文件名為in-progress表示當前文件正在寫入中,此時的文件是不能被Hive讀到的,我們需要將該文件的狀態(tài)通過checkpoint機制轉(zhuǎn)變?yōu)镕inished。需要配置env.enableCheckpointing(60000)開啟checkpoint,該參數(shù)是60秒開啟一次。

完成代碼開發(fā)后無法在本地測試,只能通過maven打包到華為服務器,通過flink run提交到y(tǒng)arn,此時可以指定并行度及其他配置。

通過以上方法即可實現(xiàn)將我們測試主題中的數(shù)據(jù)存儲在按照每天一個yyyy-mm-dd命名的文件夾中。

3 HDFS與Hive

HDFS與Hive的交互也可以使用FlinkSQL,但是考慮到未來對數(shù)據(jù)的加工過濾,在此需求中選擇將數(shù)據(jù)落盤HDFS再通過Shell命令調(diào)度至Hive。

3.1 Shell腳本的編寫思路

  1. source華為的環(huán)境,認證狀態(tài)成功;

  2. 創(chuàng)建日期變量:c_date=$(date '+%Y-%m-%d');

  3. 在beeline -u中執(zhí)行HiveSQL代碼:

    • 使用beeline的變量函數(shù)--hivevar將在外部注冊的c_date變量注冊為hive beeline的變量;

    • 創(chuàng)建臨時外部表,映射字段一行數(shù)據(jù),建表語句中指定位置為Flink寫入的當日日期變量的HDFS數(shù)據(jù)文件夾;

    • 將臨時表中的數(shù)據(jù)解析,一般是json數(shù)據(jù),可通過get_json_object函數(shù)解析為字段,insert into table存入貼源層正式表;

    • 刪除臨時表;

  4. 有需要的話,也可以添加日志路徑,將執(zhí)行結果追加至日志。

3.2 腳本測試方法

該腳本的執(zhí)行原理是首先在刷新華為租戶環(huán)境,然后創(chuàng)建時間變量,并且是yyyy-mm-dd格式,與flink寫入在HDFS中的每日增量文件夾名相同;

然后在beeline客戶端中注冊beeline的變量,將linux的時間變量傳入beeline;

解下來是建臨時表,將HDFS中的增量數(shù)據(jù)先寫入,再解析字段到下一層標準表,同時刪除臨時表,通過此方法即完成每天新增數(shù)據(jù)的導入。

需要注意的是,hive -e命令似乎由于認證安全設置,無法在華為集群節(jié)點機使用。

4 DolphinScheduler

通過將腳本文件掛在DS調(diào)度中,每天在Flink完成消費落盤后,即可執(zhí)行該shell。DS的部署不在華為MRS集群,在客戶端節(jié)點中,使用開源版本即可,DS更方便查看每天的調(diào)度執(zhí)行日志。

需要注意的是,目前我的需求中每天的新增數(shù)據(jù)大約2000-10000條,可以在短時間內(nèi)完成調(diào)度執(zhí)行。文章來源地址http://www.zghlxwxcb.cn/news/detail-811516.html

到了這里,關于基于華為MRS3.2.0實時Flink消費Kafka落盤至HDFS的Hive外部表的調(diào)度方案的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 基于Flink+kafka實時告警

    基于Flink+kafka實時告警

    項目使用告警系統(tǒng)的邏輯是將實時數(shù)據(jù)保存到本地數(shù)據(jù)庫再使用定時任務做判斷,然后產(chǎn)生告警數(shù)據(jù)。這種方式存在告警的延時實在是太高了。數(shù)據(jù)從產(chǎn)生到保存,從保存到判斷都會存在時間差,按照保存數(shù)據(jù)定時5分鐘一次,定時任務5分鐘一次。最高會產(chǎn)生10分鐘的誤差,這

    2024年02月16日
    瀏覽(24)
  • 在hadoop或docker環(huán)境下基于kafka和flink的實時計算大屏展示

    在hadoop或docker環(huán)境下基于kafka和flink的實時計算大屏展示

    第一章 總體需求 1.1.課題背景 某股票交易機構已上線一個在線交易平臺,平臺注冊用戶量近千萬,每日均 接受來自全國各地的分支機構用戶提交的交易請求。鑒于公司發(fā)展及平臺管理要 求,擬委托開發(fā)一個在線實時大數(shù)據(jù)系統(tǒng),可實時觀測股票交易大數(shù)據(jù)信息,展 示部分重

    2024年02月03日
    瀏覽(19)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的實時綜合案例(二)數(shù)據(jù)源

    基于Flume+Kafka+Hbase+Flink+FineBI的實時綜合案例(二)數(shù)據(jù)源

    目標 : 了解數(shù)據(jù)源的格式及實現(xiàn)模擬數(shù)據(jù)的生成 路徑 step1:數(shù)據(jù)格式 step2:數(shù)據(jù)生成 實施 數(shù)據(jù)格式 消息時間 發(fā)件人昵稱 發(fā)件人賬號 發(fā)件人性別 發(fā)件人IP 發(fā)件人系統(tǒng) 發(fā)件人手機型號 發(fā)件人網(wǎng)絡制式 發(fā)件人GPS 收件人昵稱 收件人IP 收件人賬號 收件人系統(tǒng) 收件人手機型號

    2024年02月04日
    瀏覽(47)
  • 基于Flume+Kafka+Hbase+Flink+FineBI的實時綜合案例(五)FineBI可視化

    基于Flume+Kafka+Hbase+Flink+FineBI的實時綜合案例(五)FineBI可視化

    目標 : 實現(xiàn)FineBI訪問MySQL結果數(shù)據(jù)集的配置 實施 安裝FineBI 參考《FineBI Windows版本安裝手冊.docx》安裝FineBI 配置連接 數(shù)據(jù)準備 小結 實現(xiàn)FineBI訪問MySQL結果數(shù)據(jù)集的配置 目標 : 實現(xiàn)FineBI實時報表構建 路徑 step1:實時報表構建 step2:實時報表配置 step3:實時刷新測試 實施 實

    2024年02月04日
    瀏覽(41)
  • python 實時獲取kafka消費隊列信息

    安裝 pykafka

    2024年02月16日
    瀏覽(11)
  • flink正常消費kafka數(shù)據(jù),flink沒有做checkpoint,kafka位點沒有提交

    1、背景 flink消費kafka數(shù)據(jù),多并發(fā),實現(xiàn)雙流join 2、現(xiàn)象 (1)flink任務消費kafka數(shù)據(jù),其中數(shù)據(jù)正常消費,kafka顯示消息堆積,位點沒有提交,并且flink任務沒有做checkpoint (2)其中一個流的subtask顯示finished (3)無背壓 3、問題原因 (1)其中一個topic分區(qū)為1 (2)配置的并行

    2024年02月13日
    瀏覽(22)
  • 輕松通關Flink第24講:Flink 消費 Kafka 數(shù)據(jù)業(yè)務開發(fā)

    在上一課時中我們提過在實時計算的場景下,絕大多數(shù)的數(shù)據(jù)源都是消息系統(tǒng),而 Kafka 從眾多的消息中間件中脫穎而出,主要是因為 高吞吐 、 低延遲 的特點;同時也講了 Flink 作為生產(chǎn)者像 Kafka 寫入數(shù)據(jù)的方式和代碼實現(xiàn)。這一課時我們將從以下幾個方面介紹 Flink 消費

    2024年02月08日
    瀏覽(26)
  • Flink使用 KafkaSource消費 Kafka中的數(shù)據(jù)

    目前,很多 flink相關的書籍和網(wǎng)上的文章講解如何對接 kafka時都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已經(jīng)將 FlinkKafkaConsumer標記為 deprecated(不推薦),如下: 新版本的 flink應該使用 KafkaSource來消費 kafka中的數(shù)據(jù),詳細代碼如下: 開發(fā)者在工作中應該盡量避

    2024年02月15日
    瀏覽(21)
  • 關于flink重新提交任務,重復消費kafka的坑

    關于flink重新提交任務,重復消費kafka的坑

    按照以下方式設置backend目錄和checkpoint目錄,fsbackend目錄有數(shù)據(jù),checkpoint目錄沒數(shù)據(jù) 我以為checkpoint和fsbackend要同時設置,其實,1.14.3版本,setCheckpointStorage和stateBackend改成了分著設置 我上邊代碼這樣設置,相當于首先指定了以下checkpoint按照默認的backend存儲,然后又指定了按

    2024年02月03日
    瀏覽(23)
  • flink kafka消費者如何處理kafka主題的rebalance

    flink kafka消費者如何處理kafka主題的rebalance

    我們?nèi)粘J褂胟afka客戶端消費kafka主題的消息時,當消費者退出/加入消費者組,kafka主題分區(qū)數(shù)有變等事件發(fā)生時,都會導致rebalance的發(fā)生,此時一般情況下,如果我們不自己處理offset,我們不需要理會這個rebalance的,當rebalance完成后,每個消費者會從__consumer_offsets中獲取每個

    2024年02月14日
    瀏覽(27)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包