国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS

這篇具有很好參考價值的文章主要介紹了Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

寫在前面:博主是一只經過實戰(zhàn)開發(fā)歷練后投身培訓事業(yè)的“小山豬”,昵稱取自動畫片《獅子王》中的“彭彭”,總是以樂觀、積極的心態(tài)對待周邊的事物。本人的技術路線從Java全棧工程師一路奔向大數(shù)據(jù)開發(fā)、數(shù)據(jù)挖掘領域,如今終有小成,愿將昔日所獲與大家交流一二,希望對學習路上的你有所助益。同時,博主也想通過此次嘗試打造一個完善的技術圖書館,任何與文章技術點有關的異常、錯誤、注意事項均會在末尾列出,歡迎大家通過各種方式提供素材。

  • 對于文章中出現(xiàn)的任何錯誤請大家批評指出,一定及時修改。
  • 有任何想要討論和學習的問題可聯(lián)系我:zhuyc@vip.163.com。
  • 發(fā)布文章的風格因專欄而異,均自成體系,不足之處請大家指正。

Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS

本文關鍵字:Flume、Kafka、HDFS、實時數(shù)據(jù)、存儲

一、場景描述

對于一些實時產生的數(shù)據(jù),除了做實時計算以外,一般還需要歸檔保存,用于離線數(shù)據(jù)分析。使用Flume的配置可以實現(xiàn)對數(shù)據(jù)的處理,并按一定的時間頻率存儲,本例中將從Kafka中按天存儲數(shù)據(jù)到HDFS的不同文件夾。

1. 數(shù)據(jù)輸入

本場景中數(shù)據(jù)來自Kafka中某個Topic訂閱,數(shù)據(jù)格式為json。

2. 數(shù)據(jù)管道

使用Flume作為數(shù)據(jù)處理管道,通過配置實現(xiàn)自定義存儲規(guī)則。

3. 數(shù)據(jù)輸出

最終數(shù)據(jù)將存儲在HDFS中,每一天的數(shù)據(jù)將對應一個單獨的文件夾。

二、組件介紹

1. Kafka

來自維基百科:Kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數(shù)據(jù)提供一個統(tǒng)一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分布式事務日志架構的大規(guī)模發(fā)布/訂閱消息隊列”,這使它作為企業(yè)級基礎設施來處理流式數(shù)據(jù)非常有價值。

如果需要參考安裝步驟可以點擊:Kafka 3.x的解壓安裝 - Linux

2. Hadoop

來自維基百科:Apache Hadoop是一款支持數(shù)據(jù)密集型分布式應用程序并以Apache 2.0許可協(xié)議發(fā)布的開源軟件框架,有助于使用許多計算機組成的網絡來解決數(shù)據(jù)、計算密集型的問題。基于MapReduce計算模型,它為大數(shù)據(jù)的分布式存儲與處理提供了一個軟件框架。所有的Hadoop模塊都有一個基本假設,即硬件故障是常見情況,應該由框架自動處理。

如果需要參考安裝步驟可以點擊:Hadoop 3.x各模式部署 - Ubuntu

3. Flume

來自維基百科:Apache Flume是一款分布式、可靠且可用的軟件,用于高效地收集、聚合和移動大量日志數(shù)據(jù)。它有一個基于流數(shù)據(jù)流的簡單而靈活的體系結構。它具有健壯性和容錯性,具有可調的可靠性機制以及許多故障切換和恢復機制。它使用了一個簡單的可擴展數(shù)據(jù)模型,允許在線分析應用程序。

Flume的運行只需要預先配置好JDK即可,安裝過程只需要解壓以及環(huán)境變量的配置。

三、前置準備

1. Flume下載

  • 官網地址:https://flume.apache.org/

Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS

  • 點擊Download -> 選擇binary中的tar.gz

Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS

  • 進入鏡像地址列表,右鍵復制下載鏈接

Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS

  • 使用wget下載到Linux系統(tǒng)
wget https://dlcdn.apache.org/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz

Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS

2. Flume安裝

關于前置環(huán)境JDK的安裝可以參考:Hadoop 3.x各模式部署 - Ubuntu中前置環(huán)境的部分【點擊可直接跳轉到指定位置】。

  • Flume解壓縮
