国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

這篇具有很好參考價值的文章主要介紹了大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報違法"按鈕提交疑問。

目錄

素材

一、Flume的概述

1、Flume的認(rèn)識

2、Flume的運(yùn)行機(jī)制

(1)Source(數(shù)據(jù)采集器)

(2)Channel(緩沖通道)

(3)Sink(接收器)

3、Flume的日志采集系統(tǒng)結(jié)構(gòu)

(1)簡單結(jié)構(gòu)

(2)復(fù)雜結(jié)構(gòu)

二、Flume的基本使用

1、系統(tǒng)要求

2、Flume安裝

(1)下載Flume

(2)解壓

(3)重命名

(4)配置Flume環(huán)境

3、Flume的入門使用

(1)配置Flume采集方案

(2)指定采集方案啟動Flume

(3)Flume數(shù)據(jù)采集測試

三、Flume采集方案配置說明

1、Flume Source

(1)Avro Source

?(2)Spooling Directory Source

(3)Taildir Source

(4)HTTP Source

2、Flume channel

(1)Memory Channel

(2)File channel

3、Flume Sinks

(1)HDFS Sink

(2)Logger Sink

(3)Avro Sink

四、Flume的可靠性保證

1、負(fù)載均衡

(1)搭建并配置Flume機(jī)器

(2)配置Flume采集方案

a、exec-avro.conf

b、netcat-logger.conf

(2)啟動Flume系統(tǒng)

(3)Flume系統(tǒng)負(fù)載均衡測試

2、故障轉(zhuǎn)移

(1)配置Flume采集方案

a、avro-logger-memory.conf

?b、exec-avro-failover.conf

?(2)啟動Flume系統(tǒng)

(3)Flume系統(tǒng)故障轉(zhuǎn)移測試

五、Flume攔截器

1、Timestamp interceptor

2、 Static interceptor

3、Search and Replace Interceptor

六、案例——日志采集

1、配置采集方案

(1)exec-avro_logCollection.conf?

(2)avro-hdfs_logCollection.conf

2、啟動hadoop集群

3、啟動Flume系統(tǒng)

4、 日志采集系統(tǒng)測試

參考書籍


素材

http://鏈接: https://pan.baidu.com/s/19cSqan67QhB_x3vdnXsANQ?pwd=fh35 提取碼: fh35http://xn--gzu811i//pan.baidu.com/s/19cSqan67QhB_x3vdnXsANQ?pwd=fh35%20%E6%8F%90%E5%8F%96%E7%A0%81:%20fh35

一、Flume的概述

1、Flume的認(rèn)識

??????? Flume原是Cloudera公司提供的一個高可用的、高可靠的、分布式海量日志采集、聚合和傳輸系統(tǒng),而后納人到了Apache旗下,作為一個頂級開源項(xiàng)目。Apache Flume不僅只限日志數(shù)據(jù)的采集,由于Flume 采集的數(shù)據(jù)源是可定制的,因此Flume 還可用于傳輸大量事件數(shù)據(jù),包括但不限于網(wǎng)絡(luò)流量數(shù)據(jù)、社交媒體生成的數(shù)據(jù)、電子郵件消息以及幾乎任可能的數(shù)據(jù)源。

????????當(dāng)前 Flume 分為兩個版本:Flume 0.9x版本,統(tǒng)稱Flume-og(original generation)和 Flume 1.x版本,統(tǒng)稱Flumeng(nextgeneration)。由于早期的Flume-og存在設(shè)計(jì)不合理、代碼臃腫、不易擴(kuò)展等問題,因此在Flume納入到Apache旗下后,開發(fā)人員對Clouden Flume 的代碼進(jìn)行了重構(gòu),同時對 Flume 功能進(jìn)行了補(bǔ)充和加強(qiáng),并重命名為 Apache Flume于是就出現(xiàn)了 Flume-ng 與Flume-og兩種截然不同的版本。而在實(shí)際開發(fā)中,多數(shù)使用目前比較流行的Flumeng 版本進(jìn)行 Flume 開發(fā)。

2、Flume的運(yùn)行機(jī)制

??????? Flume的核心是把數(shù)據(jù)從數(shù)據(jù)源(Web Server)通過數(shù)據(jù)采集器(Source)收集過來,再將收集的數(shù)據(jù)通過緩沖通道(Channel)匯集到指定的接收器(Sink)。可參考下面官方文檔圖片。

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

????????Flume 基本架構(gòu)中有一個 Agent(代理),它是Flume 的核心角色,meAgent是一個JVM進(jìn)程,它承載著數(shù)據(jù)從外部源流向下一個目標(biāo)的3個核心組件Source、Channel、Sink。

(1)Source(數(shù)據(jù)采集器)

????????用于源數(shù)據(jù)的采集,從一個Web服務(wù)器采集源數(shù)據(jù),然后將采集到的數(shù)據(jù)寫入到 Channel 中并流向 Sink;

(2)Channel(緩沖通道)

????????底層是一個緩沖隊(duì)列,對Source中的數(shù)據(jù)進(jìn)行緩存,將數(shù)據(jù)高效,準(zhǔn)確地寫人 Sink,待數(shù)據(jù)全部到達(dá) Sink后,F(xiàn)lume 就會刪除該緩存通道中的數(shù)據(jù);

(3)Sink(接收器)

????????接收并匯集流向 Sink 所有數(shù)據(jù),根據(jù)需求,可以直接進(jìn)行集中式存儲(采用HDFS進(jìn)行存儲),也可以繼續(xù)作為數(shù)據(jù)源傳人其他遠(yuǎn)程服務(wù)器或者 Source 中。

????????在整個數(shù)據(jù)傳輸?shù)倪^程中,F(xiàn)lume將流動的數(shù)據(jù)封裝到一個event(事件)中,它是 Flume 內(nèi)部數(shù)據(jù)傳輸?shù)幕締卧?。一個完整的event 包含 headers和body,其中 headers 包含了一些標(biāo)識信息,而body中就是Flume 收集到的數(shù)據(jù)信息。

3、Flume的日志采集系統(tǒng)結(jié)構(gòu)

(1)簡單結(jié)構(gòu)

??????? 當(dāng)需要采集數(shù)據(jù)的生產(chǎn)源比較單一、簡單時,可以直接使用一個 Agent 來進(jìn)行數(shù)據(jù)采集并最終存儲。

(2)復(fù)雜結(jié)構(gòu)

