Hadoop高手之路8-Flume日志采集
在大數(shù)據(jù)系統(tǒng)的開(kāi)發(fā)中,數(shù)據(jù)收集工作無(wú)疑是開(kāi)發(fā)者首要解決的一個(gè)難題,但由于生產(chǎn)數(shù)據(jù)的源頭豐富多樣,其中包含網(wǎng)站日志數(shù)據(jù)、后臺(tái)監(jiān)控?cái)?shù)據(jù)、用戶瀏覽網(wǎng)頁(yè)數(shù)據(jù)等,數(shù)據(jù)工程師要想將它們分門(mén)別類的采集到HDFS系統(tǒng)中,就可以使用Apache Flume(數(shù)據(jù)采集)系統(tǒng)。
一、Flume概述
1. Flume簡(jiǎn)介
Flume原是Cloudera公司提供的一個(gè)高可用的、高可靠的、分布式海量日志采集、聚合和傳輸系統(tǒng),而后納入到了Apache旗下,作為一個(gè)頂級(jí)開(kāi)源項(xiàng)目。Apache Flume不僅只限于日志數(shù)據(jù)的采集,由于Flume采集的數(shù)據(jù)源是可定制的,因此Flume還可用于傳輸大量事件數(shù)據(jù),包括但不限于網(wǎng)絡(luò)流量數(shù)據(jù)、社交媒體生成的數(shù)據(jù)、電子郵件消息以及幾乎任何可能的數(shù)據(jù)源。
2. Flume運(yùn)行機(jī)制
Flume的核心是把數(shù)據(jù)從數(shù)據(jù)源(例如Web服務(wù)器)通過(guò)數(shù)據(jù)采集器(Source)收集過(guò)來(lái),再將收集的數(shù)據(jù)通過(guò)緩沖通道(Channel)匯集到指定的接收器(Sink)。
Flume基本架構(gòu)中有一個(gè)Agent(代理),它是Flume的核心角色,F(xiàn)lume Agent是一個(gè)JVM進(jìn)程,它承載著數(shù)據(jù)從外部源流向下一個(gè)目標(biāo)的三個(gè)核心組件:Source、Channel和Sink。
3. Flume日志采集系統(tǒng)結(jié)構(gòu)圖
在實(shí)際開(kāi)發(fā)中, Flume需要采集數(shù)據(jù)的類型多種多樣,同時(shí)還會(huì)進(jìn)行不同的中間操作,所以根據(jù)具體需求,可以將Flume日志采集系統(tǒng)分為簡(jiǎn)單結(jié)構(gòu)和復(fù)雜結(jié)構(gòu)。
簡(jiǎn)單的Flume日志采集系統(tǒng)的結(jié)構(gòu)
復(fù)雜的Flume日志采集系統(tǒng)的結(jié)構(gòu)
二、Flume的搭建
1. 下載
注意:使用1.9.0這個(gè)版本
2. 上傳
3. 解壓
4. 配置環(huán)境變量
5. 配置flume
修改flume-env.sh文件
三、Flume入門(mén)使用
1. 配置數(shù)據(jù)采集方案
1) 查看官網(wǎng)
2) 案例需求
flume連接和監(jiān)聽(tīng)服務(wù)器的某個(gè)端口,采集數(shù)據(jù)并顯示
3) 創(chuàng)建新的配置文件
4) 復(fù)制官網(wǎng)的采集配置示例,在此基礎(chǔ)上進(jìn)行修改
# example.conf: A single-node Flume configuration
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
# Describe the sink
a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
2. 啟動(dòng)flume進(jìn)行采集
啟動(dòng)命令查看官網(wǎng)
flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console
3. 采集數(shù)據(jù)測(cè)試
用telnet向本機(jī)的44444端口發(fā)送數(shù)據(jù),模擬服務(wù)器產(chǎn)生數(shù)據(jù)
在使用之前,要先安裝telnet
在telnet端發(fā)送數(shù)據(jù)
在flume端顯示采集到的數(shù)據(jù)
四、Flume采集方案說(shuō)明
1. Flume Sources
在編寫(xiě)Flume采集方案時(shí),首先必須明確采集的數(shù)據(jù)源類型、出處;接著,根據(jù)這些信息與Flume已提供支持的Flume Sources進(jìn)行匹配,選擇對(duì)應(yīng)的數(shù)據(jù)采集器類型(即sources.type);再根據(jù)選擇的數(shù)據(jù)采集器類型,配置必要和非必要的數(shù)據(jù)采集器屬性,F(xiàn)lume提供并支持的Flume Sources種類如下所示。
Avro Source | Thrift Source | Exec Source |
---|---|---|
JMS Source | Spooling Directory Source | Twitter 1% firehose Source |
Kafka Source | NetCat TCP Source | NetCat UDP Source |
Sequence Generator Source | Syslog TCP Source | Multiport Syslog TCP Source |
Syslog UDP Source | HTTP Source | Stress Source |
Avro Legacy Source | Thrift Legacy Source | Custom Source |
Scribe Source | Taildir Source |
1) Avro Source
監(jiān)聽(tīng)Avro端口并從外部Avro客戶端流中接收event數(shù)據(jù),當(dāng)與另一個(gè)Flume Agent上的Avro Sink配對(duì)時(shí),可創(chuàng)建分層集合拓?fù)洌肁vro Source可以實(shí)現(xiàn)多級(jí)流動(dòng)、扇出流、扇入流等效果,Avro Source常用配置屬性如下。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
channels | – | |
type | – | 組件類型名需必須是avro |
bind | – | 要監(jiān)聽(tīng)的主機(jī)名或IP地址 |
port | – | 要監(jiān)聽(tīng)的服務(wù)端口 |
threads | – | 要生成的工作線程的最大數(shù)目 |
ssl | false | 將此設(shè)置為true以啟用SSL加密,則還必須指定“keystore”和“keystore-password” |
keystore | – | SSL所必需的通往Java秘鑰存儲(chǔ)路徑 |
keystore-password | – | SSL所必需的Java密鑰存儲(chǔ)的密碼 |
2) Spooling Directory Source
Spooling Directory Source允許對(duì)指定磁盤(pán)上的文件目錄進(jìn)行監(jiān)控來(lái)提取數(shù)據(jù),它將查看文件的指定目錄的新增文件,并將文件中的數(shù)據(jù)讀取出來(lái)。Spooling Directory Source常用配置屬性如下表所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
channels | – | |
type | – | 組件類型名需必須是spooldir |
spoolDir | – | 從中讀取文件的目錄 |
fileSuffix | .COMPLETED | 附加到完全攝取的文件后綴 |
deletePolicy | never | 何時(shí)刪除已完成的文件:never或immediate |
fileHeader | false | 是否添加存儲(chǔ)絕對(duì)路徑文件名的標(biāo)頭 |
includePattern | ^.*$ | 正則表達(dá)式,指定要包含的文件 |
ignorePattern | ^$ | 正則表達(dá)式指定要忽略的文件 |
3) Taildir Source
Taildir Source用于觀察指定的文件,幾乎可以實(shí)時(shí)監(jiān)測(cè)到添加到每個(gè)文件的新行。如果文件正在寫(xiě)入新行,則此采集器將重試采集它們以等待寫(xiě)入完成,Source常用配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
channels | – | |
type | – | 組件類型名需必須是TAILDIR |
filegroups | – | 以空格分隔的文件組列表 |
filegroups.<filegroupName> | – | 文件組的絕對(duì)路徑 |
idleTimeout | 120000 | 關(guān)閉非活動(dòng)文件的時(shí)間(毫秒) |
writePosInterval | 3000 | 寫(xiě)入位置文件上每個(gè)文件的最后位置的間隔時(shí)間 |
batchSize | 100 | 一次讀取和發(fā)送到通道的最大行數(shù) |
backoffSleepIncrement | 1000 | 當(dāng)最后一次嘗試未找到任何新數(shù)據(jù)時(shí),每次重新嘗試輪詢新數(shù)據(jù)之間的最大時(shí)間延遲 |
fileHeader | false | 是否添加存儲(chǔ)絕對(duì)路徑文件名的標(biāo)頭 |
fileHeaderKey | file | 將絕對(duì)路徑文件名附加到event header時(shí)使用的header關(guān)鍵字 |
channels | – | |
type | 組件類型名需必須是http | |
port | – | 采集源要綁定的端口 |
bind | 0.0.0.0 | 要監(jiān)聽(tīng)綁定的主機(jī)名或IP地址 |
handler | org.apache.flume.source.http.JSONHandler | handler類的全路徑名 |
4) HTTP Source
HTTP Source可以通過(guò)HTTP POST和GET請(qǐng)求方式接收event數(shù)據(jù),GET通常只能用于測(cè)試使用,POST請(qǐng)求發(fā)送的所有的events都被認(rèn)為是一個(gè)批次,會(huì)在一個(gè)事務(wù)中插入channel,HTTP Source常用配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
channels | – | |
type | 組件類型名需必須是http | |
port | – | 采集源要綁定的端口 |
bind | 0.0.0.0 | 要監(jiān)聽(tīng)綁定的主機(jī)名或IP地址 |
handler | org.apache.flume.source.http.JSONHandler | handler類的全路徑名 |
2. Flume Channels
? Channels通道是event在Agent上暫存的存儲(chǔ)庫(kù),Source向Channel中添加event,Sink在讀取完數(shù)據(jù)后再刪除它。在配置Channels時(shí),需要明確的就是將要傳輸?shù)膕ources數(shù)據(jù)源類型;根據(jù)這些信息結(jié)合開(kāi)發(fā)中的實(shí)際需求,選擇Flume已提供的支持的Flume Channels;再根據(jù)選擇的Channel類型,配置必要和非必要的Channel屬性,F(xiàn)lume提供并支持的Flume Channels種類如下所示。
Memory Channel | JDBC Channel | Kafka Channel |
---|---|---|
File Channel | Spillable Memory Channel | Pseudo Transaction Channel |
Custom Channel |
1) Memory Channel
Memory Channel會(huì)將event存儲(chǔ)在具有可配置最大尺寸的內(nèi)存隊(duì)列中,適用于需要更高吞吐量的流量,但在Agent發(fā)生故障時(shí)會(huì)丟失部分階段數(shù)據(jù),下表為Memory Channel常用配置屬性。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
type | – | 組件類型名需必須是memory |
capacity | 100 | 存儲(chǔ)在channel中的最大event數(shù) |
transactionCapacity | 100 | channel從source接收或向sink傳遞的每個(gè)事務(wù)中最大event數(shù) |
keep-alive | 3 | 添加或刪除event的超時(shí)時(shí)間(秒) |
byteCapacityBufferPercentage | 20 | 定義byteCapacity與channel中所有event所占百分比 |
byteCapacity | 等于JVM可用的最大內(nèi)存的80% | 允許此channel中所有event的的最大內(nèi)存字節(jié)數(shù)總和 |
2) File Channel
File Channel是Flume的持久通道,它將所有event寫(xiě)入磁盤(pán),因此不會(huì)丟失進(jìn)程或機(jī)器關(guān)機(jī)、崩潰時(shí)的數(shù)據(jù)。File Channel通過(guò)在一次事務(wù)中提交多個(gè)event來(lái)提高吞吐量,做到了只要事務(wù)被提交,那么數(shù)據(jù)就不會(huì)有丟失,F(xiàn)ile Channel常用配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
type | – | 組件類型名需必須是file |
checkpointDir | 1~/.flume/file-channel/checkpoint | 檢測(cè)點(diǎn)文件所存儲(chǔ)的目錄 |
useDualCheckpoints | false | 備份檢測(cè)點(diǎn)如果設(shè)置為true,backupChec kpointDir必須設(shè)置 |
backupCheckpointDir | – | 備份檢查點(diǎn)目錄。此目錄不能與數(shù)據(jù)目錄或檢查點(diǎn)目錄相同 |
dataDirs | ~/.flume/file-channel/data | 數(shù)據(jù)存儲(chǔ)所在的目錄設(shè)置 |
transactionCapacity | 10000 | 事務(wù)容量的最大值設(shè)置 |
checkpointInterval | 30000 | 檢測(cè)點(diǎn)之間的時(shí)間值設(shè)置(單位微秒) |
maxFileSize | 2146435071 | 一個(gè)單一日志的最大值設(shè)置(以字節(jié)為單位) |
capacity | 1000000 | channel的最大容量 |
transactionCapacity | 10000 | 事務(wù)容量的最大值設(shè)置 |
3. Flume Sinks
Flume Soures采集的數(shù)據(jù)通過(guò)Channels通道流向Sink中,此時(shí)Sink類似一個(gè)集結(jié)的遞進(jìn)中心,它需要根據(jù)需求進(jìn)行配置,從而最終選擇發(fā)送目的地。配置Sinks時(shí),明確將要傳輸?shù)臄?shù)據(jù)目的地、結(jié)果類型;然后根據(jù)實(shí)際需求信息,選擇Flume已提供支持的Flume Sinks;再根據(jù)選擇的Sinks類型,配置必要和非必要的Sinks屬性。Flume提供并支持的Flume Sinks種類如下所示。
HDFS Sink | Hive Sink | Logger Sink |
---|---|---|
Avro Sink | Thrift Sink | IRC Sink |
File Roll Sink | Null Sink | HBaseSink |
AsyncHBase Sink | MorphlineSolr Sink | ElasticSearch Sink |
Kite Dataset Sink | Kafka Sink | HTTP Sink |
Custom Sink |
1) HDFS Sink
HDFS Sink將event寫(xiě)入Hadoop分布式文件系統(tǒng)(HDFS),它目前支持創(chuàng)建文本和序列文件,以及兩種類型的壓縮文件,下表為HDFS Sink常用配置屬性。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
channel | – | |
type | – | 組件類型名需必須是hdfs |
hdfs.path | – | HDFS目錄路徑 |
hdfs.filePrefix | FlumeData | 為在hdfs目錄中由Flume創(chuàng)建的文件指定前綴 |
hdfs.round | false | 是否應(yīng)將時(shí)間戳向下舍入 |
hdfs.roundValue | 1 | 舍入到此最高倍數(shù),小于當(dāng)前時(shí)間 |
hdfs.roundUnit | second | 舍入值的單位 - 秒、分鐘或小時(shí) |
hdfs.rollInterval | 30 | 滾動(dòng)當(dāng)前文件之前等待的秒數(shù) |
hdfs.rollSize | 1024 | 觸發(fā)滾動(dòng)的文件大小,以字節(jié)為單位 |
hdfs.rollCount | 10 | 在滾動(dòng)之前寫(xiě)入文件的事件數(shù) |
2) Logger Sink
Logger Sink用于記錄INFO級(jí)別event,它通常用于調(diào)試。Logger Sink接收器的不同之處是它不需要在“記錄原始數(shù)據(jù)”部分中說(shuō)明額外的配置,Logger Sink常用配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
channel | – | |
type | – | 組件類型名需必須是logger |
maxBytesToLog | 16 | 要記錄的event body的最大字節(jié)數(shù) |
3) Avro Sink
Avro Sink形成Flume分層收集支持的一半,發(fā)送到此接收器的Flume event轉(zhuǎn)換為Avro event,并發(fā)送到對(duì)應(yīng)配置的主機(jī)名/端口,event將從配置的channel中批量獲取配置的批處理大小,Avro Sink常用配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
channel | – | |
type | – | 組件類型名需必須是avro |
hostname | – | 要監(jiān)聽(tīng)的主機(jī)名或IP地址 |
port | – | 要監(jiān)聽(tīng)的服務(wù)端口 |
batch-size | 100 | 要一起批量發(fā)送的event數(shù) |
connect-timeout | 20000 | 允許第一次(握手)請(qǐng)求的時(shí)間量(ms) |
request-timeout | 20000 | 在第一個(gè)之后允許請(qǐng)求的時(shí)間量(ms) |
五、Flume采集數(shù)據(jù)案例一——監(jiān)控文件夾的變化
1. 需求
監(jiān)控服務(wù)器下的某個(gè)文件夾(日志輸出文件夾),如果該文件夾下有新的文件,則采集該文件的內(nèi)容上傳到hadoop集群上。
2. 編寫(xiě)數(shù)據(jù)采集配置方案
source的官網(wǎng)示例
sink的官網(wǎng)示例
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定義代理的名字及各個(gè)組件sources、sinks、channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 配置數(shù)據(jù)源
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/flumeSpool
a1.sources.r1.fileHeader = true
# Describe the sink
# 描述數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 組裝各個(gè)組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/spoolingdir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
3. 啟動(dòng)hdfs集群
4. 啟動(dòng)flume
出現(xiàn)錯(cuò)誤,提示監(jiān)控目錄不存在,需要提前創(chuàng)建
再次啟動(dòng)flume
5. 測(cè)試flume數(shù)據(jù)采集
在/var/log/flumeSpool文件夾下添加一個(gè)新文件
出現(xiàn)錯(cuò)誤,修改采集方案
再次啟動(dòng)flume測(cè)試
提示錯(cuò)誤
用hadoop的guava新版本替換flume的舊版本
再次啟動(dòng)flume,然后在/var/log/flumeSpool文件夾下添加一個(gè)新文件
在hdfs集群上查看
解決亂碼,修改文件:
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定義代理的名字及各個(gè)組件sources、sinks、channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 配置數(shù)據(jù)源
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/flumeSpool
a1.sources.r1.fileHeader = true
# Describe the sink
# 描述數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true
a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 組裝各個(gè)組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/spoolingdir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
重新啟動(dòng)flume,在創(chuàng)建一個(gè)新的文件
這樣結(jié)果就可以了。
六、Flume采集數(shù)據(jù)案例二——監(jiān)控文件的變化
1.采集方案
數(shù)據(jù)源
數(shù)據(jù)下沉不要修改,采集配置方案如下:
配置代碼:
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/text.log
# Describe the sink
# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循環(huán)創(chuàng)建文件夾
a1.sinks.k1.hdfs.round = true
# 循環(huán)創(chuàng)建文件夾的時(shí)間間隔是10分鐘
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地時(shí)間格式
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 時(shí)間間隔
a1.sinks.k1.hdfs.rollInterval = 3
# 大小間隔
a1.sinks.k1.hdfs.rollSize = 20
# event的個(gè)數(shù),這三個(gè)參數(shù)誰(shuí)先滿足就出發(fā)循環(huán)滾動(dòng)
a1.sinks.k1.hdfs.rollCount = 5
# 批處理數(shù)量
a1.sinks.k1.hdfs.batchSize = 1
# 文件格式,表示普通文本格式
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/exec-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
2. 測(cè)試采集功能
模擬場(chǎng)景
先寫(xiě)一個(gè)shell腳本,持續(xù)輸出當(dāng)前日期到監(jiān)控文件/var/log/test.log中,模擬服務(wù)器日志的文件
while true;do date >> /var/log/text.log;done
3. 再克隆一個(gè)會(huì)話,查看新增的內(nèi)容
4. 啟動(dòng)flume
flume-ng agent --conf conf --conf-file conf/exec-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
5. 查看HDFS上的結(jié)果
七、Flume的可靠性保證-負(fù)載均衡
1. 概念
配置的采集方案是通過(guò)唯一一個(gè)Sink作為接收器接收后續(xù)需要的數(shù)據(jù),但會(huì)出現(xiàn)當(dāng)前Sink故障或數(shù)據(jù)收集請(qǐng)求量較大的情況,這時(shí)單一Sink配置可能就無(wú)法保證Flume開(kāi)發(fā)的可靠性。因此,F(xiàn)lume 提供Flume Sink Processors解決上述問(wèn)題。
Sink處理器允許定義Sink groups,將多個(gè)sink分組到一個(gè)實(shí)體中,Sink處理器就可通過(guò)組內(nèi)多個(gè)sink為服務(wù)提供負(fù)載均衡功能。
負(fù)載均衡接收器處理器(Load balancing sink processor)提供了在多個(gè)sink上進(jìn)行負(fù)載均衡流量的功能,它維護(hù)一個(gè)活躍的sink索引列表,需在其上分配負(fù)載,還支持round_robin(輪詢)和random(隨機(jī))選擇機(jī)制進(jìn)行流量分配,默認(rèn)選擇機(jī)制為round_robin。Load balancing sink processor提供的配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
sinks | – | 以空格分隔的參與sink組的sink列表 |
processor.type | default | 組件類型名需必須是load_balance |
processor.backoff | false | 設(shè)置失敗的sink進(jìn)入黑名單 |
processor.selector | round_robin | 選擇機(jī)制 |
processor.selector.maxTimeOut | 30000 | 失敗sink放置在黑名單的超時(shí)時(shí)間 |
2. 搭建并配置flume集群
三臺(tái)服務(wù)器的flume集群:hadoop01、hadoop02、hadoop03
1) 分發(fā)hadoop01上的flume文件到hadoop02和hadoop03上
2) 分發(fā)環(huán)境變量配置文件
3) 使環(huán)境變量起作用
3. 配置采集方案
兩級(jí)配置方案
查看官方文檔的示例
1) 在hadoop001上配置第一級(jí)采集方案
在conf下編寫(xiě)采集方案exec-avro.conf
代碼如下:
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
#定義組的屬性
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
#定義負(fù)載均衡
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000
# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/test1.log
# Describe the sink
# 定義數(shù)據(jù)的目的地1(下沉)
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop002
a1.sinks.k1.port = 4545
# 定義數(shù)據(jù)的目的地2(下沉)
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop003
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console
2)在hadoop002和hadoop003上配置第二級(jí)采集方案
hadoop002
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop002
a1.sources.r1.port = 4545
# Describe the sink
# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循環(huán)創(chuàng)建文件夾
a1.sinks.k1.hdfs.round = true
# 循環(huán)創(chuàng)建文件夾的時(shí)間間隔是十分鐘
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地時(shí)間個(gè)數(shù)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
// 列編輯模式,按住alt選擇多列
# 時(shí)間間隔
a1.sinks.k1.hdfs.rollInterval = 3
# 大小間隔
a1.sinks.k1.hdfs.rollSize = 20
# event的個(gè)數(shù),這三個(gè)參數(shù)誰(shuí)先滿足就出發(fā)循環(huán)滾動(dòng)
a1.sinks.k1.hdfs.rollCount = 5
# 批處理數(shù)量
a1.sinks.k1.hdfs.batchSize = 1
# 文件格式 表示普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
# flume-ng agent --conf conf --conf-file avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
hadoop003
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop003
a1.sources.r1.port = 4545
# Describe the sink
# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循環(huán)創(chuàng)建文件夾
a1.sinks.k1.hdfs.round = true
# 循環(huán)創(chuàng)建文件夾的時(shí)間間隔是十分鐘
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地時(shí)間個(gè)數(shù)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
// 列編輯模式,按住alt選擇多列
# 時(shí)間間隔
a1.sinks.k1.hdfs.rollInterval = 3
# 大小間隔
a1.sinks.k1.hdfs.rollSize = 20
# event的個(gè)數(shù),這三個(gè)參數(shù)誰(shuí)先滿足就出發(fā)循環(huán)滾動(dòng)
a1.sinks.k1.hdfs.rollCount = 5
# 批處理數(shù)量
a1.sinks.k1.hdfs.batchSize = 1
# 文件格式 表示普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
# flume-ng agent --conf conf --conf-file avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
4. 啟動(dòng)flume
1) 在hadoop002和hadoop003上flume
從最后一級(jí)開(kāi)始啟動(dòng)flume
hadoop002
hadoop003
2) 在hadoop001上啟動(dòng)flume
同時(shí)在hadoop002和hadoop003上也提示連接成功
5. 負(fù)載均衡的測(cè)試
1) 克隆hadoop001的會(huì)話,編寫(xiě)腳本并運(yùn)行
while true;do date >> /var/log/test1.log;sleep 5;done
2) 克隆hadoop001的會(huì)話,查看文件變化
tail -F /var/log/test1.log
6. 查看結(jié)果
1)hadoop002
2)hadoop003
3) 在hadoop集群上
八、Flume的可靠性保證-故障恢復(fù)
1. 概念
故障轉(zhuǎn)移接收器處理器(Failover Sink Processor)維護(hù)一個(gè)具有優(yōu)先級(jí)的sink列表,保證在處理event時(shí),只需有一個(gè)可用的sink即可。
故障轉(zhuǎn)移機(jī)制工作原理是將故障的sink降級(jí)到故障池中,在池中為它們分配一個(gè)冷卻期,在重試之前冷卻時(shí)間會(huì)增加,當(dāng)sink成功發(fā)送event后,它將恢復(fù)到活躍池中。Failover Sink Processor提供的配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
sinks | – | 以空格分隔的參與sink組的sink列表 |
processor.type | default | 組件類型名需必須是failover |
processor.priority. | – | 設(shè)置sink的優(yōu)先級(jí)取值 |
processor.maxpenalty | 30000 | 失敗sink的最大退避時(shí)間 |
2. 配置采集方案
只需要改動(dòng)第一級(jí)采集方案部分內(nèi)容
改為故障恢復(fù):
代碼如下:
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels
a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2
#定義組的屬性
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000
# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/test1.log
# Describe the sink
# 定義數(shù)據(jù)的目的地1(下沉)
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop002
a1.sinks.k1.port = 4545
# 定義數(shù)據(jù)的目的地2(下沉)
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop003
a1.sinks.k2.port = 4545
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/exec-avro-failover.conf --name a1 -Dflume.root.logger=INFO,console
注意:?jiǎn)?dòng)命令也要做對(duì)應(yīng)的修改。
第二級(jí)采集方案不用修改。
3. 啟動(dòng)flume
從最后一級(jí)開(kāi)始啟動(dòng)flume。
啟動(dòng)hadoop002和hadoop003:
由于hadoop003的優(yōu)先級(jí)比hadoop002高,所以hadoop003先工作,hadoop002處于待機(jī)狀態(tài)。
啟動(dòng)hadoop001
4. 啟動(dòng)測(cè)試腳本
克隆hadoop001的會(huì)話,運(yùn)行測(cè)試腳本。
while true;do date >> /var/log/test1.log;sleep 5;done
5. 故障恢復(fù)
把hadoop03的flume關(guān)閉,等待十秒鐘(超時(shí)時(shí)間,在采集方案中進(jìn)行的定義)
查看hadoop集群
九、Flume攔截器
1. 概念
Flume Interceptors(攔截器)用于對(duì)Flume系統(tǒng)數(shù)據(jù)流中event的修改操作。使用Flume攔截器時(shí),只需參考官方配置屬性在采集方案中選擇性的配置即可,當(dāng)涉及到配置多個(gè)攔截器時(shí),攔截器名稱間需用空格分隔,且攔截器配置順序就是攔截順序。Flume 1.8.0版本中,F(xiàn)lume提供并支持的攔截器有很多,具體如下所示。
Timestamp Interceptor | Host Interceptor | Static Interceptor |
---|---|---|
Remove Header Interceptor | UUID Interceptor | Morphline Interceptor |
Search and Replace Interceptor | Regex Filtering Interceptor | Regex Extractor Interceptor |
1) Timestamp Interceptor
Timestamp Interceptor(時(shí)間戳攔截器)將流程執(zhí)行時(shí)間插入到event的header頭部,此攔截器插入帶有timestamp鍵的標(biāo)頭,其值為對(duì)應(yīng)時(shí)間戳。若配置中已存在時(shí)間戳?xí)r,此攔截器可保留現(xiàn)有時(shí)間戳,Timestamp Interceptor提供的常用配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
type | – | 組件類型名需必須是timestamp |
header | timestamp | 用于放置生成的時(shí)間戳的標(biāo)頭的名稱 |
preserveExisting | false | 如果時(shí)間戳已存在,是否應(yīng)保留, true或false |
2) Static Interceptor
Static Interceptor(靜態(tài)攔截器)允許用戶將具有靜態(tài)值的靜態(tài)頭附加到所有event。當(dāng)前不支持一次指定多個(gè)header頭,但是用戶可定義多個(gè)Static Interceptor來(lái)為每一個(gè)攔截器都追加一個(gè)header,Static Interceptor提供的常用配置屬性如下所示。
屬性名稱 | 默認(rèn)值 | 相關(guān)說(shuō)明 |
---|---|---|
type | – | 組件類型名需必須是static |
preserveExisting | true | 如果配置的header已存在,是否應(yīng)保留 |
key | key | 應(yīng)創(chuàng)建的header的名稱 |
value | value | 應(yīng)創(chuàng)建的header對(duì)應(yīng)的靜態(tài)值 |
2. 場(chǎng)景
在實(shí)際開(kāi)發(fā)的應(yīng)用場(chǎng)景中,兩臺(tái)服務(wù)器A、B在實(shí)時(shí)產(chǎn)生日志數(shù)據(jù),日志類型主要為access.log、nginx.log和web.log?,F(xiàn)需要將A、B兩臺(tái)服務(wù)器產(chǎn)生的日志數(shù)據(jù)access.log、nginx.log和web.log采集匯總到服務(wù)器C上,并統(tǒng)一收集并上傳到HDFS文件系統(tǒng)進(jìn)行保存。在HDFS中保存日志數(shù)據(jù)的文件必須按照以下要求進(jìn)行歸類統(tǒng)計(jì)(20180723表示收集日志數(shù)據(jù)的當(dāng)前日期):
- /source/logs/access/20180723/**
- /source/logs/nginx/20180723/**
- /source/logs/web/20180723/**
3. 日志數(shù)據(jù)采集流程圖
根據(jù)案例需求啟動(dòng)3臺(tái)服務(wù)器,同時(shí)搭建Flume系統(tǒng)和Hadoop集群。此案例將hadoop002和hadoop003分別作為A服務(wù)器和B服務(wù)器進(jìn)行第一階段的日志數(shù)據(jù)采集,將hadoop001作為C服務(wù)器進(jìn)行日志數(shù)據(jù)匯總并上傳至HDFS。
4. hadoop002和hadoop003的配置文件
在hadoop002和hadoop003的conf目錄下編寫(xiě)相同日志采集方案exec-avro_logCollection.conf
。
# example.conf: A single-node Flume configuration
# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 定義r1數(shù)據(jù)源
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/access.log
# 定義攔截器r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access
# 定義r2數(shù)據(jù)源
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /var/log/nginx.log
# 定義攔截器r2
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx
# 定義r3數(shù)據(jù)源
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /var/log/web.log
# 定義攔截器r3
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web
# Describe the sink
# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 41414
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/exec-avro_logCollection.conf --name a1 -Dflume.root.logger=INFO,console
在hadoop002上創(chuàng)建
在hadoop003上創(chuàng)建
5. hadoop001的配置文件
在hadoop001機(jī)器的conf目錄下編寫(xiě)第二級(jí)日志采集方案avro-hdfs_logCollection.conf
# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop001
a1.sources.r1.port = 41414
#定義攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp
# Describe the sink
# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/logs/%{type}/%y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循環(huán)創(chuàng)建文件夾
a1.sinks.k1.hdfs.round = true
# 循環(huán)創(chuàng)建文件夾的時(shí)間間隔是十分鐘
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地時(shí)間個(gè)數(shù)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
// 列編輯模式,按住alt選擇多列
# 時(shí)間間隔
a1.sinks.k1.hdfs.rollInterval = 0
# 大小間隔
a1.sinks.k1.hdfs.rollSize = 10485760
# event的個(gè)數(shù),這三個(gè)參數(shù)誰(shuí)先滿足就出發(fā)循環(huán)滾動(dòng)
a1.sinks.k1.hdfs.rollCount = 0
# 批處理數(shù)量
a1.sinks.k1.hdfs.batchSize = 10
# 文件格式 表示普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.threadsPoolSize=10
a1.sinks.k1.hdfs.callTimeout=30000
# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/avro-hdfs_logCollection.conf --name a1 -Dflume.root.logger=INFO,console
6. 啟動(dòng)flume
1) 先啟動(dòng)hadoop01上的flume
2) 啟動(dòng)hadoop002和hadoop003
啟動(dòng)hadoop002
啟動(dòng)hadoop003
3) 查看hadoop001
7. 測(cè)試效果
1) 執(zhí)行腳本文件
在hadoop002和hadoop003上分別克隆三個(gè)會(huì)話,分別執(zhí)行以下三個(gè)腳本,用來(lái)產(chǎn)生生產(chǎn)日志數(shù)據(jù):
while true;do echo "access access ……" >> /var/log/access.log;sleep 5;done
while true;do echo "nginx nginx ……" >> /var/log/nginx.log;sleep 5;done
while true;do echo "web web ……" >> /var/log/web.log;sleep 5;done
在窗口中分別執(zhí)行上述指令后,會(huì)不斷循環(huán)產(chǎn)生數(shù)據(jù),為了后續(xù)更好查看效果,執(zhí)行一會(huì)后就可以直接關(guān)停上述三個(gè)指令。
2) 查看hadoop001結(jié)果
關(guān)閉上述指令后查看hadoop001
3) 查看hadoop集群
文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-755823.html
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-755823.html
到了這里,關(guān)于Hadoop高手之路8-Flume日志采集的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!