tar -zvxf apache-flume-1.11.0-bin.tar.gz
  • 環(huán)境變量配置
vi ~/.bashrc

export FLUME_HOME=/path/to/apache-flume-1.11.0-bin
export PATH=$PATH:$FLUME_HOME/bin

3. 數(shù)據(jù)源準備

可以在Kafka中創(chuàng)建一個新的Topic用于測試,具體步驟可以參考:Kafka 3.x的解壓安裝 - Linux中Console測試的部分【點擊可直接跳轉到指定位置】。

四、配置文件

在Flume中主要需要配置3個部分,source、channelsink。本例中source為kafka,sink為HDFS,channel同樣有多種選擇。

1. 以內存為channel

  • 優(yōu)缺點
    • 優(yōu)點:速度較快,不會占用額外硬盤空間
    • 缺點:只依賴Kafka的偏移量記錄,F(xiàn)lume自身不會存儲偏移量信息
  • 核心配置項
    • agent.sources.kafka-source.batchSize:每一批次處理的數(shù)據(jù)量,可以根據(jù)需要修改
    • agent.sources.kafka-source.kafka.bootstrap.servers:Kafka的訂閱地址,包含主機及端口號
    • agent.sources.kafka-source.kafka.topics:Kafka的Topic名稱
    • agent.sinks.hdfs-sink.hdfs.path:最終數(shù)據(jù)在HDFS的保存路徑,父級目錄需要手動創(chuàng)建
  • 在Flume的conf文件夾中新建配置文件kafka-memory-hdfs.conf
# Name the components on this agent
agent.sources = kafka-source
agent.channels = memory-channel
agent.sinks = hdfs-sink

# Describe/configure the source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.batchSize = 1000
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.kafka.topics = my-topic
agent.sources.kafka-source.kafka.consumer.group.id = flume-memory-hdfs
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest

# Describe/configure the channel
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000
agent.channels.memory-channel.transactionCapacity = 1000

# Describe the sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = /flume_data/%Y-%m-%d
agent.sinks.hdfs-sink.hdfs.fileSuffix = .jsonl
agent.sinks.hdfs-sink.hdfs.rollInterval = 3600
agent.sinks.hdfs-sink.hdfs.rollSize = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 1000
agent.sinks.hdfs-sink.transactionCapacity = 1000
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.writeFormat = Text

# Bind the source and sink to the channel
agent.sources.kafka-source.channels = memory-channel
agent.sinks.hdfs-sink.channel = memory-channel

2. 以文件為channel

  • 優(yōu)缺點
    • 優(yōu)點:可以保證數(shù)據(jù)不丟失,將數(shù)據(jù)狀態(tài)保存在本地磁盤上
    • 缺點:會額外占用硬盤存儲空間,讀寫速度相對較慢,需要合理移除歷史文件
  • 核心配置項
    • agent.sources.kafka-source.batchSize:每一批次處理的數(shù)據(jù)量,可以根據(jù)需要修改
    • agent.sources.kafka-source.kafka.bootstrap.servers:Kafka的訂閱地址,包含主機及端口號
    • agent.sources.kafka-source.kafka.topics:Kafka的Topic名稱
    • agent.channels.file-channel.checkpointDir:本地磁盤路徑,需要預先創(chuàng)建父級目錄
    • agent.channels.file-channel.useDualCheckpoints:設置為true則開啟雙重機制,可額外設置一個備份路徑
    • agent.channels.file-channel.maxFileSize:單位為字節(jié),當達到文件大小時會自動滾動新建
    • agent.sinks.hdfs-sink.hdfs.path:最終數(shù)據(jù)在HDFS的保存路徑,父級目錄需要手動創(chuàng)建
  • 在Flume的conf文件夾中新建配置文件kafka-file-hdfs.conf
# Name the components on this agent
agent.sources = kafka-source
agent.channels = file-channel
agent.sinks = hdfs-sink

# Describe/configure the source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.batchSize = 1000
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.kafka.topics = my-topic
agent.sources.kafka-source.kafka.consumer.group.id = flume-file-hdfs
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest

# Describe/configure the channel
agent.channels.file-channel.type = file
agent.channels.file-channel.capacity = 10000
agent.channels.file-channel.transactionCapacity = 1000
agent.channels.file-channel.checkpointDir = /tmp/flume/checkpoint/
agent.channels.file-channel.backupCheckpointDir = /tmp/flume/backup/
agent.channels.file-channel.checkpointInterval = 300
agent.channels.file-channel.maxFileSize = 104857600
agent.channels.file-channel.useDualCheckpoints = true

# Describe the sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = /flume_data/%Y-%m-%d
agent.sinks.hdfs-sink.hdfs.fileSuffix = .jsonl
agent.sinks.hdfs-sink.hdfs.rollInterval = 3600
agent.sinks.hdfs-sink.hdfs.rollSize = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 1000
agent.sinks.hdfs-sink.transactionCapacity = 1000
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.writeFormat = Text

# Bind the source and sink to the channel
agent.sources.kafka-source.channels = file-channel
agent.sinks.hdfs-sink.channel = file-channel

五、運行測試

開始執(zhí)行后,會按照預先配置的存儲規(guī)則 %Y-%m-%d,將每一天產生的數(shù)據(jù)存放在不同的文件夾,但是由于數(shù)據(jù)是分批到達的,所以每個文件夾中會有多個文件,但是這不影響數(shù)據(jù)的計算,如果需要可以合并整理。

1. 直接運行

Flume啟動時可以通過conf -f參數(shù)指定配置文件,建議分配較多的內存,防止溢出:

nohup flume-ng agent -c conf -f ptah/to/kafka-memory-hdfs.conf -n agent -Dflume.root.logger=INFO,console -Xmx2g &

運行日志可以在FLUME_HOME/flume.log中找到,測試穩(wěn)定后可以將進程掛在后臺執(zhí)行。

2. 監(jiān)控運行

如果需要方便的進行指標監(jiān)控,可以在啟動時加入Prometheus,具體安裝步驟可以查看可以自定義指標的監(jiān)控工具 - Prometheus的安裝部署。

  • jmx環(huán)境準備

下載jar包存儲在合適位置:jmx_prometheus_javaagent-0.18.0.jar

  • 配置文件修改

在flume的conf配置文件中【kafka-memory-hdfs.conf/kafka-file-hdfs.conf】添加如下內容:

flume.monitoring.type = jmx
  • 添加監(jiān)控規(guī)則:config.yaml

新建一個config.yaml文件,存放在合適位置。

startDelaySeconds: 0
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
whitelistObjectNames:
  - 'org.apache.flume.*:*'
blacklistObjectNames: []
  • 添加監(jiān)控配置:prometheus.yml

scrape_configs配置中增加一組和flume相關的job,修改后需要重新加載配置文件或者重啟Prometheus進程

scrape_configs:
  # The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
  - job_name: "prometheus"

    # metrics_path defaults to '/metrics'
    # scheme defaults to 'http'.

    static_configs:
      - targets: ["localhost:9090"]

  - job_name: "flume"
    static_configs:
      - targets: ["localhost:9998"]
  • 啟動命令

在啟動Flume時,額外指定jar包所在路徑,以及監(jiān)控規(guī)則文件所在路徑,設置的端口號為9998,與Prometheus中的設置保持一致。

nohup flume-ng agent -c conf -f path/to/kafka-memory-hdfs.conf -n agent -Dflume.root.logger=INFO,console -Xmx2g -javaagent:/path/to/jmx_prometheus_javaagent-0.18.0.jar=9998:/path/to/config.yaml &
  • 監(jiān)控效果

部署完成后可以通過jvm_threads_state指標來查看Flume的進程狀態(tài):
Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS文章來源地址http://www.zghlxwxcb.cn/news/detail-464118.html

到了這里,關于Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!