??????? 當(dāng)需要采集數(shù)據(jù)的數(shù)據(jù)源分布在不同的服務(wù)器上時,使用一個Agent 進(jìn)行數(shù)據(jù)采集不再適用,這時就可以根據(jù)業(yè)務(wù)需要部署多個 Agent 進(jìn)行數(shù)據(jù)采集并最終存儲。也就是說,對每一個需要收集數(shù)據(jù)的 Web 服務(wù)端都搭建了一個 Agent 進(jìn)行數(shù)據(jù)采集,接著再將這多個 Agent 中的數(shù)據(jù)作為下一個 Agent 的 Source 進(jìn)行采集并最終集中存儲到 HDFS 中。除此之外,在開發(fā)過程工作中可能遇到從同一個服務(wù)端采集數(shù)據(jù),然后通過多路復(fù)用流分別傳輸并存儲到不同目的地情況,根據(jù)具體需求,將一個 Agent 采集的數(shù)據(jù)通過不同的 Channel 分別流向了不同的 Sink,然后再進(jìn)行下一階段的傳輸或存儲。

二、Flume的基本使用

1、系統(tǒng)要求

????????作為 Apache 旗下的一個頂級項(xiàng)目,想要使用 Flume 進(jìn)行開發(fā),必須滿足一定的系統(tǒng)要求,這里以官方說明為準(zhǔn),具體要求如下。

(1)安裝Java 1.8 或更高版本 Java 運(yùn)行環(huán)境(針對本次使用的 Flume 1.8版本);

(2)為Source(數(shù)據(jù)采集器)、Channel(緩沖通道)和Sink(接收器)的配置提供足夠的內(nèi)存空間;

(3)為Channel(緩沖通道)和 Sink(接收器)的配置提供足夠的磁盤空間;

(4)保證Agent(代理)對要操作的目錄有讀寫權(quán)限。

????????上述系統(tǒng)要求中,Java運(yùn)行環(huán)境的版本與將要安裝使用的Flume版本是對應(yīng)的,如果使用Flume 1.6 版本,則要求使用Java 1.6及以上運(yùn)行環(huán)境,由于本章后續(xù)將以Flume 1.8.0 為準(zhǔn),所以要求安裝 Java 1.8 及以上運(yùn)行環(huán)境。

2、Flume安裝

(1)下載Flume

????????下載flume到 /export/software/ 目錄中

https://dlcdn.apache.org/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz

(2)解壓

進(jìn)入目錄/export/software/,執(zhí)行命令
tar -xzvf apache-flume-1.8.0-bin.tar.gz -C /export/servers/

(3)重命名

進(jìn)入目錄/export/servers/,執(zhí)行命令
mv apache-flume-1.8.0-bin flume

(4)配置Flume環(huán)境

配置 flume-env.sh
cd /export/servers/flume/conf
cp flume-env.sh.template flume-env.sh

vi flume-env.sh #編輯文件,增加如下行
export JAVA_HOME=/export/servers/jdk

配置 /etc/profile
vi /etc/profile #編輯文件,增加如下行
export FLUME_HOME=/export/servers/flume
export PATH=$PATH:$FLUME_HOME/bin

3、Flume的入門使用

(1)配置Flume采集方案

??????? 在/export/servers/flume/conf的目錄下配置netcat-logger.conf,相關(guān)代碼如下

# 示例配置方案: 單節(jié)點(diǎn) Flume 配置

#為agent各組件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址)
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# 描述并配置sinks組件(采集后數(shù)據(jù)流出的類型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 將source 和 sink 通過同一個channel連接綁定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

a、采集方案的名稱可以自定義,但為了方便管理和使用,通常會根據(jù)數(shù)據(jù)源類型和收集的結(jié)果類型進(jìn)行命名。如 netcat-logger.conf 表示采集 netcat 類型數(shù)據(jù)源并最終作為 logger 日志信息收集。

b、采集方案文件的位置可以自定義存放,在使用的時候會要求指定配置方案的具體位置,為了方便統(tǒng)一管理,通常會將采集方案統(tǒng)一存放。如本案例中,會將所有自定義的采集方案文件保存在/export/servers/flume/conf目錄下。

c、采集方案中的sources、channels、sinks是在具體編寫時根據(jù)業(yè)務(wù)需求進(jìn)行配置的,不能隨意定義。Flume支持采集的數(shù)據(jù)類型可以通過查看官網(wǎng)進(jìn)行詳細(xì)了解(地址https://flume.apache.org/FlumeUserGuide.html ),同時針對不同的 sources type、channelstype和 sinks type 需要編寫不同的配置屬性。

????????注意:配置采集方案中,在編寫Source、Sink 與Channel 關(guān)聯(lián)綁定時特別容易出錯,如文件netcat-logger.conf中所示的al.sources.rl.channels = c1 和 al.sinks. kl.channel = cl, sources 的 channels 比 sinks 的 channel 多了一個s。這是因?yàn)?,在一個 Agent 中,同一個Source 可以有多個Channel,所以配置時使用 channels(channel 的復(fù)數(shù)形式);而同一個 Sink 只能為一個 Channel 服務(wù),所以配置時必須使用 Channel。

(2)指定采集方案啟動Flume

進(jìn)入 /export/servers/flume 目錄,使用指定采集方案啟動FLume
flume-ng agent --conf conf/ --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

????????執(zhí)行上述指令后,就會使用前面編寫的采集方案 netcat-logger.conf來啟動Flume.該 Flume系統(tǒng)會根據(jù)采集方案的配置監(jiān)聽當(dāng)前主機(jī) localhost下44444 端口發(fā)送的neteat類型源數(shù)據(jù),并將信息收集接收到類型為 logger 的 Sink中。

接下來,對上述指令中的各部分內(nèi)容進(jìn)行說明,具體如下所示。

a、flume-ng agent:表示使用flume-ng啟動一個agent;

b、-confconf/:-conf選項(xiàng)指定了Flume自帶的配置文件路徑,可用-c簡寫格式;

c、-conf-file conf/netcat-logger.conf:-conf-file 選項(xiàng)指定了開發(fā)者編寫的采集方案,可用-f簡寫格式,需要注意配置文件所在路徑,建議讀者使用絕對路徑指定采集方案,否則將提示文件不存在的錯誤;

d、-nameal:表示啟動的agent名稱為al,該名稱al 必須與采集方案中agent的名稱保持一致;

e、-Dflume.root.logger=INFO,console:表示將采集處理后的信息通過logger日志的信息輸出到控制臺進(jìn)行展示。

(3)Flume數(shù)據(jù)采集測試

