?? ? ? ?隨著時(shí)間的積累,日志數(shù)據(jù)會(huì)越來越多,當(dāng)您需要查看并分析龐雜的日志數(shù)據(jù)時(shí),可通過Filebeat+Kafka+Logstash+Elasticsearch采集日志數(shù)據(jù)到Elasticsearch中,并通過Kibana進(jìn)行可視化展示與分析。本文介紹具體的實(shí)現(xiàn)方法。
一、背景信息
Kafka是一種分布式、高吞吐、可擴(kuò)展的消息隊(duì)列服務(wù),廣泛用于日志收集、監(jiān)控?cái)?shù)據(jù)聚合、流式數(shù)據(jù)處理、在線和離線分析等大數(shù)據(jù)領(lǐng)域,已成為大數(shù)據(jù)生態(tài)中不可或缺的部分。
在實(shí)際應(yīng)用場(chǎng)景中,為了滿足大數(shù)據(jù)實(shí)時(shí)檢索的需求,您可以使用Filebeat采集日志數(shù)據(jù),并輸出到Kafka中。Kafka實(shí)時(shí)接收Filebeat采集的數(shù)據(jù),并輸出到Logstash中。輸出到Logstash中的數(shù)據(jù)在格式或內(nèi)容上可能不能滿足您的需求,此時(shí)可以通過Logstash的filter插件過濾數(shù)據(jù)。最后將滿足需求的數(shù)據(jù)輸出到Elasticsearch中進(jìn)行分布式檢索,并通過Kibana進(jìn)行數(shù)據(jù)分析與展示。簡(jiǎn)單流程如下。
二、操作流程
1、準(zhǔn)備工作
完成環(huán)境準(zhǔn)備,包括創(chuàng)建Elasticsearch、Logstash、ECS和消息隊(duì)列 Kafka 版實(shí)例、創(chuàng)建Topic和Consumer Group等。
2、步驟一:安裝并配置Filebeat
? 安裝并配置Filebeat,設(shè)置input為系統(tǒng)日志,output為Kafka,將日志數(shù)據(jù)采集到Kafka的指定Topic中。
3、步驟二:配置Logstash管道
配置Logstash管道的input為Kafka,output為阿里云Elasticsearch,使用Logstash消費(fèi)Topic中的數(shù)據(jù)并傳輸?shù)桨⒗镌艵lasticsearch中。
4、步驟三:查看日志消費(fèi)狀態(tài)
在消息隊(duì)列Kafka中查看日志數(shù)據(jù)的消費(fèi)的狀態(tài),驗(yàn)證日志數(shù)據(jù)是否采集成功。
5、步驟四:通過Kibana過濾日志數(shù)據(jù)
在Kibana控制臺(tái)的Discover頁面,通過Filter過濾出Kafka相關(guān)的日志。
三、步驟一:安裝并配置Filebeat
-
連接ECS服務(wù)器。
-
安裝Filebeat。
?本文以6.8.5版本為例,安裝命令如下,詳細(xì)信息請(qǐng)參見Install Filebeat。
curl -L -O https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-6.8.5-linux-x86_64.tar.gz tar xzvf filebeat-6.8.5-linux-x86_64.tar.gz
-
執(zhí)行以下命令,進(jìn)入Filebeat安裝目錄,創(chuàng)建并配置filebeat.kafka.yml文件。
cd filebeat-6.8.5-linux-x86_64 vi filebeat.kafka.yml
filebeat.kafka.yml配置如下。
filebeat.prospectors: - type: log enabled: true paths: - /var/log/*.log output.kafka: hosts: ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092"] topic: estest version: 0.10.2
重要
當(dāng)Filebeat為7.0及以上版本時(shí),filebeat.prospectors需要替換為filebeat.inputs。
參數(shù)
說明
type
輸入類型。設(shè)置為log,表示輸入源為日志。
enabled
設(shè)置配置是否生效:
-
true:生效
-
false:不生效
paths
需要監(jiān)控的日志文件的路徑。多個(gè)日志可在當(dāng)前路徑下另起一行寫入日志文件路徑。
hosts
消息隊(duì)列Kafka實(shí)例的單個(gè)接入點(diǎn),可在實(shí)例詳情頁面獲取,詳情請(qǐng)參見查看接入點(diǎn)。由于本文使用的是VPC實(shí)例,因此使用默認(rèn)接入點(diǎn)中的任意一個(gè)接入點(diǎn)。
topic
日志輸出到消息隊(duì)列Kafka的Topic,請(qǐng)指定為您已創(chuàng)建的Topic。
version
Kafka的版本,可在消息隊(duì)列Kafka的實(shí)例詳情頁面獲取。
重要
-
不配置此參數(shù)會(huì)報(bào)錯(cuò)。
-
由于不同版本的Filebeat支持的Kafka版本不同,例如8.2及以上版本的Filebeat支持的Kafka版本為2.2.0,因此version需要設(shè)置為Filebeat支持的Kafka版本,否則會(huì)出現(xiàn)類似報(bào)錯(cuò):
Exiting: error initializing publisher: unknown/unsupported kafka vesion '2.2.0' accessing 'output.kafka.version' (source:'filebeat.kafka.yml')
,詳細(xì)信息請(qǐng)參見version。
-
-
啟動(dòng)Filebeat。
./filebeat -e -c filebeat.kafka.yml
四、步驟二:配置Logstash管道
- 進(jìn)入阿里云Elasticsearch控制臺(tái)的Logstash頁面。
- 進(jìn)入目標(biāo)實(shí)例。
- 在頂部菜單欄處,選擇地域。
- 在Logstash實(shí)例中單擊目標(biāo)實(shí)例ID。
-
在左側(cè)導(dǎo)航欄,單擊管道管理。
-
單擊創(chuàng)建管道。
-
在創(chuàng)建管道任務(wù)頁面,輸入管道ID并配置管道。
本文使用的管道配置如下。
input { kafka { bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"] group_id => "es-test" topics => ["estest"] codec => json } } filter { } output { elasticsearch { hosts => "http://es-cn-n6w1o1x0w001c****.elasticsearch.aliyuncs.com:9200" user =>"elastic" password =>"<your_password>" index => "kafka‐%{+YYYY.MM.dd}" } }
表 1.?input參數(shù)說明 參數(shù)
說明
bootstrap_servers
消息隊(duì)列Kafka實(shí)例的接入點(diǎn),可在實(shí)例詳情頁面獲取,詳情請(qǐng)參見查看接入點(diǎn)。由于本文使用的是VPC實(shí)例,因此使用默認(rèn)接入點(diǎn)。
group_id
指定為您已創(chuàng)建的Consumer Group的名稱。
topics
指定為您已創(chuàng)建的Topic的名稱,需要與Filebeat中配置的Topic名稱保持一致。
codec
設(shè)置為json,表示解析JSON格式的字段,便于在Kibana中分析。
表 2.?output參數(shù)說明 參數(shù)
說明
hosts
阿里云Elasticsearch的訪問地址,取值為
http://<阿里云Elasticsearch實(shí)例的私網(wǎng)地址>:9200
。說明
您可在阿里云Elasticsearch實(shí)例的基本信息頁面獲取其私網(wǎng)地址,詳情請(qǐng)參見查看實(shí)例的基本信息。
user
訪問阿里云Elasticsearch的用戶名,默認(rèn)為elastic。您也可以使用自建用戶,詳情請(qǐng)參見通過Elasticsearch X-Pack角色管理實(shí)現(xiàn)用戶權(quán)限管控。
password
訪問阿里云Elasticsearch的密碼,在創(chuàng)建實(shí)例時(shí)設(shè)置。如果忘記密碼,可進(jìn)行重置,重置密碼的注意事項(xiàng)及操作步驟請(qǐng)參見重置實(shí)例訪問密碼。
index
索引名稱。設(shè)置為
kafka‐%{+YYYY.MM.dd}
表示索引名稱以kafka為前綴,以日期為后綴,例如kafka-2020.05.27
。更多Config配置詳情請(qǐng)參見Logstash配置文件說明。
如果您有多topic的數(shù)據(jù)同步需求,需要在kafka中添加新的topic,然后在Logstash的管道配置中添加input。示例如下:
input { kafka { bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"] group_id => "es-test" topics => ["estest"] codec => json } kafka { bootstrap_servers => ["alikafka-post-cn-zvp2n4v7****-1-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-2-vpc.alikafka.aliyuncs.com:9092,alikafka-post-cn-zvp2n4v7****-3-vpc.alikafka.aliyuncs.com:9092"] group_id => "es-test-2" topics => ["estest_2"] codec => json } }
-
單擊下一步,配置管道參數(shù)。
參數(shù)
說明
管道工作線程
并行執(zhí)行管道的Filter和Output的工作線程數(shù)量。當(dāng)事件出現(xiàn)積壓或CPU未飽和時(shí),請(qǐng)考慮增大線程數(shù),更好地使用CPU處理能力。默認(rèn)值:實(shí)例的CPU核數(shù)。
管道批大小
單個(gè)工作線程在嘗試執(zhí)行Filter和Output前,可以從Input收集的最大事件數(shù)目。較大的管道批大小可能會(huì)帶來較大的內(nèi)存開銷。您可以設(shè)置LS_HEAP_SIZE變量,來增大JVM堆大小,從而有效使用該值。默認(rèn)值:125。
管道批延遲
創(chuàng)建管道事件批時(shí),將過小的批分派給管道工作線程之前,要等候每個(gè)事件的時(shí)長(zhǎng),單位為毫秒。默認(rèn)值:50ms。
隊(duì)列類型
用于事件緩沖的內(nèi)部排隊(duì)模型??蛇x值:
-
MEMORY:默認(rèn)值。基于內(nèi)存的傳統(tǒng)隊(duì)列。
-
PERSISTED:基于磁盤的ACKed隊(duì)列(持久隊(duì)列)。
隊(duì)列最大字節(jié)數(shù)
請(qǐng)確保該值小于您的磁盤總?cè)萘俊DJ(rèn)值:1024 MB。
隊(duì)列檢查點(diǎn)寫入數(shù)
啟用持久性隊(duì)列時(shí),在強(qiáng)制執(zhí)行檢查點(diǎn)之前已寫入事件的最大數(shù)目。設(shè)置為0,表示無限制。默認(rèn)值:1024。
警告
配置完成后,需要保存并部署才能生效。保存并部署操作會(huì)觸發(fā)實(shí)例重啟,請(qǐng)?jiān)诓挥绊憳I(yè)務(wù)的前提下,繼續(xù)執(zhí)行以下步驟。
-
-
單擊保存或者保存并部署。
-
保存:將管道信息保存在Logstash里并觸發(fā)實(shí)例變更,配置不會(huì)生效。保存后,系統(tǒng)會(huì)返回管道管理頁面??稍?strong id="uicontrol-85q-rau-utk">管道列表區(qū)域,單擊操作列下的立即部署,觸發(fā)實(shí)例重啟,使配置生效。
-
保存并部署:保存并且部署后,會(huì)觸發(fā)實(shí)例重啟,使配置生效。
-
五、步驟三:查看日志消費(fèi)狀態(tài)
-
?進(jìn)入消息隊(duì)列Kafka控制臺(tái)。
-
參見查看消費(fèi)狀態(tài),查看詳細(xì)消費(fèi)狀態(tài)。
預(yù)期結(jié)果如下:
六、步驟四:通過Kibana過濾日志數(shù)據(jù)
- 登錄目標(biāo)阿里云Elasticsearch實(shí)例的Kibana控制臺(tái),根據(jù)頁面提示進(jìn)入Kibana主頁。
-
創(chuàng)建一個(gè)索引模式。
-
在左側(cè)導(dǎo)航欄,單擊Management。
-
在Kibana區(qū)域,單擊Index Patterns。
-
單擊Create index pattern。
-
輸入Index pattern(本文使用kafka-*),單擊Next step。
-
選擇Time Filter field name(本文選擇@timestamp),單擊Create index pattern。
-
-
在左側(cè)導(dǎo)航欄,單擊Discover。
-
從頁面左側(cè)的下拉列表中,選擇您已創(chuàng)建的索引模式(kafka-*)。
-
在頁面右上角,選擇一段時(shí)間,查看對(duì)應(yīng)時(shí)間段內(nèi)的Filebeat采集的日志數(shù)據(jù)。
-
單擊Add a filter,在Add filter頁面中設(shè)置過濾條件,查看符合對(duì)應(yīng)過濾條件的日志數(shù)據(jù)。
七、常見問題
Q:同步日志數(shù)據(jù)出現(xiàn)問題,管道一直在生效中,無法將數(shù)據(jù)導(dǎo)入Elasticsearch,如何解決?
A:查看Logstash實(shí)例的主日志是否有報(bào)錯(cuò),根據(jù)報(bào)錯(cuò)判斷原因,具體操作請(qǐng)參見查詢?nèi)罩尽3R姷脑蚣敖鉀Q方法如下。
原因 |
解決方法 |
Kafka的接入點(diǎn)不正確。 |
參見查看接入點(diǎn)獲取正確的接入點(diǎn)。完成后,修改管道配置替換錯(cuò)誤接入點(diǎn)。 |
Logstash與Kafka不在同一VPC下。 |
重新購買同一VPC下的實(shí)例。購買后,修改現(xiàn)有管道配置。 說明 VPC實(shí)例只能通過專有網(wǎng)絡(luò)VPC訪問 云消息隊(duì)列 Kafka 版。 |
Kafka或Logstash集群的配置太低,例如使用了測(cè)試版集群。 |
升級(jí)集群規(guī)格,完成后,刷新實(shí)例,觀察變更進(jìn)度。升級(jí)Logstash實(shí)例規(guī)格的具體操作,請(qǐng)參見升配集群;升級(jí)Kafka實(shí)例規(guī)格的具體操作,請(qǐng)參見升級(jí)實(shí)例配置。 |
管道配置中包含了file_extend,但沒有安裝logstash-output-file_extend插件。 |
選擇以下任意一種方式處理:
|
到了這里,關(guān)于使用Filebeat+Kafka+Logstash+Elasticsearch構(gòu)建日志分析系統(tǒng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!