国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Hadoop高手之路8-Flume日志采集

這篇具有很好參考價(jià)值的文章主要介紹了Hadoop高手之路8-Flume日志采集。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

Hadoop高手之路8-Flume日志采集

在大數(shù)據(jù)系統(tǒng)的開(kāi)發(fā)中,數(shù)據(jù)收集工作無(wú)疑是開(kāi)發(fā)者首要解決的一個(gè)難題,但由于生產(chǎn)數(shù)據(jù)的源頭豐富多樣,其中包含網(wǎng)站日志數(shù)據(jù)、后臺(tái)監(jiān)控?cái)?shù)據(jù)、用戶瀏覽網(wǎng)頁(yè)數(shù)據(jù)等,數(shù)據(jù)工程師要想將它們分門(mén)別類的采集到HDFS系統(tǒng)中,就可以使用Apache Flume(數(shù)據(jù)采集)系統(tǒng)。

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

一、Flume概述

1. Flume簡(jiǎn)介

Flume原是Cloudera公司提供的一個(gè)高可用的、高可靠的、分布式海量日志采集、聚合和傳輸系統(tǒng),而后納入到了Apache旗下,作為一個(gè)頂級(jí)開(kāi)源項(xiàng)目。Apache Flume不僅只限于日志數(shù)據(jù)的采集,由于Flume采集的數(shù)據(jù)源是可定制的,因此Flume還可用于傳輸大量事件數(shù)據(jù),包括但不限于網(wǎng)絡(luò)流量數(shù)據(jù)、社交媒體生成的數(shù)據(jù)、電子郵件消息以及幾乎任何可能的數(shù)據(jù)源。

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2. Flume運(yùn)行機(jī)制

Flume的核心是把數(shù)據(jù)從數(shù)據(jù)源(例如Web服務(wù)器)通過(guò)數(shù)據(jù)采集器(Source)收集過(guò)來(lái),再將收集的數(shù)據(jù)通過(guò)緩沖通道(Channel)匯集到指定的接收器(Sink)。

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

Flume基本架構(gòu)中有一個(gè)Agent(代理),它是Flume的核心角色,F(xiàn)lume Agent是一個(gè)JVM進(jìn)程,它承載著數(shù)據(jù)從外部源流向下一個(gè)目標(biāo)的三個(gè)核心組件:Source、Channel和Sink。

3. Flume日志采集系統(tǒng)結(jié)構(gòu)圖

在實(shí)際開(kāi)發(fā)中, Flume需要采集數(shù)據(jù)的類型多種多樣,同時(shí)還會(huì)進(jìn)行不同的中間操作,所以根據(jù)具體需求,可以將Flume日志采集系統(tǒng)分為簡(jiǎn)單結(jié)構(gòu)和復(fù)雜結(jié)構(gòu)。

簡(jiǎn)單的Flume日志采集系統(tǒng)的結(jié)構(gòu)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

復(fù)雜的Flume日志采集系統(tǒng)的結(jié)構(gòu)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

二、Flume的搭建

1. 下載

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

注意:使用1.9.0這個(gè)版本

2. 上傳

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3. 解壓

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

4. 配置環(huán)境變量

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

5. 配置flume

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

修改flume-env.sh文件
flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)
flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

三、Flume入門(mén)使用

1. 配置數(shù)據(jù)采集方案

1) 查看官網(wǎng)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2) 案例需求

flume連接和監(jiān)聽(tīng)服務(wù)器的某個(gè)端口,采集數(shù)據(jù)并顯示

3) 創(chuàng)建新的配置文件

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

4) 復(fù)制官網(wǎng)的采集配置示例,在此基礎(chǔ)上進(jìn)行修改
# example.conf: A single-node Flume configuration

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Describe the sink
a1.sinks.k1.type = logger

# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2. 啟動(dòng)flume進(jìn)行采集

啟動(dòng)命令查看官網(wǎng)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume-ng agent --conf conf --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3. 采集數(shù)據(jù)測(cè)試

用telnet向本機(jī)的44444端口發(fā)送數(shù)據(jù),模擬服務(wù)器產(chǎn)生數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

在使用之前,要先安裝telnet

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

在telnet端發(fā)送數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

在flume端顯示采集到的數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

四、Flume采集方案說(shuō)明

1. Flume Sources