如果出現(xiàn)”-bash: telnet: command not found",請使用以下指令安裝telnet工具。
yum -y install telnet

使用telnet 連接到本地主機(jī) localhost端口44444,用來持續(xù)發(fā)送數(shù)據(jù)信息作為Flume將要采集的源數(shù)據(jù)。
telnet localhost 44444

登錄后發(fā)送信息:
hello
OK(收到信息)

world
OK(收到信息)

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

三、Flume采集方案配置說明

1、Flume Source

??????? 在編寫 Flume 采集方案時,首先必須明確的是采集的數(shù)據(jù)源類型、出處;接著,根據(jù)這些信息與Flume 已提供的支持的 Flume Source 進(jìn)行匹配,選擇對應(yīng)的數(shù)據(jù)采集器類型(source. type);然后,再根據(jù)選擇的數(shù)據(jù)采集器類型,匹配必要和非必要的數(shù)據(jù)采集器屬性。Flume提供的并支持的 Flume Source 有很多,具體的可前往官網(wǎng)查看https://flume.apache.org/FlumeUserGuide.html#flume-sourceshttps://flume.apache.org/FlumeUserGuide.html#flume-sources????????這里列舉一些常用的 Flume Source。

(1)Avro Source

??????? 監(jiān)聽 Avro 端口并從外部 Avro 客戶端流中接收 event 數(shù)據(jù),當(dāng)與另外一個 Flume Agent 上的 Avro Sink 配對時,它可以創(chuàng)建分層集合拓?fù)?,利?Avro Source 可以實(shí)現(xiàn)多級流動、扇出流、扇入流等效果。

Avro Source常用屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
channels ——
type —— 組件類型名必須是 avro
bind —— 要監(jiān)聽的主機(jī)名或 IP 地址
port —— 要監(jiān)聽的服務(wù)端口
threads —— 要生成的工作線程的最大數(shù)目
ssl false 將此設(shè)置為 true 以啟用 SSL 加密,則必須指定 keystore 和 keystore-password
keystore —— SSL 所必需的通往 Java 密鑰存儲路徑
keystore-password —— SSL 所必需的 Java 密鑰存儲的密碼
使用 Avro Source 采集器配置一個名稱為 a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=avro
a1.sources.r1.channels=c1
a1.sources.r1.bind=0.0.0.0
a1.sources.r1.port=4141

?(2)Spooling Directory Source

????????Spooling Directory Source 允許對指定磁盤上的文件目錄進(jìn)行監(jiān)控來提取數(shù)據(jù),它將查看文件的指定目錄的新增文件,并將文件中的數(shù)據(jù)讀取出來。

Spooling Directory Source常用屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
channels ——
type —— 組件類型名必須是spooldir
spoolDir —— 從中讀取文件的目錄
fileSuffix . COMPLETED 附加到完全攝取的文件后綴
deletePolicy never 何時刪除已完成的文件:never 或 immediate
fileHeader false 是否添加存儲絕對路徑文件名的標(biāo)頭
includePattern ^. * $ 正則表達(dá)式,指定要包含的文件
ignorePattern ^ $ 正則表達(dá)式,指定要忽略的文件
使用 Spooling Directory Source 采集器配置一個名稱為 a1 的 Agent。
a1.channels=ch-1
a1.sources=src-1
a1.sources.src-1.type=spooldir
a1.sources.src-1.channels=ch-1
a1.sources.src-1.spoolDir=/var/log/apache/flumeSpool
a1.sources.src-1.fileHeader=true

(3)Taildir Source

????????Taildir Source 用于觀察指定的文件,幾乎可以實(shí)時監(jiān)測到添加到每個文件的新行。如果文件正在寫入新行,則此采集器將重試采集它們以等待寫入完成。

Taildir Source常用屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
channels ——
type —— 組件類型名必須是 TAILDIR
filegroups —— 以空格分隔的文件組列表。每個文件組都指定了要監(jiān)控的一系列文件
filegroups. <filegroupName> —— 文件組的絕對路徑。正則表達(dá)式(而不是文件系統(tǒng)模式)只能用于文件名
idle Timeout 120000 關(guān)閉非活動文件的時間(ms)。如果關(guān)閉的文件附加了新行,則此源將自動重新打開它
writePosInterval 3000 寫入位置文件上每個文件的最后位置的間隔時間(ms)
batchSize 100 一次讀取和發(fā)送到通道的最大行數(shù)。使用默認(rèn)值通常效果較好
backoffSleepIncrement 1000 當(dāng)最后一次嘗試未找到任何新數(shù)據(jù)時,每次重新嘗試輪詢新數(shù)據(jù)之間的最大時間延遲
fileHeader false 是否添加存儲絕對路徑文件名的標(biāo)頭
fileHeaderKey file 將絕對路徑文件名附加到 event header 時使用的 header 關(guān)鍵字
使用 Taildir Source 采集器配置一個名稱為 a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=TAILDIR
a1.sources.r1.channels=c1
a1.sources.r1.positionFile=/var/log/flume/taildir_position.json
a1.sources.r1.filegroups=f1 f2
a1.sources.r1.filegroups.f1=/var/log/test1/example.log
a1.sources.r1.headers.f1.headerKet1=value1
a1.sources.r1.filegroups.f2=/var/log/test2/. * log. *
a1.sources.r1.headers.f2.headersKey1=value2
a1.sources.r1.headers.f2.headersKey2=value2-2
a1.sources.r1.fileHeader=true

(4)HTTP Source

????????HTTP Source 可以通過 HTTP POST 和 GET 請求方式接收 event 數(shù)據(jù),GET 通常只能用于測試使用。HTTP 請求會被實(shí)現(xiàn)了 HTTPSourceHandler 接口的 handler(處理器)可插拔插件轉(zhuǎn)成 Flume events,這個 handler 接收 HttpServletRequest,返回 Flume events列表。一個 HTTP請求處理的所有事件都在一個事務(wù)中提交給通道,從而允許在諸如file channel 之類的 channel 上提高效率。如果 handler 拋出異常,source 會返回400;如果 channel 滿了或者 source 不能再向 channel 追加 event,source 會返回 503。在一個 POST 請求發(fā)送的所有的 events 都被認(rèn)為是一個批次,會在一個事務(wù)中插入 channel。

