寫在前面:博主是一只經過實戰(zhàn)開發(fā)歷練后投身培訓事業(yè)的“小山豬”,昵稱取自動畫片《獅子王》中的“彭彭”,總是以樂觀、積極的心態(tài)對待周邊的事物。本人的技術路線從Java全棧工程師一路奔向大數(shù)據(jù)開發(fā)、數(shù)據(jù)挖掘領域,如今終有小成,愿將昔日所獲與大家交流一二,希望對學習路上的你有所助益。同時,博主也想通過此次嘗試打造一個完善的技術圖書館,任何與文章技術點有關的異常、錯誤、注意事項均會在末尾列出,歡迎大家通過各種方式提供素材。
- 對于文章中出現(xiàn)的任何錯誤請大家批評指出,一定及時修改。
- 有任何想要討論和學習的問題可聯(lián)系我:zhuyc@vip.163.com。
- 發(fā)布文章的風格因專欄而異,均自成體系,不足之處請大家指正。
Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS
本文關鍵字:Flume、Kafka、HDFS、實時數(shù)據(jù)、存儲
一、場景描述
對于一些實時產生的數(shù)據(jù),除了做實時計算以外,一般還需要歸檔保存,用于離線數(shù)據(jù)分析。使用Flume的配置可以實現(xiàn)對數(shù)據(jù)的處理,并按一定的時間頻率存儲,本例中將從Kafka中按天存儲數(shù)據(jù)到HDFS的不同文件夾。
1. 數(shù)據(jù)輸入
本場景中數(shù)據(jù)來自Kafka中某個Topic訂閱,數(shù)據(jù)格式為json。
2. 數(shù)據(jù)管道
使用Flume作為數(shù)據(jù)處理管道,通過配置實現(xiàn)自定義存儲規(guī)則。
3. 數(shù)據(jù)輸出
最終數(shù)據(jù)將存儲在HDFS中,每一天的數(shù)據(jù)將對應一個單獨的文件夾。
二、組件介紹
1. Kafka
來自維基百科:Kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺,由Scala和Java編寫。該項目的目標是為處理實時數(shù)據(jù)提供一個統(tǒng)一、高吞吐、低延遲的平臺。其持久化層本質上是一個“按照分布式事務日志架構的大規(guī)模發(fā)布/訂閱消息隊列”,這使它作為企業(yè)級基礎設施來處理流式數(shù)據(jù)非常有價值。
如果需要參考安裝步驟可以點擊:Kafka 3.x的解壓安裝 - Linux
2. Hadoop
來自維基百科:Apache Hadoop是一款支持數(shù)據(jù)密集型分布式應用程序并以Apache 2.0許可協(xié)議發(fā)布的開源軟件框架,有助于使用許多計算機組成的網絡來解決數(shù)據(jù)、計算密集型的問題。基于MapReduce計算模型,它為大數(shù)據(jù)的分布式存儲與處理提供了一個軟件框架。所有的Hadoop模塊都有一個基本假設,即硬件故障是常見情況,應該由框架自動處理。
如果需要參考安裝步驟可以點擊:Hadoop 3.x各模式部署 - Ubuntu
3. Flume
來自維基百科:Apache Flume是一款分布式、可靠且可用的軟件,用于高效地收集、聚合和移動大量日志數(shù)據(jù)。它有一個基于流數(shù)據(jù)流的簡單而靈活的體系結構。它具有健壯性和容錯性,具有可調的可靠性機制以及許多故障切換和恢復機制。它使用了一個簡單的可擴展數(shù)據(jù)模型,允許在線分析應用程序。
Flume的運行只需要預先配置好JDK即可,安裝過程只需要解壓以及環(huán)境變量的配置。
三、前置準備
1. Flume下載
- 官網地址:https://flume.apache.org/
- 點擊Download -> 選擇binary中的tar.gz
- 進入鏡像地址列表,右鍵復制下載鏈接
- 使用wget下載到Linux系統(tǒng)
wget https://dlcdn.apache.org/flume/1.11.0/apache-flume-1.11.0-bin.tar.gz
2. Flume安裝
關于前置環(huán)境JDK的安裝可以參考:Hadoop 3.x各模式部署 - Ubuntu中前置環(huán)境的部分【點擊可直接跳轉到指定位置】。
- Flume解壓縮
tar -zvxf apache-flume-1.11.0-bin.tar.gz
- 環(huán)境變量配置
vi ~/.bashrc
export FLUME_HOME=/path/to/apache-flume-1.11.0-bin
export PATH=$PATH:$FLUME_HOME/bin
3. 數(shù)據(jù)源準備
可以在Kafka中創(chuàng)建一個新的Topic用于測試,具體步驟可以參考:Kafka 3.x的解壓安裝 - Linux中Console測試的部分【點擊可直接跳轉到指定位置】。
四、配置文件
在Flume中主要需要配置3個部分,source、channel、sink。本例中source為kafka,sink為HDFS,channel同樣有多種選擇。
1. 以內存為channel
- 優(yōu)缺點
- 優(yōu)點:速度較快,不會占用額外硬盤空間
- 缺點:只依賴Kafka的偏移量記錄,F(xiàn)lume自身不會存儲偏移量信息
- 核心配置項
- agent.sources.kafka-source.batchSize:每一批次處理的數(shù)據(jù)量,可以根據(jù)需要修改
- agent.sources.kafka-source.kafka.bootstrap.servers:Kafka的訂閱地址,包含主機及端口號
- agent.sources.kafka-source.kafka.topics:Kafka的Topic名稱
- agent.sinks.hdfs-sink.hdfs.path:最終數(shù)據(jù)在HDFS的保存路徑,父級目錄需要手動創(chuàng)建
- 在Flume的conf文件夾中新建配置文件kafka-memory-hdfs.conf:
# Name the components on this agent
agent.sources = kafka-source
agent.channels = memory-channel
agent.sinks = hdfs-sink
# Describe/configure the source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.batchSize = 1000
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.kafka.topics = my-topic
agent.sources.kafka-source.kafka.consumer.group.id = flume-memory-hdfs
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest
# Describe/configure the channel
agent.channels.memory-channel.type = memory
agent.channels.memory-channel.capacity = 10000
agent.channels.memory-channel.transactionCapacity = 1000
# Describe the sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = /flume_data/%Y-%m-%d
agent.sinks.hdfs-sink.hdfs.fileSuffix = .jsonl
agent.sinks.hdfs-sink.hdfs.rollInterval = 3600
agent.sinks.hdfs-sink.hdfs.rollSize = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 1000
agent.sinks.hdfs-sink.transactionCapacity = 1000
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.writeFormat = Text
# Bind the source and sink to the channel
agent.sources.kafka-source.channels = memory-channel
agent.sinks.hdfs-sink.channel = memory-channel
2. 以文件為channel
- 優(yōu)缺點
- 優(yōu)點:可以保證數(shù)據(jù)不丟失,將數(shù)據(jù)狀態(tài)保存在本地磁盤上
- 缺點:會額外占用硬盤存儲空間,讀寫速度相對較慢,需要合理移除歷史文件
- 核心配置項
- agent.sources.kafka-source.batchSize:每一批次處理的數(shù)據(jù)量,可以根據(jù)需要修改
- agent.sources.kafka-source.kafka.bootstrap.servers:Kafka的訂閱地址,包含主機及端口號
- agent.sources.kafka-source.kafka.topics:Kafka的Topic名稱
- agent.channels.file-channel.checkpointDir:本地磁盤路徑,需要預先創(chuàng)建父級目錄
- agent.channels.file-channel.useDualCheckpoints:設置為true則開啟雙重機制,可額外設置一個備份路徑
- agent.channels.file-channel.maxFileSize:單位為字節(jié),當達到文件大小時會自動滾動新建
- agent.sinks.hdfs-sink.hdfs.path:最終數(shù)據(jù)在HDFS的保存路徑,父級目錄需要手動創(chuàng)建
- 在Flume的conf文件夾中新建配置文件kafka-file-hdfs.conf:
# Name the components on this agent
agent.sources = kafka-source
agent.channels = file-channel
agent.sinks = hdfs-sink
# Describe/configure the source
agent.sources.kafka-source.type = org.apache.flume.source.kafka.KafkaSource
agent.sources.kafka-source.batchSize = 1000
agent.sources.kafka-source.kafka.bootstrap.servers = localhost:9092
agent.sources.kafka-source.kafka.topics = my-topic
agent.sources.kafka-source.kafka.consumer.group.id = flume-file-hdfs
agent.sources.kafka-source.kafka.consumer.auto.offset.reset = earliest
# Describe/configure the channel
agent.channels.file-channel.type = file
agent.channels.file-channel.capacity = 10000
agent.channels.file-channel.transactionCapacity = 1000
agent.channels.file-channel.checkpointDir = /tmp/flume/checkpoint/
agent.channels.file-channel.backupCheckpointDir = /tmp/flume/backup/
agent.channels.file-channel.checkpointInterval = 300
agent.channels.file-channel.maxFileSize = 104857600
agent.channels.file-channel.useDualCheckpoints = true
# Describe the sink
agent.sinks.hdfs-sink.type = hdfs
agent.sinks.hdfs-sink.hdfs.path = /flume_data/%Y-%m-%d
agent.sinks.hdfs-sink.hdfs.fileSuffix = .jsonl
agent.sinks.hdfs-sink.hdfs.rollInterval = 3600
agent.sinks.hdfs-sink.hdfs.rollSize = 0
agent.sinks.hdfs-sink.hdfs.rollCount = 1000
agent.sinks.hdfs-sink.transactionCapacity = 1000
agent.sinks.hdfs-sink.hdfs.fileType = DataStream
agent.sinks.hdfs-sink.hdfs.writeFormat = Text
# Bind the source and sink to the channel
agent.sources.kafka-source.channels = file-channel
agent.sinks.hdfs-sink.channel = file-channel
五、運行測試
開始執(zhí)行后,會按照預先配置的存儲規(guī)則 %Y-%m-%d,將每一天產生的數(shù)據(jù)存放在不同的文件夾,但是由于數(shù)據(jù)是分批到達的,所以每個文件夾中會有多個文件,但是這不影響數(shù)據(jù)的計算,如果需要可以合并整理。
1. 直接運行
Flume啟動時可以通過conf -f參數(shù)指定配置文件,建議分配較多的內存,防止溢出:
nohup flume-ng agent -c conf -f ptah/to/kafka-memory-hdfs.conf -n agent -Dflume.root.logger=INFO,console -Xmx2g &
運行日志可以在FLUME_HOME/flume.log中找到,測試穩(wěn)定后可以將進程掛在后臺執(zhí)行。
2. 監(jiān)控運行
如果需要方便的進行指標監(jiān)控,可以在啟動時加入Prometheus,具體安裝步驟可以查看可以自定義指標的監(jiān)控工具 - Prometheus的安裝部署。
- jmx環(huán)境準備
下載jar包存儲在合適位置:jmx_prometheus_javaagent-0.18.0.jar
- 配置文件修改
在flume的conf配置文件中【kafka-memory-hdfs.conf/kafka-file-hdfs.conf】添加如下內容:
flume.monitoring.type = jmx
- 添加監(jiān)控規(guī)則:config.yaml
新建一個config.yaml文件,存放在合適位置。
startDelaySeconds: 0
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
whitelistObjectNames:
- 'org.apache.flume.*:*'
blacklistObjectNames: []
- 添加監(jiān)控配置:prometheus.yml
在scrape_configs配置中增加一組和flume相關的job,修改后需要重新加載配置文件或者重啟Prometheus進程
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: "prometheus"
# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.
static_configs:
- targets: ["localhost:9090"]
- job_name: "flume"
static_configs:
- targets: ["localhost:9998"]
- 啟動命令
在啟動Flume時,額外指定jar包所在路徑,以及監(jiān)控規(guī)則文件所在路徑,設置的端口號為9998,與Prometheus中的設置保持一致。文章來源:http://www.zghlxwxcb.cn/news/detail-464118.html
nohup flume-ng agent -c conf -f path/to/kafka-memory-hdfs.conf -n agent -Dflume.root.logger=INFO,console -Xmx2g -javaagent:/path/to/jmx_prometheus_javaagent-0.18.0.jar=9998:/path/to/config.yaml &
- 監(jiān)控效果
部署完成后可以通過jvm_threads_state指標來查看Flume的進程狀態(tài):文章來源地址http://www.zghlxwxcb.cn/news/detail-464118.html
到了這里,關于Flume實現(xiàn)Kafka數(shù)據(jù)持久化存儲到HDFS的文章就介紹完了。如果您還想了解更多內容,請在右上角搜索TOY模板網以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網!