在編寫(xiě)Flume采集方案時(shí),首先必須明確采集的數(shù)據(jù)源類型、出處;接著,根據(jù)這些信息與Flume已提供支持的Flume Sources進(jìn)行匹配,選擇對(duì)應(yīng)的數(shù)據(jù)采集器類型(即sources.type);再根據(jù)選擇的數(shù)據(jù)采集器類型,配置必要和非必要的數(shù)據(jù)采集器屬性,F(xiàn)lume提供并支持的Flume Sources種類如下所示。

Avro Source Thrift Source Exec Source
JMS Source Spooling Directory Source Twitter 1% firehose Source
Kafka Source NetCat TCP Source NetCat UDP Source
Sequence Generator Source Syslog TCP Source Multiport Syslog TCP Source
Syslog UDP Source HTTP Source Stress Source
Avro Legacy Source Thrift Legacy Source Custom Source
Scribe Source Taildir Source
1) Avro Source

監(jiān)聽(tīng)Avro端口并從外部Avro客戶端流中接收event數(shù)據(jù),當(dāng)與另一個(gè)Flume Agent上的Avro Sink配對(duì)時(shí),可創(chuàng)建分層集合拓?fù)洌肁vro Source可以實(shí)現(xiàn)多級(jí)流動(dòng)、扇出流、扇入流等效果,Avro Source常用配置屬性如下。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
channels
type 組件類型名需必須是avro
bind 要監(jiān)聽(tīng)的主機(jī)名或IP地址
port 要監(jiān)聽(tīng)的服務(wù)端口
threads 要生成的工作線程的最大數(shù)目
ssl false 將此設(shè)置為true以啟用SSL加密,則還必須指定“keystore”和“keystore-password”
keystore SSL所必需的通往Java秘鑰存儲(chǔ)路徑
keystore-password SSL所必需的Java密鑰存儲(chǔ)的密碼
2) Spooling Directory Source

Spooling Directory Source允許對(duì)指定磁盤(pán)上的文件目錄進(jìn)行監(jiān)控來(lái)提取數(shù)據(jù),它將查看文件的指定目錄的新增文件,并將文件中的數(shù)據(jù)讀取出來(lái)。Spooling Directory Source常用配置屬性如下表所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
channels
type 組件類型名需必須是spooldir
spoolDir 從中讀取文件的目錄
fileSuffix .COMPLETED 附加到完全攝取的文件后綴
deletePolicy never 何時(shí)刪除已完成的文件:never或immediate
fileHeader false 是否添加存儲(chǔ)絕對(duì)路徑文件名的標(biāo)頭
includePattern ^.*$ 正則表達(dá)式,指定要包含的文件
ignorePattern ^$ 正則表達(dá)式指定要忽略的文件
3) Taildir Source

Taildir Source用于觀察指定的文件,幾乎可以實(shí)時(shí)監(jiān)測(cè)到添加到每個(gè)文件的新行。如果文件正在寫(xiě)入新行,則此采集器將重試采集它們以等待寫(xiě)入完成,Source常用配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
channels
type 組件類型名需必須是TAILDIR
filegroups 以空格分隔的文件組列表
filegroups.<filegroupName> 文件組的絕對(duì)路徑
idleTimeout 120000 關(guān)閉非活動(dòng)文件的時(shí)間(毫秒)
writePosInterval 3000 寫(xiě)入位置文件上每個(gè)文件的最后位置的間隔時(shí)間
batchSize 100 一次讀取和發(fā)送到通道的最大行數(shù)
backoffSleepIncrement 1000 當(dāng)最后一次嘗試未找到任何新數(shù)據(jù)時(shí),每次重新嘗試輪詢新數(shù)據(jù)之間的最大時(shí)間延遲
fileHeader false 是否添加存儲(chǔ)絕對(duì)路徑文件名的標(biāo)頭
fileHeaderKey file 將絕對(duì)路徑文件名附加到event header時(shí)使用的header關(guān)鍵字
channels
type 組件類型名需必須是http
port 采集源要綁定的端口
bind 0.0.0.0 要監(jiān)聽(tīng)綁定的主機(jī)名或IP地址
handler org.apache.flume.source.http.JSONHandler handler類的全路徑名
4) HTTP Source

HTTP Source可以通過(guò)HTTP POST和GET請(qǐng)求方式接收event數(shù)據(jù),GET通常只能用于測(cè)試使用,POST請(qǐng)求發(fā)送的所有的events都被認(rèn)為是一個(gè)批次,會(huì)在一個(gè)事務(wù)中插入channel,HTTP Source常用配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
channels
type 組件類型名需必須是http
port 采集源要綁定的端口
bind 0.0.0.0 要監(jiān)聽(tīng)綁定的主機(jī)名或IP地址
handler org.apache.flume.source.http.JSONHandler handler類的全路徑名