HTTP Source常用配置屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
channels ——
type 組件類型名必須是 http
port —— 采集源要綁定的端口
bind 0.0.0.0 要監(jiān)聽綁定的主機(jī)名或 IP 地址
handler org.apache.flume.source.http.JSONHandler handler 類的全路徑名
handler. * —— 配置 handler 的參數(shù)
使用 HTTP Source 采集器配置一個名稱為a1 的 Agent。
a1.sources=r1
a1.channels=c1
a1.sources.r1.type=http
a1.sources.r1.port=5140
a1.sources.r1.channels=c1
a1.sources.r1.handler=org.example.rest.RestHandler
a1.sources.r1.handler.nickname=random props

2、Flume channel

????????Channels 通道是event 在Agent 上暫存的存儲庫,Source 向 Channel 中添加event,Sink 在讀取完數(shù)據(jù)后再刪除它。在配置Channels時,需要明確的是將要傳輸?shù)?sources 數(shù)據(jù)源類型;接著,根據(jù)這些信息并結(jié)合開發(fā)中的實(shí)際需求,選擇Flume 已提供支持的 Flume Channels;然后,再根據(jù)選擇的Channel類型,配置必要和非必要的Channel屬性。

??????? 這是官方文檔

https://flume.apache.org/FlumeUserGuide.html#flume-channelshttps://flume.apache.org/FlumeUserGuide.html#flume-channels

(1)Memory Channel

????????Memory Channel 會將 event 存儲在具有可配置最大尺寸的內(nèi)存隊(duì)列中,它非常適用于需要更高吞吐量的流量,但是在 Agent發(fā)生故障時會丟失部分階段數(shù)據(jù)。

Memory Channel常用配置屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
type —— 組件類型名必須是memory
capacity 100 存儲在 Channel 中的最大 even數(shù)
transactionCapacity 100 Channel 將從 Source 接收或向 Sink傳遞的每一個事務(wù)中的最大 event數(shù)
keep-alive 3

添加或刪除 event 的超時時間(s)

byteCapacityBufferPercentage 20

定義 byteCapacity與Channel中所有event的估計(jì)總大小之間的緩沖區(qū)百分比,以計(jì)算 header中的數(shù)據(jù)

byteCapacity (見說明)

允許此Channel 中所有event的最大內(nèi)存字節(jié)數(shù)總和。該統(tǒng)計(jì)僅計(jì)算Eventbody,這也是提供byteCapacityBufferPercentage 配置參數(shù)的原因。默認(rèn)計(jì)算值,等于JVM可用的最大內(nèi)存的 80%(即命令行傳遞的-Xmx 值的 80%)

使用Memory Channel 通道配置一個名稱為 a1 的 Agent。
a1.channels=c1
a1.channels.c1.type=memory
a1.channels.c1.capacity=10000
a1.channels.c1.transactionCapacity=10000
a1.channels.c1.byteCapacityBufferPercentage=20
a1.channels.c1.byteCapacity=800000

(2)File channel

????????File channel 是 Flum 的持久通道,它將所有的 event 寫入磁盤,因此不會丟失進(jìn)程或機(jī)器關(guān)機(jī)、崩潰時的數(shù)據(jù)。File channel 通過在一次事務(wù)中提交多個 event 來提高吞吐量,做到了只要事務(wù)被提交,那么數(shù)據(jù)就不會有丟失。

File channel常用配置屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
type —— 組件類型名必須是file
checkpointDir ~/.flume/file-channel/checkpoint 檢測點(diǎn)文件所存儲的目錄
useDualCheckpoints false 備份檢測點(diǎn)如果設(shè)置為 true,backupCheckpointDir 必須設(shè)置
backupCheckpointDir —— 備份檢查點(diǎn)目錄。此目錄不能與數(shù)據(jù)目最或檢查點(diǎn)目錄相同
dataDirs ~/.flume/file-channel/data 數(shù)據(jù)存儲所在的目錄設(shè)置
transactionCapacity 10000 事務(wù)容量的最大值設(shè)置
checkpointInterval 30000 檢測點(diǎn)之間的時間值設(shè)置(ms)
maxFileSize 2146435071 一個單一日志的最大值設(shè)置(以字節(jié)為單位)
capacity 100000 Channel的最大容量
使用 File channel 通道配置一個名稱為 a1 的 Agent。
a1.channels=c1
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/mnt/flume/checkpoint
a1.channels.c1.dataDirs=/mnt/flume/data

3、Flume Sinks

????????Flume Sources 采集到的數(shù)據(jù)通過 Channels 就會流向 Sink 中,此時的 Sink類似一個區(qū)結(jié)的遞進(jìn)中心,它需要根據(jù)后續(xù)需求進(jìn)行配置,從而最終選擇是將數(shù)據(jù)直接進(jìn)行集中式存儲(例如,直接存儲到 HDFS中),還是繼續(xù)作為其他 Agent 的 Source 進(jìn)行傳輸。在配置Sinks時,需要明確的就是將要傳輸?shù)臄?shù)據(jù)目的地、結(jié)果類型;接著,根據(jù)這些實(shí)際需求信息,選擇Flume已提供支持的 Flume Sinks;然后,再根據(jù)選擇的 Sinks類型,配置必要和非必要的 Sinks 屬性。

??????? 具體可前往官方文檔查看

https://flume.apache.org/FlumeUserGuide.html#flume-sinkshttps://flume.apache.org/FlumeUserGuide.html#flume-sinks

(1)HDFS Sink

??????? HDFS Sink 將 event 寫入 Hadoop 分布式文件系統(tǒng)(HDFS),它目前支持創(chuàng)建文本和序列文件,以及兩種類型的壓縮文件。HDFS Sink 可以基于經(jīng)過的時間或數(shù)據(jù)大小或 event數(shù)量來周期性地滾動文件(關(guān)閉前文件并創(chuàng)建新文件),同時,它還通過屬性(如 event發(fā)生的時間戳或機(jī)器)來對數(shù)據(jù)進(jìn)分桶/分區(qū)。HDFS 目錄路徑可能包含將由 HDFS 接收器替換的格式化轉(zhuǎn)義序列,以生用于存儲 event 的目錄/文件名,使用 HDFS Sink 時需要安裝 Hadoop,以便 Flume 可以有用Hadoop jar 與HDFS 集群進(jìn)行通信。

HDFS Sink常用配置屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
channel ——
type —— 組件類型名必須是hdfs

hdfs.path

