一、數(shù)據(jù)采集的問題
數(shù)據(jù)采集一般指的是將數(shù)據(jù)采集到大數(shù)據(jù)環(huán)境下進行持久化、海量化的保存,目的主要是為了我們后期的大數(shù)據(jù)處理(數(shù)據(jù)統(tǒng)計分析、數(shù)據(jù)挖掘等等)沉底數(shù)據(jù)基礎(chǔ)。
不同的來源的數(shù)據(jù)我們一般有不同的數(shù)據(jù)采集方式
1、數(shù)據(jù)來源于我們的RDBMS關(guān)系型數(shù)據(jù)庫:Sqoop數(shù)據(jù)遷移工具實現(xiàn)數(shù)據(jù)的采集
2、數(shù)據(jù)來源于我們系統(tǒng)運行產(chǎn)生的日志文件:日志文件記錄的數(shù)據(jù)量特別龐大,但是日志文件不屬于大數(shù)據(jù)存儲系統(tǒng)中東西,因此日志文件記錄不了海量的數(shù)據(jù),日志文件都會有一個定期清理規(guī)則。采集日志文件數(shù)據(jù)到大數(shù)據(jù)環(huán)境中。
一般采集日志文件數(shù)據(jù)到大數(shù)據(jù)環(huán)境使用的就是Flume技術(shù)
3、數(shù)據(jù)來源于其他網(wǎng)站:開發(fā)一個電影網(wǎng)站,電影網(wǎng)站應該具備哪些功能,哪些類型的電影能受用戶的歡迎。分析競品數(shù)據(jù),這種情況竟品數(shù)據(jù)都是人家別人家網(wǎng)站的數(shù)據(jù),但是我們需要分析,但是人家不給你數(shù)據(jù),通過爬蟲獲取數(shù)據(jù)(一不留神就犯法)。
4、數(shù)據(jù)來源于各種傳感器設(shè)備:不需要我們管
5、第三方提供、購買的第三方數(shù)據(jù)、開源數(shù)據(jù)集平臺提供的(阿里云的天池數(shù)據(jù)集、kaggle數(shù)據(jù)集平臺、飛漿數(shù)據(jù)集平臺、各個地區(qū)的政府公開數(shù)據(jù)集平臺)
二、數(shù)據(jù)采集一般使用的技術(shù)
sqoop技術(shù):采集RDBMS的數(shù)據(jù)到大數(shù)據(jù)環(huán)境中
Flume技術(shù):采集系統(tǒng)/網(wǎng)站產(chǎn)生的日志文件數(shù)據(jù)、端口數(shù)據(jù)等等到大數(shù)據(jù)環(huán)境中
爬蟲技術(shù):采集第三方的數(shù)據(jù),爬蟲一般是把采集的數(shù)據(jù)放到一個文件或者RDBMS數(shù)據(jù)庫當中
三、擴展:通過爬蟲技術(shù)采集第三方網(wǎng)站數(shù)據(jù)
爬蟲技術(shù)就是通過讀取網(wǎng)頁/網(wǎng)站的界面結(jié)構(gòu),獲取網(wǎng)頁中嵌套的數(shù)據(jù)
爬蟲目前主要有兩種類型的爬蟲
- 通過代碼進行爬蟲
python寫的- 優(yōu)點:在于可以定制化爬蟲內(nèi)容
- 缺點:
1、編寫代碼,代碼是非常復雜
2、很多網(wǎng)站做了反爬蟲校驗,可能寫了代碼也無法爬取數(shù)據(jù)
- 通過可視化爬蟲工具爬蟲
- 優(yōu)點:不需要寫一行代碼,只需要點點點就可以定制化數(shù)據(jù)爬蟲,反爬蟲問題不用擔心
- 缺點:1、無法隨心所欲爬取數(shù)據(jù),2、可能會收費
- 八爪魚爬蟲工具、集搜客爬蟲工具…
四、Flume日志采集工具概述
Flume也是Apache開源的頂尖項目,專門用來采集海量的日志數(shù)據(jù)到指定的目的地。
Flume采集數(shù)據(jù)采用一種流式架構(gòu)思想,只要數(shù)據(jù)源有數(shù)據(jù),就可以源源不斷的采集數(shù)據(jù)源的數(shù)據(jù)到目的地
Flume的組成架構(gòu)
- Flume之所以可以實現(xiàn)采集不同數(shù)據(jù)源(不僅僅只包含日志文件數(shù)據(jù))到指定的目的地,源于Flume的設(shè)計機構(gòu)。
- Agent:一個Flume采集數(shù)據(jù)的進程,一個Flume軟件可以啟動多個Flume采集進程Agent
- Source:Flume的一個數(shù)據(jù)源組件,是Flume專門用來連接數(shù)據(jù)源的組件,一個Flume采集進程Agent中,Source組件可以有一個也可以有多個
- Channel:Flume中一個類似于緩存池的組件,緩存池的主要作用就是用來臨時保存source數(shù)據(jù)源采集的數(shù)據(jù),目的地需要數(shù)據(jù),從緩沖池中獲取,防止數(shù)據(jù)源數(shù)據(jù)產(chǎn)生過快,而目的地消費數(shù)據(jù)過慢,導致程序崩潰的問題。一個Agent中,可以存在多個Channel組件
- Sink:Flume中一個目的地(下沉地)組件,是Flume專門用來連接目的地的組件,一個Flume進程中,sink組件也可以有多個,但是一個sink只能從一個channel中獲取數(shù)據(jù)。不能一個sink從不同channel拉取數(shù)據(jù)
- event:Flume中數(shù)據(jù)傳輸單位。Flume采集數(shù)據(jù)源的數(shù)據(jù)時,會把數(shù)據(jù)源的數(shù)據(jù)封裝為一個個的event。
- 腳本文件xxx.conf:需要用戶自己編寫的,F(xiàn)lume采集數(shù)據(jù)時,數(shù)據(jù)源和目的地有很多種,因此如果我們采集數(shù)據(jù)時,我們必須自定義一個腳本文件,在腳本文件中需要定義采集的數(shù)據(jù)源的類型、channel管道的類型、sink的目的地的類型、以及source channel sink三者之間的關(guān)系。腳本文件定義成功之后,我們才能去根據(jù)腳本文件啟動Flume采集進程Agent
- 【注意】一個source只能連接一個數(shù)據(jù)源,一個sink只能連接一個目的地
Flume的采集數(shù)據(jù)的工作流程
- 首先我們先編寫xx.conf腳本文件定義我們的采集的數(shù)據(jù)源、目的地、管道的類型,定義成功之后我們根據(jù)腳本啟動Flume采集進程Agent。一旦當Flume采集進程啟動成功,source就會去監(jiān)聽數(shù)據(jù)源的數(shù)據(jù),一旦當數(shù)據(jù)源有數(shù)據(jù)產(chǎn)生,那么source組件會把數(shù)據(jù)源的數(shù)據(jù)封裝為一個個的event,然后source把event數(shù)據(jù)單位傳輸?shù)絚hannel管道中緩存,然后sink組件會從channel中拉取指定個數(shù)的event,將event中數(shù)據(jù)發(fā)送給sink連接的目的地。
Flume安裝部署:三部曲
-
1、上傳解壓
-
2、配置環(huán)境變量
-
export FLUME_HOME=/opt/app/flume-1.11.0 export PATH=$PATH:$FLUME_HOME/bin
-
-
-
3、修改配置文件
- conf/flume-env.sh
- bin/flume-ng
flume運行需要Java環(huán)境,文件中需要指定Flume運行需要的內(nèi)存容量
- conf/flume-env.sh
五、Flume采集數(shù)據(jù)的時候,核心是編寫Flume的采集腳本xxx.conf
Flume支持多種數(shù)據(jù)源、管道、目的地,我們采集數(shù)據(jù)的時候,并不是所有的數(shù)據(jù)源和目的地都要使用,而是使用我們需要的源頭和目的地。但是Flume不知道你需要什么數(shù)據(jù)源、需要什么目的地。
通過腳本文件指定我們采集的數(shù)據(jù)源、目的地、管道
腳本文件主要由五部分組成:
- 1、起別名
- 我們可以根據(jù)采集腳本啟動一個Flume進程Agent,一個Flume支持啟動多個Agent,Flume要求每一個Agent必須有自己的一個別名,F(xiàn)lume啟動的多個Agent的別名不能重復。
- 同時Flume一個Agent進程中,可以有多個source、多個channel、多個sink,如何區(qū)分多個組件?
我們還需要多Agent進程中的source、channel、sink起別名的 - Agent、source、channel、sink起別名
- 2、配置Source組件
- 我們一個Flume進程中,可能存在1個或者多個數(shù)據(jù)源,每一個source組件需要連接一個數(shù)據(jù)源,但是數(shù)據(jù)源到底是誰,如何連接,我們需要配置。
- 3、配置channel組件
- 一個Agent中,可能存在一個或者多個channel,channel也有很多種類型的,因此我們需要配置我們channel的類型以及channel的容量。
- 4、配置Sink組件
- 一個Agent,可以同時將數(shù)據(jù)下沉到多個目的地,一個sink只能連接一個目的地,目的地到底是誰,如何連接,需要配置sink。
- 5、組裝source、channel、sink(核心)
- 一個source的數(shù)據(jù)可以發(fā)送給多個channel,一個sink只能讀取一個channel的數(shù)據(jù)。因此我們需要根據(jù)業(yè)務(wù)邏輯配置source、channel、sink的連接關(guān)系。
六、Flume案例實操
1、采集一個網(wǎng)絡(luò)端口的數(shù)據(jù)到控制臺
1、分析案例的組件類型
- source:網(wǎng)絡(luò)端口 netcat
- channel:基于內(nèi)存的管道即可memory
- sink:控制臺–Flume的日志輸出logger
2、編寫腳本文件portToConsole.conf
# 1、配置agent、source、channel、sink的別名
demo.sources=s1
demo.channels=c1
demo.sinks=k1
# 2、配置source組件連接的數(shù)據(jù)源--不同數(shù)據(jù)源的配置項都不一樣 監(jiān)聽netcat type bind port
demo.sources.s1.type=netcat
demo.sources.s1.bind=localhost
demo.sources.s1.port=44444
# 3、配置channel組件的類型--不同類型的管道配置項也不一樣 基于內(nèi)存memory的管道
demo.channels.c1.type=memory
demo.channels.c1.capacity=1000
demo.channels.c1.transactionCapacity=200
# 4、配置sink組件連接的目的地--不同類型的sink配置項不一樣 基于logger的下沉地
demo.sinks.k1.type=logger
# 5、配置source channel sink之間的連接 source 連接channel sink也要連接channel
# 一個source的數(shù)據(jù)可以發(fā)送給多個channel 一個sink只能拉去一個channel的數(shù)據(jù)
demo.sources.s1.channels=c1
demo.sinks.k1.channel=c1
3、根據(jù)腳本文件啟動Flume采集程序
- flume-ng agent -n agent的別名(必須和文件中別名保持一致) -f xxx.conf的路徑 -Dflume.root.logger=INFO,console
4、測試
- 我們只需要給本地的44444端口發(fā)送數(shù)據(jù),看看Flume的控制臺能否把數(shù)據(jù)輸出即可
- 需要新建一個和Linux的連接窗口,然后使用
telnet localhost 44444 命令連接本地的44444端口發(fā)送數(shù)據(jù) - telnet軟件linux默認沒有安裝,需要使用yum安裝一下
yum install -y telnet - 必須先啟動flume采集程序,再telnet連接網(wǎng)絡(luò)端口發(fā)送數(shù)據(jù)
2、采集一個文件的數(shù)據(jù)控制臺
1、案例需求
- 現(xiàn)在有一個文件,文件源源不斷的記錄用戶的訪問日志信息,我們現(xiàn)在想通過Flume去監(jiān)聽這個文件,一旦當這個文件有新的用戶數(shù)據(jù)產(chǎn)生,把數(shù)據(jù)采集到flume的控制臺上
2、案例分析
- source:exec(將一個linux命令的輸出當作數(shù)據(jù)源、自己寫監(jiān)聽命令) 、taildir
- channel:memory
- sink:logger
3、編寫腳本文件
# 1、起別名
demo01.sources=s1
demo01.channels=c1
demo01.sinks=k1
# 2、定義數(shù)據(jù)源 exec linux命令 監(jiān)聽一個文件 tail -f|-F 文件路徑
demo01.sources.s1.type=exec
demo01.sources.s1.command=tail -F /root/a.log
# 3、定義管道
demo01.channels.c1.type=memory
demo01.channels.c1.capacity=1000
demo01.channels.c1.transactionCapacity=200
# 4、配置sink目的地 logger
demo01.sinks.k1.type=logger
# 5、關(guān)聯(lián)組件
demo01.sources.s1.channels=c1
demo01.sinks.k1.channel=c1
4、啟動
flume-ng agent -n demo01 -f /root/fileToConsole.cong -Dflume.root.logger=INFO,console
echo "zs" >> a.log
5、測試
3、采集一個文件夾下的新文件數(shù)據(jù)到控制臺
1、案例需求
- 有一個文件夾,文件夾下記錄著網(wǎng)站產(chǎn)生的很多日志數(shù)據(jù),而且日志文件不止一個,就想把文件夾下所有的文件數(shù)據(jù)采集到控制臺,同時如果這個文件夾下有新的數(shù)據(jù)文件產(chǎn)生,也會把新文件的數(shù)據(jù)全部采集到控制臺上。
2、案例分析
- source:Spooling Directory Source
- channel:memory
- sink:logger
3、編寫配置文件
# 1、起別名
demo01.sources=s1
demo01.channels=c1
demo01.sinks=k1
# 2、定義數(shù)據(jù)源 Spooling Directory Source
demo01.sources.s1.type=spooldir
demo01.sources.s1.spoolDir=/root/demo
# 3、定義管道
demo01.channels.c1.type=memory
demo01.channels.c1.capacity=1000
demo01.channels.c1.transactionCapacity=200
# 4、配置sink目的地 logger
demo01.sinks.k1.type=logger
# 5、關(guān)聯(lián)組件
demo01.sources.s1.channels=c1
demo01.sinks.k1.channel=c1
4、運行
5、測試
4、采集一個網(wǎng)絡(luò)端口的數(shù)據(jù)到HDFS中
1、案例需求
- 監(jiān)控一個網(wǎng)絡(luò)端口產(chǎn)生的數(shù)據(jù),一旦當端口產(chǎn)生新的數(shù)據(jù),就把數(shù)據(jù)采集到HDFS上以文件的形式進行存放
2、案例分析
- source:網(wǎng)絡(luò)端口netcat
- channel:基于內(nèi)存的管道 memory
- sink:HDFS
3、編寫腳本文件
# 1、配置agent、source、channel、sink的別名
demo.sources=s1
demo.channels=c1
demo.sinks=k1
# 2、配置source組件連接的數(shù)據(jù)源--不同數(shù)據(jù)源的配置項都不一樣 監(jiān)聽netcat type bind port
demo.sources.s1.type=netcat
demo.sources.s1.bind=localhost
demo.sources.s1.port=44444
# 3、配置channel組件的類型--不同類型的管道配置項也不一樣 基于內(nèi)存memory的管道
demo.channels.c1.type=memory
demo.channels.c1.capacity=1000
demo.channels.c1.transactionCapacity=200
# 4、配置sink組件連接的目的地--基于HDFS的
demo.sinks.k1.type=hdfs
# 配置采集到HDFS上的目錄 數(shù)據(jù)在目錄下以文件的形式進行存放 文件的格式 FlumeData.時間戳
demo.sinks.k1.hdfs.path=hdfs://single:9000/flume
# 目錄下生成的文件的前綴 如果沒有配置 默認就是FlumeData
demo.sinks.k1.hdfs.filePrefix=collect
# 指定生成的文件的后綴 默認是沒有后綴的 生成的文件的格式collect.時間戳.txt
demo.sinks.k1.hdfs.fileSuffix=txt
# 目錄下采集的數(shù)據(jù)并不是記錄到一個文件中,文件是會滾動生成新的文件的
# 滾動的規(guī)則有三種:1、基于時間 2、基于文件的容量滾動 3、基于文件的記錄的event數(shù)量進行滾動
# 默認值: 時間30s 容量 1024b event 10
# 時間滾動規(guī)則 如果值設(shè)置為0 那么就代表不基于時間生成新的文件
demo.sinks.k1.hdfs.rollInterval=60
# 文件容量的滾動規(guī)則 單位b 如果設(shè)置為0 代表不基于容量滾動生成新的文件
demo.sinks.k1.hdfs.rollSize=100
# event數(shù)量的滾動規(guī)則 一般設(shè)置為0 代表不急于event數(shù)量滾動生成新的文件
demo.sinks.k1.hdfs.rollCount=0
# 文件在HDFS上的默認的存儲格式是SequenceFile文件格式
demo.sinks.k1.hdfs.fileType=DataStream
# 設(shè)置event的頭部使用本地時間戳作為header
demo.sinks.k1.hdfs.useLocalTimeStamp=true
# 5、配置source channel sink之間的連接 source 連接channel sink也要連接channel
# 一個source的數(shù)據(jù)可以發(fā)送給多個channel 一個sink只能拉去一個channel的數(shù)據(jù)
demo.sources.s1.channels=c1
demo.sinks.k1.channel=c1
4、啟動采集進程(必須先啟動HDFS)
【注意】flume的依賴的guava和hadoop的guava有沖突,需要將flume的lib目錄下的guava依賴刪除,同時將hadoop的share/common/lib/guava依賴復制到flume的lib目錄下
5、多數(shù)據(jù)源和多目的地案例
1、案例需求
- 現(xiàn)在有三個數(shù)據(jù)源:1、網(wǎng)絡(luò)端口 2、文件 3、文件夾
想把這三個數(shù)據(jù)源的數(shù)據(jù)全部采集到HDFS的指定目錄下,同時還要求把文件數(shù)據(jù)源的數(shù)據(jù)在控制臺上同步進行展示
2、案例分析
- source:netcat exec spooldir
- channel:兩個基于內(nèi)存的
- sink:1、hdfs 2、logger
3、編寫腳本文件
# 1、起別名 三個數(shù)據(jù)源 兩個管道 兩個sink
more.sources=s1 s2 s3
more.channels=c1 c2
more.sinks=k1 k2
# 2、定義數(shù)據(jù)源 三個
# 定義s1數(shù)據(jù)源 s1連接的是網(wǎng)絡(luò)端口
more.sources.s1.type=netcat
more.sources.s1.bind=localhost
more.sources.s1.port=44444
# 定義s2的數(shù)據(jù)源 s2連接的是文件 /root/more.log文件
more.sources.s2.type=exec
more.sources.s2.command=tail -F /root/more.log
# 定義s3的數(shù)據(jù)源 s3監(jiān)控的是一個文件夾 /root/more
more.sources.s3.type=spooldir
more.sources.s3.spoolDir=/root/more
# 3、定義channel管道 兩個 基于內(nèi)存的
# 定義c1管道 c1管道需要接受三個數(shù)據(jù)源的數(shù)據(jù)
more.channels.c1.type=memory
more.channels.c1.capacity=20000
more.channels.c1.transactionCapacity=5000
# 定義c2管道 c2管道只需要接收一個數(shù)據(jù)源 s2的數(shù)據(jù)
more.channels.c2.type=memory
more.channels.c2.capacity=5000
more.channels.c2.transactionCapacity=500
# 4、定義sink 兩個 HDFS logger
# 定義k1這個sink 基于hdfs
more.sinks.k1.type=hdfs
# hdfs支持生成動態(tài)目錄--基于時間的 /more/2023-08-25
more.sinks.k1.hdfs.path=hdfs://single:9000/more/%Y-%m-%d
# 如果設(shè)置了動態(tài)目錄,那么必須指定動態(tài)目錄的滾動規(guī)則-多長時間生成一個新的目錄
more.sinks.k1.hdfs.round=true
more.sinks.k1.hdfs.roundValue=24
more.sinks.k1.hdfs.roundUnit=hour
more.sinks.k1.hdfs.filePrefix=collect
more.sinks.k1.hdfs.fileSuffix=.txt
more.sinks.k1.hdfs.rollInterval=0
more.sinks.k1.hdfs.rollSize=134217728
more.sinks.k1.hdfs.rollCount=0
more.sinks.k1.hdfs.fileType=DataStream
more.sinks.k1.hdfs.useLocalTimeStamp=true
# 定義k2 logger
more.sinks.k2.type=logger
# 5、組合agent的組件
more.sources.s1.channels=c1
more.sources.s2.channels=c1 c2
more.sources.s3.channels=c1
more.sinks.k1.channel=c1
more.sinks.k2.channel=c2
6、多Flume進程組合的案例
1、案例需求
- 三個Flume進程,其中第一個Flume采集端口的數(shù)據(jù),第二個Flume采集文件的數(shù)據(jù),要求第一個Flume進程和第二個Flume進程將采集到的數(shù)據(jù)發(fā)送給第三個Flume進程,第三個Flume進程將接受到的數(shù)據(jù)采集到控制臺上。
2、案例分析
- first agent
- source :netcat
- channel:memory
- sink:avro
- second agent
- source:exec
- channel:memory
- sink:avro
- third agent
- source:avro
- channel:memory
- sink:logger
3、編寫腳本文件
-
第一個腳本監(jiān)聽端口到avro的
-
first.sources=s1 first.channels=c1 first.sinks=k1 first.sources.s1.type=netcat first.sources.s1.bind=localhost first.sources.s1.port=44444 first.channels.c1.type=memory first.channels.c1.capacity=1000 first.channels.c1.transactionCapacity=500 first.sinks.k1.type=avro first.sinks.k1.hostname=localhost first.sinks.k1.port=60000 first.sources.s1.channels=c1 first.sinks.k1.channel=c1
-
-
第二腳本文件監(jiān)聽文件數(shù)據(jù)到avro的
-
second.sources=s1 second.channels=c1 second.sinks=k1 second.sources.s1.type=exec second.sources.s1.command=tail -F /root/second.txt second.channels.c1.type=memory second.channels.c1.capacity=1000 second.channels.c1.transactionCapacity=500 second.sinks.k1.type=avro second.sinks.k1.hostname=localhost second.sinks.k1.port=60000 second.sources.s1.channels=c1 second.sinks.k1.channel=c1
-
-
第三個腳本文件監(jiān)聽avro匯總的數(shù)據(jù)到logger的
-
third.sources=s1 third.channels=c1 third.sinks=k1 # avro類型當作source 需要bind和port參數(shù) 如果當作sink使用 需要hostname port third.sources.s1.type=avro third.sources.s1.bind=localhost third.sources.s1.port=60000 third.channels.c1.type=memory third.channels.c1.capacity=1000 third.channels.c1.transactionCapacity=500 third.sinks.k1.type=logger third.sources.s1.channels=c1 third.sinks.k1.channel=c1
-
4、啟動腳本程序文章來源:http://www.zghlxwxcb.cn/news/detail-709655.html
-
先啟動第三個腳本,再啟動第一個和第二腳本文章來源地址http://www.zghlxwxcb.cn/news/detail-709655.html
-
flume-ng agent -n third -f /root/third.conf -Dflume.root.logger=INFO,console flume-ng agent -n first -f /root/first.conf -Dflume.root.logger=INFO,console flume-ng agent -n second -f /root/second.conf -Dflume.root.logger=INFO,console
-
到了這里,關(guān)于Hadoop生態(tài)圈中的Flume數(shù)據(jù)日志采集工具的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!