2. Flume Channels

? Channels通道是event在Agent上暫存的存儲(chǔ)庫(kù),Source向Channel中添加event,Sink在讀取完數(shù)據(jù)后再刪除它。在配置Channels時(shí),需要明確的就是將要傳輸?shù)膕ources數(shù)據(jù)源類型;根據(jù)這些信息結(jié)合開(kāi)發(fā)中的實(shí)際需求,選擇Flume已提供的支持的Flume Channels;再根據(jù)選擇的Channel類型,配置必要和非必要的Channel屬性,F(xiàn)lume提供并支持的Flume Channels種類如下所示。

Memory Channel JDBC Channel Kafka Channel
File Channel Spillable Memory Channel Pseudo Transaction Channel
Custom Channel
1) Memory Channel

Memory Channel會(huì)將event存儲(chǔ)在具有可配置最大尺寸的內(nèi)存隊(duì)列中,適用于需要更高吞吐量的流量,但在Agent發(fā)生故障時(shí)會(huì)丟失部分階段數(shù)據(jù),下表為Memory Channel常用配置屬性。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
type 組件類型名需必須是memory
capacity 100 存儲(chǔ)在channel中的最大event數(shù)
transactionCapacity 100 channel從source接收或向sink傳遞的每個(gè)事務(wù)中最大event數(shù)
keep-alive 3 添加或刪除event的超時(shí)時(shí)間(秒)
byteCapacityBufferPercentage 20 定義byteCapacity與channel中所有event所占百分比
byteCapacity 等于JVM可用的最大內(nèi)存的80% 允許此channel中所有event的的最大內(nèi)存字節(jié)數(shù)總和
2) File Channel

File Channel是Flume的持久通道,它將所有event寫(xiě)入磁盤(pán),因此不會(huì)丟失進(jìn)程或機(jī)器關(guān)機(jī)、崩潰時(shí)的數(shù)據(jù)。File Channel通過(guò)在一次事務(wù)中提交多個(gè)event來(lái)提高吞吐量,做到了只要事務(wù)被提交,那么數(shù)據(jù)就不會(huì)有丟失,F(xiàn)ile Channel常用配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
type 組件類型名需必須是file
checkpointDir 1~/.flume/file-channel/checkpoint 檢測(cè)點(diǎn)文件所存儲(chǔ)的目錄
useDualCheckpoints false 備份檢測(cè)點(diǎn)如果設(shè)置為true,backupChec kpointDir必須設(shè)置
backupCheckpointDir 備份檢查點(diǎn)目錄。此目錄不能與數(shù)據(jù)目錄或檢查點(diǎn)目錄相同
dataDirs ~/.flume/file-channel/data 數(shù)據(jù)存儲(chǔ)所在的目錄設(shè)置
transactionCapacity 10000 事務(wù)容量的最大值設(shè)置
checkpointInterval 30000 檢測(cè)點(diǎn)之間的時(shí)間值設(shè)置(單位微秒)
maxFileSize 2146435071 一個(gè)單一日志的最大值設(shè)置(以字節(jié)為單位)
capacity 1000000 channel的最大容量
transactionCapacity 10000 事務(wù)容量的最大值設(shè)置

3. Flume Sinks

Flume Soures采集的數(shù)據(jù)通過(guò)Channels通道流向Sink中,此時(shí)Sink類似一個(gè)集結(jié)的遞進(jìn)中心,它需要根據(jù)需求進(jìn)行配置,從而最終選擇發(fā)送目的地。配置Sinks時(shí),明確將要傳輸?shù)臄?shù)據(jù)目的地、結(jié)果類型;然后根據(jù)實(shí)際需求信息,選擇Flume已提供支持的Flume Sinks;再根據(jù)選擇的Sinks類型,配置必要和非必要的Sinks屬性。Flume提供并支持的Flume Sinks種類如下所示。

HDFS Sink Hive Sink Logger Sink
Avro Sink Thrift Sink IRC Sink
File Roll Sink Null Sink HBaseSink
AsyncHBase Sink MorphlineSolr Sink ElasticSearch Sink
Kite Dataset Sink Kafka Sink HTTP Sink
Custom Sink
1) HDFS Sink