—— HDFS目錄路徑(如hdfs://namenode/ume/webdata/)
hdfs.filePrefix FlumeData 為在 hdis 目錄中由 Flume 創(chuàng)建的文件指定前綴
hdís.round false

是否應(yīng)將時間戳向下舍人(如果為 true.則影響除%;之外的

所有基于時間的轉(zhuǎn)義序列)

hdfs.roundValue 1

舍人到此最高倍數(shù)(在使用 hdfs.roundUnit 配置的單位中),小于當(dāng)前時間

hdfs.roundUnit second 舍人值的單位(秒、分鐘或小時)
hdfs. rollInterval 30 滾動當(dāng)前文件之前等待的秒數(shù)(0==根據(jù)時間間隔從不滾動)
hdfs.rollSize 1024 觸發(fā)滾動的文件大小,以字節(jié)為單位(0:永不基于文件大小滾動)
hdfs.rollCount 10 在滾動之前寫入文件的事件數(shù)(0==從不基于事件數(shù)滾動)
hdfs.batchSize 100 在將文件刷新到 HDFS之前寫人文件的 event 數(shù)
hdfs.useLocalTimeStamp false

替換轉(zhuǎn)義序列時,請使用本地時間(而不是event beader 中的時間戳)

使用 HDFS Sink 配置一個名稱為 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=hdfs
a1.sinks.k1.channel=c1
a1.sinks.k1.hdfs.path=/flume/events/%y-%m-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix=events-
a1.sinks.k1.hdfs.round=true
a1.sinks.k1.hdfs.roundValue=10
a1.sinks.k1.hdfs.roundUnit=minute

(2)Logger Sink

????????Logger Sink用于記錄 INFO 級別 event,它通常用于調(diào)試。Logger Sink 接收器的不同之處是它不需要在“記錄原始數(shù)據(jù)”部分中說明額外配置。

Logger Sink常用配置屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
channel ——
type —— 組件類型名必須是 logger
maxBytes ToLog 16 要記錄的 event body 的最大字節(jié)數(shù)
使用 Logger Sink 配置一個名稱為 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=logger
a1.sinks.k1.channel=c1

(3)Avro Sink

????????Avro Sink 形成了 Flume 的分層收集支持的一半,發(fā)送到此接收器的 Flume event 將轉(zhuǎn)換為 Avro event 并發(fā)送到配置的主機(jī)名/端口對上,event 將從配置的 Channel中批量獲取配置的批處理大小。

Avro Sink常用配置屬性(加粗部分為必須屬性)
屬性名稱 默認(rèn)值 說明
channel ——
type —— 組件類型名必須是 avro
hostname —— 要監(jiān)聽的主機(jī)名或IP地址
port —— 要監(jiān)聽的服務(wù)端口
batch-size 100 要一起批量發(fā)送的 event 數(shù)
connect-timeout 20000 允許第一次(握手)請求的時間量(ms)
request-timeout 20000 在第一個之后允許請求的時間量
使用 Avro Sink 配置一個名稱為 a1 的 Agent。
a1.channels=c1
a1.sinks=k1
a1.sinks.k1.type=avro
a1.sinks.k1.channel=c1
a1.sinks.k1.hostname=10.10.10.10
a1.sinks.k1.port=4545

四、Flume的可靠性保證

??????? 前面講解的Flume 人門使用中,配置的采集方案是通過唯一一個Sink作為接收器接收后續(xù)需要的數(shù)據(jù),但有時候會出現(xiàn)當(dāng)前 Sink故障或者數(shù)據(jù)收集請求量較大的情況,這候單一的 Sink 配置可能就無法保證 Flume 開發(fā)的可靠性。為此,F(xiàn)lume 提供了Flume Sink Processors(Flume Sink 處理器)來解決上述問題。

????????Sink 處理器允許開發(fā)者定義一個 Sink groups(接收器組),將多個 Sink 分組到一個實(shí)體中,這樣Sink處理器就可以通過組內(nèi)的多個Sink為服務(wù)提供負(fù)載均衡功能,或者是在某個Sink出現(xiàn)短暫故障的時候?qū)崿F(xiàn)從一個 Sink 到另一個 Sink的故障轉(zhuǎn)移。

1、負(fù)載均衡

????????負(fù)載均衡接收器處理器(Load balancing sink processor)提供了在多個 Sink 上進(jìn)行負(fù)載均衡流量的功能,它維護(hù)了一個活躍的Sink索引列表,必須在其上分配負(fù)載。Load balancing sink processor 支持使用 round_robin (輪詢)和 random (隨機(jī))選擇機(jī)制進(jìn)行流量分配,其默認(rèn)選擇機(jī)制為 round_robin,但可以通過配置進(jìn)行覆蓋。還支持繼承 AbstractSinkSelector 的自定義類來自定義選擇機(jī)制。

????????在使用時選擇器(selector)會根據(jù)配置的選擇機(jī)制挑選下一個可用的Sink并進(jìn)行調(diào)用。對于round_robin和random兩種選擇機(jī)制,如果所選 Sink無法收集 event,則處理器會通過其配置的選擇機(jī)制選擇下一個可用 Sink。這種實(shí)現(xiàn)方案不會將失敗的Sink列人黑單,而是繼續(xù)樂觀地嘗試每個可用的 Sink。如果所有 Sink 都調(diào)用失敗,則選擇器將故障傳播到接收器運(yùn)行器(sink runner)。

????????如果啟用了 backoff 屬性,則 Sink處理器會將失敗的Sink列人黑名單。當(dāng)超時結(jié)束時,如果Sink仍然沒有響應(yīng),則超時會呈指數(shù)級增加,以避免在無響應(yīng)的 Sink 上長時間等待時卡住。在禁用backoff 功能的情況下,在 round_robin 機(jī)制下,所有失敗的 Sink 將被傳遞到 Sink 隊(duì)列中的下一個 Sink后,因此不再均衡。

Load balancing sink processor提供的配置屬性(加粗部分為必須屬性)。
屬性名稱 默認(rèn)值 說明
sinks —— 以空格分隔的參與 sink 組的 sink 列表
processor.type default 組件類型名必須是 load_balance
processor.backoff false 設(shè)置失敗的 sink 進(jìn)人黑名單
processor.selector round_robin 選擇機(jī)制。必須是 round_robin、random或是繼承自AbstractSinkSelector 的自定義選擇機(jī)制類全路徑名
processor.selector.maxTimeOut 30000

失敗 sink 放置在黑名單的超時時間,失敗sink在指

指定時間后仍無法啟用,則超時時間呈指數(shù)增加