本文來自互聯(lián)網用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • 深入了解Kafka的數(shù)據(jù)持久化機制

    深入了解Kafka的數(shù)據(jù)持久化機制

    歡迎來到我的博客,代碼的世界里,每一行都是一個故事 在消息傳遞的舞臺上,數(shù)據(jù)就像是時間的旅行者,承載著信息的流動。然而,時間不停歇。本文將帶你進入數(shù)據(jù)的永恒之路,探尋在Kafka中,數(shù)據(jù)如何通過持久化機制守護信息的不朽之旅。 持久化的基本概念: 在 Kaf

    2024年04月28日
    瀏覽(22)
  • docker (五)-docker存儲-數(shù)據(jù)持久化

    docker (五)-docker存儲-數(shù)據(jù)持久化

    將數(shù)據(jù)存儲在容器中,一旦容器被刪除,數(shù)據(jù)也會被刪除。同時也會使容器變得越來越大,不方便恢復和遷移。 將數(shù)據(jù)存儲到容器之外,這樣刪除容器也不會丟失數(shù)據(jù)。一旦容器故障,我們可以重新創(chuàng)建一個容器,將數(shù)據(jù)掛載到容器里,就可以快速的恢復。 volume 卷 卷存儲在

    2024年02月20日
    瀏覽(30)
  • Docker Swarm NFS 數(shù)據(jù)持久化存儲

    Docker Swarm NFS 數(shù)據(jù)持久化存儲

    可參考我前面的博客《基于 Linux 的 Docker Swarm 集群部署及應用》 本次實驗: master :192.168.56.142 work1 :192.168.56.132 work2 :192.168.56.180 可參考我前面的博客《構建NFS-FTP文件共享存儲》 本次實驗: NFS IP :192.168.56.141 Shared Dir :/data/sharedir 3.1 通過 Volume 3.1.1 創(chuàng)建 Volume 1、創(chuàng)建 Do

    2024年02月04日
    瀏覽(21)
  • (九)K8S數(shù)據(jù)持久化高級存儲

    NFS(Network File System)是一種分布式文件系統(tǒng)協(xié)議,用于通過網絡共享文件和目錄。它允許客戶端計算機通過網絡訪問和讀取遠程服務器上的文件,就像它們在本地文件系統(tǒng)中一樣。NFS 是一種常見的網絡文件共享協(xié)議,在許多環(huán)境中被廣泛使用。 在 Kubernetes 中,NFS 可以作為一

    2024年02月06日
    瀏覽(25)
  • 快速搞懂Pinia及數(shù)據(jù)持久化存儲(詳細教程)

    一.安裝及使用Pinia 1.安裝Pinia兩種方式都可,根據(jù)個人習慣來 2.在main.ts 中引入并掛載到根實例 3.src目錄下新建store/study/index.js并寫入 Store 是用defineStore()定義的,它的第一個參數(shù)是一個獨一無二的id,也是必須傳入的,Pinia 將用它來連接 store 和 devtools。 defineStore()第二個參數(shù)可

    2023年04月15日
    瀏覽(27)
  • 【云原生】第八篇--Docker容器數(shù)據(jù)持久化存儲機制

    物理機或虛擬機數(shù)據(jù)持久化存儲 由于物理機或虛擬機本身就擁有大容量的磁盤,所以可以直接

    2023年04月09日
    瀏覽(22)
  • Vuex的插件vuex-persistedstate數(shù)據(jù)持久化存儲

    用 sessionStorage 緩存上面 state 的數(shù)據(jù), key 名為 store

    2024年02月05日
    瀏覽(20)
  • 持續(xù)集成部署-k8s-數(shù)據(jù)持久化-基本存儲方式

    關于k8s 數(shù)據(jù)持久化,可以先看下官方的介紹:

    2024年02月13日
    瀏覽(27)
  • 持續(xù)集成部署-k8s-數(shù)據(jù)持久化-高級存儲方式

    持久卷(PersistentVolume,PV) 是集群中的一塊存儲,可以由管理員事先制備, 或者使用

    2024年02月16日
    瀏覽(22)
  • Redis九種數(shù)據(jù)類型及其持久化機制:探索數(shù)據(jù)存儲的奇妙世界

    Redis九種數(shù)據(jù)類型及其持久化機制:探索數(shù)據(jù)存儲的奇妙世界

    目錄 一、9種數(shù)據(jù)類型 3.1 Key操作 3.1.1 相關命令 練習: 3.2 String 3.2.1 結構圖 3.2.2 相關命令 ?練習: 3.3 List(雙向的鏈表) 3.3.1 結構圖 3.3.2 相關命令 練習: 3.4 Set(無序集合) 3.4.1 結構圖 3.4.2 相關命令 練習: 3.5 Zset(有序集合) 3.5.1 結構圖 3.5.2 相關命令 練習 3.6 Hash 3.6.1 結構

    2024年02月16日
    瀏覽(90)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包