HDFS Sink將event寫(xiě)入Hadoop分布式文件系統(tǒng)(HDFS),它目前支持創(chuàng)建文本和序列文件,以及兩種類型的壓縮文件,下表為HDFS Sink常用配置屬性。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
channel
type 組件類型名需必須是hdfs
hdfs.path HDFS目錄路徑
hdfs.filePrefix FlumeData 為在hdfs目錄中由Flume創(chuàng)建的文件指定前綴
hdfs.round false 是否應(yīng)將時(shí)間戳向下舍入
hdfs.roundValue 1 舍入到此最高倍數(shù),小于當(dāng)前時(shí)間
hdfs.roundUnit second 舍入值的單位 - 秒、分鐘或小時(shí)
hdfs.rollInterval 30 滾動(dòng)當(dāng)前文件之前等待的秒數(shù)
hdfs.rollSize 1024 觸發(fā)滾動(dòng)的文件大小,以字節(jié)為單位
hdfs.rollCount 10 在滾動(dòng)之前寫(xiě)入文件的事件數(shù)
2) Logger Sink

Logger Sink用于記錄INFO級(jí)別event,它通常用于調(diào)試。Logger Sink接收器的不同之處是它不需要在“記錄原始數(shù)據(jù)”部分中說(shuō)明額外的配置,Logger Sink常用配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
channel
type 組件類型名需必須是logger
maxBytesToLog 16 要記錄的event body的最大字節(jié)數(shù)
3) Avro Sink

Avro Sink形成Flume分層收集支持的一半,發(fā)送到此接收器的Flume event轉(zhuǎn)換為Avro event,并發(fā)送到對(duì)應(yīng)配置的主機(jī)名/端口,event將從配置的channel中批量獲取配置的批處理大小,Avro Sink常用配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
channel
type 組件類型名需必須是avro
hostname 要監(jiān)聽(tīng)的主機(jī)名或IP地址
port 要監(jiān)聽(tīng)的服務(wù)端口
batch-size 100 要一起批量發(fā)送的event數(shù)
connect-timeout 20000 允許第一次(握手)請(qǐng)求的時(shí)間量(ms)
request-timeout 20000 在第一個(gè)之后允許請(qǐng)求的時(shí)間量(ms)

五、Flume采集數(shù)據(jù)案例一——監(jiān)控文件夾的變化

1. 需求

監(jiān)控服務(wù)器下的某個(gè)文件夾(日志輸出文件夾),如果該文件夾下有新的文件,則采集該文件的內(nèi)容上傳到hadoop集群上。

2. 編寫(xiě)數(shù)據(jù)采集配置方案

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

source的官網(wǎng)示例

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

sink的官網(wǎng)示例

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定義代理的名字及各個(gè)組件sources、sinks、channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 配置數(shù)據(jù)源
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/flumeSpool
a1.sources.r1.fileHeader = true

# Describe the sink
# 描述數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 組裝各個(gè)組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/spoolingdir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3. 啟動(dòng)hdfs集群

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

4. 啟動(dòng)flume

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

出現(xiàn)錯(cuò)誤,提示監(jiān)控目錄不存在,需要提前創(chuàng)建

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

再次啟動(dòng)flume

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

5. 測(cè)試flume數(shù)據(jù)采集

在/var/log/flumeSpool文件夾下添加一個(gè)新文件

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

出現(xiàn)錯(cuò)誤,修改采集方案

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

再次啟動(dòng)flume測(cè)試

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

提示錯(cuò)誤

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

用hadoop的guava新版本替換flume的舊版本

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

再次啟動(dòng)flume,然后在/var/log/flumeSpool文件夾下添加一個(gè)新文件

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

在hdfs集群上查看

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

解決亂碼,修改文件:

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定義代理的名字及各個(gè)組件sources、sinks、channels
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 配置數(shù)據(jù)源
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /var/log/flumeSpool
a1.sources.r1.fileHeader = true

# Describe the sink
# 描述數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
a1.sinks.k1.hdfs.useLocalTimeStamp = true

a1.sinks.k1.hdfs.rollInterval = 3
a1.sinks.k1.hdfs.rollSize = 20
a1.sinks.k1.hdfs.rollCount = 5
a1.sinks.k1.hdfs.batchSize = 1
a1.sinks.k1.hdfs.fileType = DataStream

# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 組裝各個(gè)組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/spoolingdir-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

重新啟動(dòng)flume,在創(chuàng)建一個(gè)新的文件

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