????????processor.type 屬性的默認(rèn)值為 default,這是因?yàn)?Sink 處理器的 processor.type 提供了3 種處理機(jī)制:default(默認(rèn)值)、failover 和 load_balance。其中, default 表示配置單獨(dú)一個sink,配置和使用非常簡單,同時也不強(qiáng)制要求使用 sink group 進(jìn)行封裝;另外的 failover 和 load_balance 就分別代表故障轉(zhuǎn)移和負(fù)載均衡情況下的配置屬性。

使用 Load balancing sink processor 配置一個名稱為 al 的 Agent。
a1.sinkgroups=g1
a1.sinkgroups.g1.sinks=kl k2
a1.sinkgroups.g1.processor.type=load_balance 
al.sinkgroups.g1.processor.backoff-true
al.ainkgroups.g1.processor.selector=random

(1)搭建并配置Flume機(jī)器

在hadoop01上,將Flume同步到hadoop02、hadoop03上
scp -r /export/servers/flume hadoop02.bgd01:/export/servers/
scp -r /export/servers/flume hadoop03.bgd01:/export/servers/

scp -r /etc/profile hadoop02.bgd01:/etc/profile
scp -r /etc/profile hadoop03.bgd01:/etc/profile

分別在hadoop02、hadoop03上執(zhí)行如下命令,立即刷新配置
source /etc/profile

(2)配置Flume采集方案

a、exec-avro.conf

????????在hadoop01.bgd01上配置第一級采集配置,在/export/servers/flume/conf/目錄下編寫采集方案exec-avro.conf。

?# 配置load balancing sink processor一級采集方案
a1.sources = r1

#用空格分別配置2個sink
a1.sinks = k1 k2
a1.channels = c1

# 描述并配置sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址)
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/123.log

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 設(shè)置sink1,流向Hadoop02,由Hadoop02上的Agent進(jìn)行采集
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02.bgd01
a1.sinks.k1.port = 52020

# 設(shè)置sink2,流向Hadoop03,由Hadoop03上的Agent進(jìn)行采集
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03.bgd01
a1.sinks.k2.port = 52020

# 配置sink組及處理器策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = load_balance
a1.sinkgroups.g1.processor.backoff = true
a1.sinkgroups.g1.processor.selector = random
a1.sinkgroups.g1.processor.timeout = 10000

b、netcat-logger.conf

???????? 分別在hadoop02.bgd01和hadoop03.bgd01上配置第二級采集配置,在/export/servers/flume/conf/目錄下編寫采集方案netcat-logger.conf。

# 配置load balancing sink processor二級采集方案的一個Sink分支

#為agent各組件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02.bgd01
a1.sources.r1.port = 52020

# 描述并配置sinks組件(采集后數(shù)據(jù)流出的類型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 將source 和 sink 通過同一個channel連接綁定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 配置load balancing sink processor二級采集方案的一個Sink分支

#為agent各組件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03.bgd01
a1.sources.r1.port = 52020

# 描述并配置sinks組件(采集后數(shù)據(jù)流出的類型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 將source 和 sink 通過同一個channel連接綁定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

??????? 兩個采集方案內(nèi)容的唯一區(qū)別就是 source.bind 的不同,hadoop02.bgd01 機(jī)器的source.bind=hadoop02.bgd01,而hadoop03.bgd01 機(jī)器的 source.bind=hadoop03.bgd01,在 r述兩個文件中,均設(shè)置了一個名為 al的Agent,在該 Agent 內(nèi)部設(shè)置了 source. type= avro、source.bind=hadoop02.bgd01/hadoop03.bgd01 以及 source. port=52020,特意用來對接在 hddoop01.bgd01中前一個Agent收集后到 Sink的數(shù)據(jù)類型和配置傳輸?shù)哪繕?biāo);最后,又設(shè)置了二采集方案的 sink.type=logger,將二次收集的數(shù)據(jù)作為日志收集打印。

(2)啟動Flume系統(tǒng)

1、分別在hadoop02、hadoop03上,進(jìn)入 /export/servers/flume 目錄,使用netcat-logger采集方案啟動FLume
flume-ng agent --conf conf/ --conf-file conf/netcat-logger.conf --name a1 -Dflume.root.logger=INFO,console

2、在hadoop01上,進(jìn)入 /export/servers/flume 目錄,使用exec-avro.conf采集方案啟動FLume
flume-ng agent --conf conf/ --conf-file conf/exec-avro.conf --name a1 -Dflume.root.logger=INFO,console

hadoop02.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

?hadoop03.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

?hadoop01.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

(3)Flume系統(tǒng)負(fù)載均衡測試

在hadoop01.bgd01的root目錄創(chuàng)建logs目錄
mkdir -p /root/logs

在hadoop01上,重新打開一個終端,執(zhí)行如下命令:
while true; do echo "access access ..." >> /root/logs/123.log; sleep 1; done

hadoop02.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

hadoop03.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

2、故障轉(zhuǎn)移

??????? 故障轉(zhuǎn)移接收器處理器(Failover Sink Processor)維護(hù)一個具有優(yōu)先級的sink列表,保證在處理 event 只要有一個可用的 sink 即可。故障轉(zhuǎn)移機(jī)制的工作原理是將故障的sink降級到故障池中,在池中為它們分配一個冷卻期,在重試之前冷卻時間會增加,當(dāng) sink 成功發(fā)送 event后,它將恢復(fù)到活躍池中。sink具有與之相關(guān)的優(yōu)先級,數(shù)值越大,優(yōu)先級越高。如果在發(fā)送 event 時 sink 發(fā)生故障,則會嘗試下一個具有最高優(yōu)先級的 sink來繼續(xù)發(fā)送event。如果未指定優(yōu)先級,則根據(jù)配置文件中指定 sink 的順序確定優(yōu)先級。

Failover Sink Processor配置屬性(加粗部分為必須屬性)。
屬性名稱 默認(rèn)值 說明
sinks —— 以空格分隔的參與 sink 組的 sink 列表
processor.type default 組件類型名必須是 failover
processor.priority.<sinkName> —— 設(shè)置 sink 的優(yōu)先級取值
processor.maxpenalty 30000 失敗 sink 的最大退避時間
使用 Failover Sink Processor 配置一個名稱為al的Agent。

al.sinkgroups=g1
a1.sinkgroups.g1.sinks=kl k2
al.sinkgroups.g1.processor.type=failover 
al.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.gl.processor.priority.k2-10 al.sinkgroups.g1.processor.maxpenalty=10000

(1)配置Flume采集方案

a、avro-logger-memory.conf

????????在hadoop01.bgd01上配置第一級采集配置,在/export/servers/flume/conf/目錄下編寫采集方案avro-logger-memory.conf。

# 配置load balancing sink processor一級采集方案
a1.sources = r1

#用空格分別配置2個sink
a1.sinks = k1 k2
a1.channels = c1

# 描述并配置sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址)
a1.sources.r1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/456.log

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 設(shè)置sink1,流向Hadoop02,由Hadoop02上的Agent進(jìn)行采集
a1.sinks.k1.channel = c1
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop02.bgd01
a1.sinks.k1.port = 52020

# 設(shè)置sink2,流向Hadoop03,由Hadoop03上的Agent進(jìn)行采集
a1.sinks.k2.channel = c1
a1.sinks.k2.type = avro
a1.sinks.k2.hostname = hadoop03.bgd01
a1.sinks.k2.port = 52020

# 配置sink組及處理器策略
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1=5
a1.sinkgroups.g1.processor.priority.k2=10
a1.sinkgroups.g1.processor.timeout = 10000

?b、exec-avro-failover.conf

????????分別在hadoop02.bgd01和hadoop03.bgd01上配置第二級采集配置,在/export/servers/flume/conf/目錄下編寫采集方案exec-avro-failover.conf。

# 配置load balancing sink processor二級采集方案的一個Sink分支

#為agent各組件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop02.bgd01
a1.sources.r1.port = 52020

# 描述并配置sinks組件(采集后數(shù)據(jù)流出的類型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 將source 和 sink 通過同一個channel連接綁定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
# 配置load balancing sink processor二級采集方案的一個Sink分支

#為agent各組件命名
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop03.bgd01
a1.sources.r1.port = 52020

# 描述并配置sinks組件(采集后數(shù)據(jù)流出的類型)
a1.sinks.k1.type = logger

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100

# 將source 和 sink 通過同一個channel連接綁定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

?(2)啟動Flume系統(tǒng)

1、分別在hadoop02、hadoop03上,進(jìn)入 /export/servers/flume 目錄,使用avro-logger-memory.conf采集方案啟動FLume
flume-ng agent --conf conf/ --conf-file conf/avro-logger-memory.conf --name a1 -Dflume.root.logger=INFO,console


2、在hadoop01上,進(jìn)入 /export/servers/flume 目錄,使用exec-avro-failover.conf采集方案啟動FLume
flume-ng agent --conf conf/ --conf-file conf/exec-avro-failover.conf --name a1 -Dflume.root.logger=INFO,console

hadoop01.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

?hadoop02.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

?hadoop03.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

(3)Flume系統(tǒng)故障轉(zhuǎn)移測試

在hadoop01.bgd01上,重新打開一個終端,執(zhí)行如下命令:
while true; do echo "access access ..." >> /root/logs/456.log; sleep 1; done

hadoop02.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

hadoop03.bgd01

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

五、Flume攔截器

????????FlumeInterceptors(攔截器)主要用于實(shí)現(xiàn)對Flume系統(tǒng)數(shù)據(jù)流中 event 的修改操作。在使用 Flume 攔截器時,只需要參考官方配置屬性在采集方案中選擇性地配置即可,當(dāng)涉及配置多個攔截器時,攔截器名稱中間需要用空格分隔,并且攔截器的配置順序就是攔微順序。這里只簡述常用的幾種的,具體可前往官方文檔查看。

https://flume.apache.org/FlumeUserGuide.html#flume-interceptorshttps://flume.apache.org/FlumeUserGuide.html#flume-interceptors

1、Timestamp interceptor

????????TimestampInterceptor(時間戳攔截器)會將流程執(zhí)行的時間插入到event的header頭部。此攔截器插人帶有 timestamp 鍵(或由 header 屬性指定鍵名)的標(biāo)頭,其值為對應(yīng)時間戳。如果配置中已存在時間戳?xí)r,此攔截器可以保留現(xiàn)有的時間戳。

Timestamp Interceptor常用配置屬性(加粗部分為必須屬性)。
屬性名稱 默認(rèn)值 說明
type —— 組件類型名必須是timestamp
header timestamp 用于放置生成的時間戳的標(biāo)頭的名稱
preserveExisting false 如果時間戳已存在,是否應(yīng)保留,true或false
為名稱為a1的Agent中配置Timestamp interceptor。
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels=c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=timestamp

2、 Static interceptor

????????Static Interceptor(靜態(tài)攔截器)允許用戶將具有舒志值的靜態(tài)頭附加到所有even,當(dāng)前實(shí)現(xiàn)不支持一次指定多個 header頭,但是用戶可以定又多個 Static Intercrptor 來為每一個攔截器都追加一個 header。

Static Interceptor常用配置屬性(加粗部分為必須屬性)。
屬性名稱 默認(rèn)值 說明
type —— 組件類型名必須是static
preserveExisting true 如果配置的header已存在,是否應(yīng)保服
key key 應(yīng)創(chuàng)建的 header 的名稱
value value 應(yīng)創(chuàng)建的header 對應(yīng)的靜態(tài)值
為名稱是al的Agent 中配置 Static Intercepior。
a1.sources=r1
a1.channels=c1
a1.sources.r1.channels=c1
a1.sources.r1.type=seq
a1.sources.r1.interceptors=i1
a1.sources.r1.interceptors.i1.type=static
a1.sources.r1.interceptors.i1.key=datacenter
a1.sources.r1.interceptors.i1.value=BET_JING

3、Search and Replace Interceptor

????????Search and Replace Interceptor(查詢和替換攔戴器)基于Java正則表達(dá)式提供了簡用的用于字符串的搜索和替換功能,同時還具有進(jìn)行回溯/群組捕捉功能。此攔截器的便用與Matcher.replaceAllO方法具有相同的規(guī)則。

Sarch and Replace Intereeptor常用配置屬性(加租部分為必須屬性)
屬性名稱 默認(rèn)值 說明
type —— 組件類型名必須是 search_replane
searchPattern —— 要查詢或替換的模式
replaceString —— 替換的字符串
charset UTF-8 event body 的字符集,默認(rèn)為 UTF-8
為名稱為al的Agent 中配置 Search and Replace Interceptor的示例如下。
al.sources=r1 
a1.channels=cl
a1.sources.r1.channels=cl 
a1.sources.r1.type=seg
a1.sources.avroSrc.interceptors=i1
al.sources.avroSrc.interceptors.i1.type=search_replace 
# 影除 event body 中的前導(dǎo)字母數(shù)字字符
a1.sources.avroSrc.interceptors.i1.searchPattern=^[A-Za-z0-9_]+ al.sources.avroSrc.interceptors.i1.replacesString=