這樣結(jié)果就可以了。

六、Flume采集數(shù)據(jù)案例二——監(jiān)控文件的變化

1.采集方案

數(shù)據(jù)源

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

數(shù)據(jù)下沉不要修改,采集配置方案如下:

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

配置代碼:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/text.log


# Describe the sink
# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循環(huán)創(chuàng)建文件夾
a1.sinks.k1.hdfs.round = true
# 循環(huán)創(chuàng)建文件夾的時(shí)間間隔是10分鐘
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地時(shí)間格式
a1.sinks.k1.hdfs.useLocalTimeStamp = true
# 時(shí)間間隔
a1.sinks.k1.hdfs.rollInterval = 3
# 大小間隔
a1.sinks.k1.hdfs.rollSize = 20
# event的個(gè)數(shù),這三個(gè)參數(shù)誰(shuí)先滿足就出發(fā)循環(huán)滾動(dòng)
a1.sinks.k1.hdfs.rollCount = 5
# 批處理數(shù)量
a1.sinks.k1.hdfs.batchSize = 1
# 文件格式,表示普通文本格式
a1.sinks.k1.hdfs.fileType = DataStream 

# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/exec-hdfs.conf --name a1 -Dflume.root.logger=INFO,console


2. 測(cè)試采集功能

模擬場(chǎng)景
先寫(xiě)一個(gè)shell腳本,持續(xù)輸出當(dāng)前日期到監(jiān)控文件/var/log/test.log中,模擬服務(wù)器日志的文件

while true;do date >> /var/log/text.log;done

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3. 再克隆一個(gè)會(huì)話,查看新增的內(nèi)容

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

4. 啟動(dòng)flume

flume-ng agent --conf conf --conf-file conf/exec-hdfs.conf --name a1 -Dflume.root.logger=INFO,console

5. 查看HDFS上的結(jié)果

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

七、Flume的可靠性保證-負(fù)載均衡

1. 概念

配置的采集方案是通過(guò)唯一一個(gè)Sink作為接收器接收后續(xù)需要的數(shù)據(jù),但會(huì)出現(xiàn)當(dāng)前Sink故障或數(shù)據(jù)收集請(qǐng)求量較大的情況,這時(shí)單一Sink配置可能就無(wú)法保證Flume開(kāi)發(fā)的可靠性。因此,F(xiàn)lume 提供Flume Sink Processors解決上述問(wèn)題。

Sink處理器允許定義Sink groups,將多個(gè)sink分組到一個(gè)實(shí)體中,Sink處理器就可通過(guò)組內(nèi)多個(gè)sink為服務(wù)提供負(fù)載均衡功能。

負(fù)載均衡接收器處理器(Load balancing sink processor)提供了在多個(gè)sink上進(jìn)行負(fù)載均衡流量的功能,它維護(hù)一個(gè)活躍的sink索引列表,需在其上分配負(fù)載,還支持round_robin(輪詢)和random(隨機(jī))選擇機(jī)制進(jìn)行流量分配,默認(rèn)選擇機(jī)制為round_robin。Load balancing sink processor提供的配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
sinks 以空格分隔的參與sink組的sink列表
processor.type default 組件類型名需必須是load_balance
processor.backoff false 設(shè)置失敗的sink進(jìn)入黑名單
processor.selector round_robin 選擇機(jī)制
processor.selector.maxTimeOut 30000 失敗sink放置在黑名單的超時(shí)時(shí)間

2. 搭建并配置flume集群

三臺(tái)服務(wù)器的flume集群:hadoop01、hadoop02、hadoop03

1) 分發(fā)hadoop01上的flume文件到hadoop02和hadoop03上

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2) 分發(fā)環(huán)境變量配置文件

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3) 使環(huán)境變量起作用

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3. 配置采集方案

兩級(jí)配置方案

查看官方文檔的示例

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

1) 在hadoop001上配置第一級(jí)采集方案

在conf下編寫(xiě)采集方案exec-avro.conf

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

代碼如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

#定義組的屬性
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance

#定義負(fù)載均衡
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinkgroups.g1.processor.selector.maxTimeOut = 30000


# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/test1.log

# Describe the sink
# 定義數(shù)據(jù)的目的地1(下沉)
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop002
a1.sinks.k1.port = 4545

# 定義數(shù)據(jù)的目的地2(下沉)
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop003
a1.sinks.k2.port = 4545


# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console
2)在hadoop002和hadoop003上配置第二級(jí)采集方案

hadoop002

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop002
a1.sources.r1.port = 4545
# Describe the sink

# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循環(huán)創(chuàng)建文件夾
a1.sinks.k1.hdfs.round = true
# 循環(huán)創(chuàng)建文件夾的時(shí)間間隔是十分鐘
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地時(shí)間個(gè)數(shù)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
// 列編輯模式,按住alt選擇多列

# 時(shí)間間隔
a1.sinks.k1.hdfs.rollInterval = 3
# 大小間隔
a1.sinks.k1.hdfs.rollSize = 20
# event的個(gè)數(shù),這三個(gè)參數(shù)誰(shuí)先滿足就出發(fā)循環(huán)滾動(dòng)
a1.sinks.k1.hdfs.rollCount = 5
# 批處理數(shù)量
a1.sinks.k1.hdfs.batchSize = 1 
# 文件格式 表示普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream


# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
# flume-ng agent --conf conf --conf-file avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console


hadoop003

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop003
a1.sources.r1.port = 4545
# Describe the sink

# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循環(huán)創(chuàng)建文件夾
a1.sinks.k1.hdfs.round = true
# 循環(huán)創(chuàng)建文件夾的時(shí)間間隔是十分鐘
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地時(shí)間個(gè)數(shù)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
// 列編輯模式,按住alt選擇多列

# 時(shí)間間隔
a1.sinks.k1.hdfs.rollInterval = 3
# 大小間隔
a1.sinks.k1.hdfs.rollSize = 20
# event的個(gè)數(shù),這三個(gè)參數(shù)誰(shuí)先滿足就出發(fā)循環(huán)滾動(dòng)
a1.sinks.k1.hdfs.rollCount = 5
# 批處理數(shù)量
a1.sinks.k1.hdfs.batchSize = 1 
# 文件格式 表示普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream


# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console
# flume-ng agent --conf conf --conf-file avro-hdfs.conf --name a1 -Dflume.root.logger=INFO,console


4. 啟動(dòng)flume

1) 在hadoop002和hadoop003上flume

從最后一級(jí)開(kāi)始啟動(dòng)flume
hadoop002

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

hadoop003

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2) 在hadoop001上啟動(dòng)flume

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

同時(shí)在hadoop002和hadoop003上也提示連接成功

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

5. 負(fù)載均衡的測(cè)試

1) 克隆hadoop001的會(huì)話,編寫(xiě)腳本并運(yùn)行
while true;do date >> /var/log/test1.log;sleep 5;done

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2) 克隆hadoop001的會(huì)話,查看文件變化
tail -F /var/log/test1.log

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

6. 查看結(jié)果

1)hadoop002

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2)hadoop003

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3) 在hadoop集群上

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

八、Flume的可靠性保證-故障恢復(fù)

1. 概念

故障轉(zhuǎn)移接收器處理器(Failover Sink Processor)維護(hù)一個(gè)具有優(yōu)先級(jí)的sink列表,保證在處理event時(shí),只需有一個(gè)可用的sink即可。

故障轉(zhuǎn)移機(jī)制工作原理是將故障的sink降級(jí)到故障池中,在池中為它們分配一個(gè)冷卻期,在重試之前冷卻時(shí)間會(huì)增加,當(dāng)sink成功發(fā)送event后,它將恢復(fù)到活躍池中。Failover Sink Processor提供的配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
sinks 以空格分隔的參與sink組的sink列表
processor.type default 組件類型名需必須是failover
processor.priority. 設(shè)置sink的優(yōu)先級(jí)取值
processor.maxpenalty 30000 失敗sink的最大退避時(shí)間

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2. 配置采集方案

只需要改動(dòng)第一級(jí)采集方案部分內(nèi)容
flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

改為故障恢復(fù):

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

代碼如下:

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels

a1.sources = r1
a1.sinks = k1 k2
a1.channels = c1 c2

#定義組的屬性
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 5
a1.sinkgroups.g1.processor.priority.k2 = 10
a1.sinkgroups.g1.processor.maxpenalty = 10000


# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/test1.log

# Describe the sink
# 定義數(shù)據(jù)的目的地1(下沉)
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = hadoop002
a1.sinks.k1.port = 4545

# 定義數(shù)據(jù)的目的地2(下沉)
a1.sinks.k2.type = avro
a1.sinks.k2.channel = c1
a1.sinks.k2.hostname = hadoop003
a1.sinks.k2.port = 4545


# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/exec-avro-failover.conf --name a1 -Dflume.root.logger=INFO,console


注意:?jiǎn)?dòng)命令也要做對(duì)應(yīng)的修改。

第二級(jí)采集方案不用修改。

3. 啟動(dòng)flume

從最后一級(jí)開(kāi)始啟動(dòng)flume。

啟動(dòng)hadoop002和hadoop003:

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

由于hadoop003的優(yōu)先級(jí)比hadoop002高,所以hadoop003先工作,hadoop002處于待機(jī)狀態(tài)。

啟動(dòng)hadoop001

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

4. 啟動(dòng)測(cè)試腳本

克隆hadoop001的會(huì)話,運(yùn)行測(cè)試腳本。

while true;do date >> /var/log/test1.log;sleep 5;done

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

5. 故障恢復(fù)

把hadoop03的flume關(guān)閉,等待十秒鐘(超時(shí)時(shí)間,在采集方案中進(jìn)行的定義)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

查看hadoop集群

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

九、Flume攔截器

1. 概念

Flume Interceptors(攔截器)用于對(duì)Flume系統(tǒng)數(shù)據(jù)流中event的修改操作。使用Flume攔截器時(shí),只需參考官方配置屬性在采集方案中選擇性的配置即可,當(dāng)涉及到配置多個(gè)攔截器時(shí),攔截器名稱間需用空格分隔,且攔截器配置順序就是攔截順序。Flume 1.8.0版本中,F(xiàn)lume提供并支持的攔截器有很多,具體如下所示。

Timestamp Interceptor Host Interceptor Static Interceptor
Remove Header Interceptor UUID Interceptor Morphline Interceptor
Search and Replace Interceptor Regex Filtering Interceptor Regex Extractor Interceptor
1) Timestamp Interceptor

Timestamp Interceptor(時(shí)間戳攔截器)將流程執(zhí)行時(shí)間插入到event的header頭部,此攔截器插入帶有timestamp鍵的標(biāo)頭,其值為對(duì)應(yīng)時(shí)間戳。若配置中已存在時(shí)間戳?xí)r,此攔截器可保留現(xiàn)有時(shí)間戳,Timestamp Interceptor提供的常用配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
type 組件類型名需必須是timestamp
header timestamp 用于放置生成的時(shí)間戳的標(biāo)頭的名稱
preserveExisting false 如果時(shí)間戳已存在,是否應(yīng)保留, true或false
2) Static Interceptor

Static Interceptor(靜態(tài)攔截器)允許用戶將具有靜態(tài)值的靜態(tài)頭附加到所有event。當(dāng)前不支持一次指定多個(gè)header頭,但是用戶可定義多個(gè)Static Interceptor來(lái)為每一個(gè)攔截器都追加一個(gè)header,Static Interceptor提供的常用配置屬性如下所示。

屬性名稱 默認(rèn)值 相關(guān)說(shuō)明
type 組件類型名需必須是static
preserveExisting true 如果配置的header已存在,是否應(yīng)保留
key key 應(yīng)創(chuàng)建的header的名稱
value value 應(yīng)創(chuàng)建的header對(duì)應(yīng)的靜態(tài)值

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2. 場(chǎng)景