六、案例——日志采集

1、配置采集方案

(1)exec-avro_logCollection.conf?

??????? 分別在hadoop02.bgd01和hadoop03.bgd01上配置同樣的采集目錄,在/export/servers/flume/conf/目錄下編寫采集方案exec-avro_logCollection.conf。

?# 配置 Agent 組件

# 用3個source采集不同的日志類型數(shù)據(jù)
a1.sources = r1 r2 r3
a1.sinks = k1
a1.channels = c1

# 描述并配置第一個sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址、包括自帶的攔截器)
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /root/logs/access.log
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = static
a1.sources.r1.interceptors.i1.key = type
a1.sources.r1.interceptors.i1.value = access

# 描述并配置第二個sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址、包括自帶的攔截器)
a1.sources.r2.type = exec
a1.sources.r2.command = tail -F /root/logs/nginx.log
a1.sources.r2.interceptors = i2
a1.sources.r2.interceptors.i2.type = static
a1.sources.r2.interceptors.i2.key = type
a1.sources.r2.interceptors.i2.value = nginx

# 描述并配置第三個sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址、包括自帶的攔截器)
a1.sources.r3.type = exec
a1.sources.r3.command = tail -F /root/logs/web.log
a1.sources.r3.interceptors = i3
a1.sources.r3.interceptors.i3.type = static
a1.sources.r3.interceptors.i3.key = type
a1.sources.r3.interceptors.i3.value = web

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 200000
a1.channels.c1.transactionCapacity = 100000

# 描述并配置sink
a1.sinks.k1.type = avro
a1.sinks.k1.hostname = hadoop01.bgd01
a1.sinks.k1.port = 41414

# 將Source、Sink 與Channel 進(jìn)行關(guān)聯(lián)綁定
a1.sources.r1.channels = c1
a1.sources.r2.channels = c1
a1.sources.r3.channels = c1

a1.sinks.k1.channel = c1

(2)avro-hdfs_logCollection.conf

??????? 在hadoop01.bgd01上配置第二級日志采集方案,在/export/servers/flume/conf/目錄下編寫采集方案exec-hdfs_logCollection.conf。

?# 配置load balancing sink processor二級采集方案的一個Sink分支

#配置 Agent 組件
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# 描述并配置sources組件(數(shù)據(jù)類型、采集數(shù)據(jù)源的應(yīng)用地址)
a1.sources.r1.type = avro
a1.sources.r1.bind = hadoop01.bgd01
a1.sources.r1.port = 41414

# 描述并配置攔截器,用于后續(xù)%Y%m%d獲取時間
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = timestamp

# 描述并配置channels
a1.channels.c1.type = memory
a1.channels.c1.capacity = 20000
a1.channels.c1.transactionCapacity = 10000

# 描述并配置sinks組件(采集后數(shù)據(jù)流出的類型)
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path =hdfs://hadoop01.bgd01:9000/source/logs/%{type}/%Y%m%d
a1.sinks.k1.hdfs.filePrefix = events
a1.sinks.k1.hdfs.fileType = DataStream
a1.sinks.k1.hdfs.writeFormat = Text

#生成的文本不按條數(shù)生成
a1.sinks.k1.hdfs.rollCount = 0

#生成的文本不按時間生成
a1.sinks.k1.hdfs.rollInterval = 0

#生成的文本按大小生成
a1.sinks.k1.hdfs.rollSize = 10485760

#批量寫入HDFS的個數(shù)
a1.sinks.k1.hdfs.batchSize = 20

#Flume操作HDFS的線程數(shù)(包括新建、寫入)
a1.sinks.k1.hdfs.threadsPoolSize = 10

#操作HDFS超時時間
a1.sinks.k1.hdfs.callTimeout = 30000

# 將source 和 sink 通過同一個channel連接綁定
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

2、啟動hadoop集群

start-dfs.sh  #啟動HDFS
start-yarn.sh #啟動YARN

3、啟動Flume系統(tǒng)

在hadoop01上啟動Flume系統(tǒng)
打開終端。進(jìn)入 /export/servers/flume 目錄,使用avro-hdfs_logCollection.conf采集方案啟動FLume。命令如下:
flume-ng agent --conf conf/ --conf-file conf/avro-hdfs_logCollection.conf --name a1 -Dflume.root.logger=INFO,console

分別在hadoop02、hadoop03上,進(jìn)入 /export/servers/flume 目錄,使用exec-avro_logCollection.conf采集方案啟動FLume。命令如下:
flume-ng agent --conf conf/ --conf-file conf/exec-avro_logCollection.conf --name a1 -Dflume.root.logger=INFO,console

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

4、 日志采集系統(tǒng)測試

1、在hadoop02上,創(chuàng)建目錄/root/logs;然后打開3個終端,分別執(zhí)行執(zhí)行如下命令,用來產(chǎn)生日志數(shù)據(jù):
while true; do echo "access access ..." >> /root/logs/access.log; sleep 1; done
while true; do echo "nginx nginx ..." >> /root/logs/nginx.log; sleep 1; done
while true; do echo "web web ..." >> /root/logs/web.log; sleep 1; done

2、在hadoop03上,創(chuàng)建目錄/root/logs;然后打開3個終端,分別執(zhí)行執(zhí)行如下命令,用來產(chǎn)生日志數(shù)據(jù):
while true; do echo "access access ..." >> /root/logs/access.log; sleep 1; done
while true; do echo "nginx nginx ..." >> /root/logs/nginx.log; sleep 1; done
while true; do echo "web web ..." >> /root/logs/web.log; sleep 1; done

????????回到在hadoop01.bgd01上啟動Flume系統(tǒng)的終端窗口,觀察日志采集信息。

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)

????????在hadoop01.bgd01上,打開FireFox瀏覽器,在地址欄輸入地址 http://hadoop01.bgd01:50070(集群IP/主機(jī)名+端口),進(jìn)入到Hadoop集群UI,可以看到集群下新添加了一個Source目錄。單擊進(jìn)入Source目錄,查看內(nèi)部文件存儲結(jié)構(gòu)。

大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)


參考書籍

《Hadoop大數(shù)據(jù)技術(shù)原理與應(yīng)用》文章來源地址http://www.zghlxwxcb.cn/news/detail-492464.html

到了這里,關(guān)于大數(shù)據(jù)技術(shù)之Hadoop(八)——Flume日志采集系統(tǒng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包