在實(shí)際開(kāi)發(fā)的應(yīng)用場(chǎng)景中,兩臺(tái)服務(wù)器A、B在實(shí)時(shí)產(chǎn)生日志數(shù)據(jù),日志類型主要為access.log、nginx.log和web.log?,F(xiàn)需要將A、B兩臺(tái)服務(wù)器產(chǎn)生的日志數(shù)據(jù)access.log、nginx.log和web.log采集匯總到服務(wù)器C上,并統(tǒng)一收集并上傳到HDFS文件系統(tǒng)進(jìn)行保存。在HDFS中保存日志數(shù)據(jù)的文件必須按照以下要求進(jìn)行歸類統(tǒng)計(jì)(20180723表示收集日志數(shù)據(jù)的當(dāng)前日期):

  • /source/logs/access/20180723/**
  • /source/logs/nginx/20180723/**
  • /source/logs/web/20180723/**

3. 日志數(shù)據(jù)采集流程圖

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

根據(jù)案例需求啟動(dòng)3臺(tái)服務(wù)器,同時(shí)搭建Flume系統(tǒng)和Hadoop集群。此案例將hadoop002和hadoop003分別作為A服務(wù)器和B服務(wù)器進(jìn)行第一階段的日志數(shù)據(jù)采集,將hadoop001作為C服務(wù)器進(jìn)行日志數(shù)據(jù)匯總并上傳至HDFS。

4. hadoop002和hadoop003的配置文件

在hadoop002和hadoop003的conf目錄下編寫(xiě)相同日志采集方案exec-avro_logCollection.conf。

# example.conf: A single-node Flume configuration

# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels

a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定義r1數(shù)據(jù)源

a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/access.log

# 定義攔截器r1
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

# 定義r2數(shù)據(jù)源

a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /var/log/nginx.log

# 定義攔截器r2
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

# 定義r3數(shù)據(jù)源

a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /var/log/web.log

# 定義攔截器r3
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# Describe the sink
# 定義數(shù)據(jù)的目的地(下沉)
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop001
a1.sinks.k1.port = 41414



# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1
a1.sinks.k1.channel = c1

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/exec-avro_logCollection.conf --name a1 -Dflume.root.logger=INFO,console

在hadoop002上創(chuàng)建

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

在hadoop003上創(chuàng)建

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

5. hadoop001的配置文件

在hadoop001機(jī)器的conf目錄下編寫(xiě)第二級(jí)日志采集方案avro-hdfs_logCollection.conf

# Name the components on this agent
# 定義代理的名字a1及各個(gè)組件sources、sinks和channels

a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Describe/configure the source
# 定義數(shù)據(jù)源
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop001
a1.sources.r1.port = 41414

#定義攔截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# Describe the sink
# 定義數(shù)據(jù)的目的地(下沉)

a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = /flume/logs/%{type}/%y%m%d
a1.sinks.k1.hdfs.filePrefix = events-
# 是否循環(huán)創(chuàng)建文件夾
a1.sinks.k1.hdfs.round = true
# 循環(huán)創(chuàng)建文件夾的時(shí)間間隔是十分鐘
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
# 使用本地時(shí)間個(gè)數(shù)
a1.sinks.k1.hdfs.useLocalTimeStamp = true
// 列編輯模式,按住alt選擇多列

# 時(shí)間間隔
a1.sinks.k1.hdfs.rollInterval = 0
# 大小間隔
a1.sinks.k1.hdfs.rollSize = 10485760
# event的個(gè)數(shù),這三個(gè)參數(shù)誰(shuí)先滿足就出發(fā)循環(huán)滾動(dòng)
a1.sinks.k1.hdfs.rollCount = 0
# 批處理數(shù)量
a1.sinks.k1.hdfs.batchSize = 10 
# 文件格式 表示普通文本文件
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text
a1.sinks.k1.hdfs.threadsPoolSize=10
a1.sinks.k1.hdfs.callTimeout=30000


# Use a channel which buffers events in memory
# 定義管道
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# Bind the source and sink to the channel
# 組裝組件
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

# 啟動(dòng)命令
# flume-ng agent --conf conf --conf-file conf/avro-hdfs_logCollection.conf --name a1 -Dflume.root.logger=INFO,console

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

6. 啟動(dòng)flume

1) 先啟動(dòng)hadoop01上的flume

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

2) 啟動(dòng)hadoop002和hadoop003

啟動(dòng)hadoop002

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

啟動(dòng)hadoop003

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3) 查看hadoop001

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

7. 測(cè)試效果

1) 執(zhí)行腳本文件

在hadoop002和hadoop003上分別克隆三個(gè)會(huì)話,分別執(zhí)行以下三個(gè)腳本,用來(lái)產(chǎn)生生產(chǎn)日志數(shù)據(jù):

while true;do echo "access access ……"  >> /var/log/access.log;sleep 5;done
while true;do echo "nginx nginx ……"  >> /var/log/nginx.log;sleep 5;done
while true;do echo "web web ……"  >> /var/log/web.log;sleep 5;done

在窗口中分別執(zhí)行上述指令后,會(huì)不斷循環(huán)產(chǎn)生數(shù)據(jù),為了后續(xù)更好查看效果,執(zhí)行一會(huì)后就可以直接關(guān)停上述三個(gè)指令。

2) 查看hadoop001結(jié)果

關(guān)閉上述指令后查看hadoop001

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

3) 查看hadoop集群

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)

flume采集日志,Hadoop高手之路,hadoop,flume,大數(shù)據(jù)文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-755823.html

到了這里,關(guān)于Hadoop高手之路8-Flume日志采集的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包