部署ELK+Kafka+Filebeat日志收集分析系統(tǒng)
一、ELK 簡(jiǎn)介
ELK是三個(gè)軟件的統(tǒng)稱,即Elasticsearch、Logstash和Kibana三個(gè)開(kāi)源軟件的縮寫(xiě)。這三款軟件都是開(kāi)源軟件,通常配合使用,并且都先后歸于Elastic.co企業(yè)名下,故被簡(jiǎn)稱為ELK協(xié)議棧。ELK主要用于部署在企業(yè)架構(gòu)中,收集多臺(tái)設(shè)備上多個(gè)服務(wù)的日志信息,并將其統(tǒng)一整合后提供給用戶。它可以從任何來(lái)源、任何格式進(jìn)行日志搜索、分析與可視化展示。
1、ELK日志分析系統(tǒng)組成
在ELK架構(gòu)中,Elasticsearch、Logstash和Kibana三款軟件作用如下:
elasticsearch (es) :通過(guò)搭建群集;存儲(chǔ)日志數(shù)據(jù),索引日志數(shù)據(jù)
logstash :收集日志,收集到了后給es存儲(chǔ)
kibana :視圖形式展現(xiàn)日志信息,更加人性化
2、Elasticsearch(es)
Elasticsearch是一個(gè)高度可擴(kuò)展的全文搜索和分析引擎,基于Apache Lucence(事實(shí)上,Lucence也是百度所采用的搜索引擎)構(gòu)建,能夠?qū)Υ笕萘康臄?shù)據(jù)進(jìn)行接近實(shí)時(shí)的存儲(chǔ)、搜索和分析操作。
3、Logstash
Logstash是一個(gè)數(shù)據(jù)收集引擎,它可以動(dòng)態(tài)的從各種數(shù)據(jù)源搜集數(shù)據(jù),并對(duì)數(shù)據(jù)進(jìn)行過(guò)濾、分析和統(tǒng)一格式等操作,并將輸出結(jié)果存儲(chǔ)到指定位置上。Logstash支持普通的日志文件和自定義Json格式的日志解析。
4、Kibana
Kibana是一個(gè)數(shù)據(jù)分析和可視化平臺(tái),通常與Elasticsearch配合使用,用于對(duì)其中的數(shù)據(jù)進(jìn)行搜索、分析,并且以統(tǒng)計(jì)圖標(biāo)的形式展示。
5、日志處理步驟
1.將APP servers的日志進(jìn)行集中化管理到Logstash agent。
2.將日志格式化(Logstash)并輸出到Elasticsearch cluster
3.對(duì)格式化后的數(shù)據(jù)進(jìn)行索引和存儲(chǔ)(Elasticsearch)
4.前端數(shù)據(jù)的展示(Kibana)
5.可以在線查看界面化展示。
- ELK是由Elasticsearch、Logstash、Kiban三個(gè)開(kāi)源軟件的組合。
- Logstash 收集APP server產(chǎn)生的log,然后存放到Elasticsearch集群節(jié)點(diǎn)中
- kibana從Elasticsearch集群節(jié)點(diǎn)中查詢數(shù)據(jù)生成圖表,再返回給Brower。
二、Elasticsearch介紹
- 提供了一個(gè)分布式多用戶能力的全文搜索引擎;
- 是一個(gè)基于Lucene的搜索服務(wù)器;
- 基于restful web接口;
- 使用java開(kāi)發(fā);
- 作為apache許可條款下的開(kāi)放源碼發(fā)布,是第二流行的企業(yè)搜索引擎;
- 被設(shè)計(jì)用于云計(jì)算中,能夠達(dá)到實(shí)時(shí)搜索、穩(wěn)定、可靠、快速、安裝實(shí)用方便的需求。
1、Elasticsearch核心概念
接近實(shí)時(shí)(NRT)
elasticsearch是一個(gè)接近實(shí)時(shí)的搜索平臺(tái),這意味著,從索引一個(gè)文檔直到這個(gè)文檔能夠被搜索到有一個(gè)輕微的延遲(通常是1秒)
cluster集群,ES是一個(gè)分布式的系統(tǒng)
- 一個(gè)集群就是由一個(gè)或多個(gè)節(jié)點(diǎn)組織在一起,它們共同持有你整個(gè)的數(shù)據(jù),并一起提供索引和搜索功能。其中一個(gè)節(jié)點(diǎn)為主節(jié)點(diǎn),這個(gè)主節(jié)點(diǎn)是可以通過(guò)選舉產(chǎn)生的,并提供跨節(jié)點(diǎn)的聯(lián)合索引和搜索的功能。集群有一個(gè)唯一性標(biāo)示的名字,默認(rèn)是elasticsearch,集群名字很重要,每個(gè)節(jié)點(diǎn)是基于集群名字加入到其集群中的。因此,確保在不同環(huán)境中使用不同的集群名字。
- —個(gè)集群可以只有一個(gè)節(jié)點(diǎn)。強(qiáng)烈建議在配置elasticsearch時(shí),配置成集群模式。es具有集群機(jī)制,節(jié)點(diǎn)通過(guò)集群名稱加入到集群中,同時(shí)在集群中的節(jié)點(diǎn)會(huì)有一個(gè)自己的唯一身份標(biāo)識(shí)(自己的名稱)
Node節(jié)點(diǎn),就是集群中的一臺(tái)服務(wù)器
節(jié)點(diǎn)就是一臺(tái)單一的服務(wù)器,是集群的一部分,存儲(chǔ)數(shù)據(jù)并參與集群的索引和搜索功能。像集群一樣,節(jié)點(diǎn)也是通過(guò)名字來(lái)標(biāo)識(shí),默認(rèn)是在節(jié)點(diǎn)啟動(dòng)時(shí)隨機(jī)分配的字符名。當(dāng)然,你可以自己定義。該名字也很重要,在集群中用于識(shí)別服務(wù)器對(duì)應(yīng)的節(jié)點(diǎn)。
節(jié)點(diǎn)可以通過(guò)指定集群名字來(lái)加入到集群中。默認(rèn)情況,每個(gè)節(jié)點(diǎn)被設(shè)置成加入到elasticsearch集群。如果啟動(dòng)了多個(gè)節(jié)點(diǎn),假設(shè)能自動(dòng)發(fā)現(xiàn)對(duì)方,他們將會(huì)自動(dòng)組建一個(gè)名為elasticsearch的集群。
index索引
一個(gè)索引就是一個(gè)擁有幾分相似特征的文檔的集合。比如說(shuō),你可以有一個(gè)客戶數(shù)據(jù)的索引、一個(gè)產(chǎn)品目錄的索引、還有一個(gè)訂單數(shù)據(jù)的索引。一個(gè)索引用一個(gè)名字來(lái)標(biāo)識(shí)(必須全部是小寫(xiě)字母組合),并且當(dāng)我們要對(duì)相應(yīng)的索引中的文檔進(jìn)行索引、收縮、更新和刪除的時(shí)候,都要用到這個(gè)名字。在一個(gè)集群中,可以定義多個(gè)索引。(索引相對(duì)于關(guān)系型數(shù)據(jù)庫(kù)的庫(kù))
類(lèi)型相對(duì)于關(guān)系型數(shù)據(jù)庫(kù)的表 ——》索引(庫(kù))-》類(lèi)型(表)-》文檔(記錄)
類(lèi)型(type)
類(lèi)型(type)在一個(gè)索引中,你可以定義一種或多種類(lèi)型。一個(gè)類(lèi)型是你的索引的一個(gè)邏輯上的分類(lèi)分區(qū),其寓意完全由你來(lái)定義。通常,會(huì)為具有一組共同字段的文檔定義一個(gè)類(lèi)型。比如:我們假設(shè)運(yùn)營(yíng)一個(gè)博客平臺(tái)并且將所有的數(shù)據(jù)存儲(chǔ)到一個(gè)索引中,在這個(gè)索引中,你可以為用戶數(shù)據(jù)定義一個(gè)類(lèi)型,為博客數(shù)據(jù)定義一個(gè)類(lèi)型,也可以為評(píng)論數(shù)據(jù)定義另一個(gè)類(lèi)型。(類(lèi)型相對(duì)于關(guān)系型數(shù)據(jù)庫(kù)的表)
分片和副本(shards & replicas)
在實(shí)際情況下,索引存儲(chǔ)的數(shù)據(jù)可能超過(guò)單個(gè)節(jié)點(diǎn)的硬件限制。如一個(gè)10億文檔需1TB空間可能不適合存儲(chǔ)在單個(gè)節(jié)點(diǎn)的磁盤(pán)上或者從單個(gè)節(jié)點(diǎn)搜索請(qǐng)求太慢了。為了解決這個(gè)問(wèn)題,elasticsearch提供將索引分成多個(gè)分片的功能。當(dāng)在創(chuàng)建索引時(shí),可以定義想要分片的數(shù)量。每一個(gè)分片就是一個(gè)全功能的獨(dú)立的索引,可以位于集群中任何節(jié)點(diǎn)上。
2、開(kāi)啟分片副本的主要原因
Elasticsearch采用分片式,可以進(jìn)行水平分割橫向擴(kuò)展,增大存儲(chǔ)量;分布式并行跨分片操作,提高性能和吞吐量。
- 高可用性,以應(yīng)對(duì)分片或者節(jié)點(diǎn)故障,處于這個(gè)原因,分片副本要在不同節(jié)點(diǎn)上;
- 提高I/O性能,增大吞吐量,搜索可以并行在所有副本執(zhí)行??傊?,每個(gè)索引可以被分成多個(gè)分片,一個(gè)索引也可以被復(fù)制0次或者多次。一旦復(fù)制了,每個(gè)索引就有了主分片(可以作為復(fù)制源的原始分片)和復(fù)制分片(主分片的拷貝)之分。分片和副本的數(shù)量可以在索引創(chuàng)建的時(shí)候指定,在索引創(chuàng)建之后,你可以在任何時(shí)候動(dòng)態(tài)改變副本的數(shù)量,但是你事后無(wú)法改變分片的數(shù)量。默認(rèn)情況下,Elasticsearch中的每個(gè)索引被分片為5個(gè)主分片和1個(gè)副本,這意味著,如果你的集群中至少有兩個(gè)節(jié)點(diǎn)的情況下,你的索引將會(huì)有5個(gè)主分片和另外5個(gè)副本分片(1個(gè)完全拷貝),這樣的話每個(gè)索引總共就有10個(gè)分片。
三、Logstash
1.—款強(qiáng)大的數(shù)據(jù)處理工具
2.可實(shí)現(xiàn)數(shù)據(jù)傳輸、格式處理、格式化輸出
3.數(shù)據(jù)輸入(從業(yè)務(wù)輸入)、數(shù)據(jù)加工(如過(guò)濾、改寫(xiě)等)以及數(shù)據(jù)輸出(輸出到Elasticsearch群集)
4.由LRuby語(yǔ)言編寫(xiě),基于消息(message-based)的簡(jiǎn)單架構(gòu),并運(yùn)行在Java虛擬機(jī)(JVM)上;
5.不同于分離的代理端(agent)或主機(jī)端(server),Logstash可配置單一的代理端(agent)與其他開(kāi)源軟件結(jié)合,以實(shí)現(xiàn)不同的功能。
1、Logstash的主要組件
- Shipper:日志收集負(fù)責(zé)監(jiān)控本地日志文件的變化,及時(shí)把日志文件的最新內(nèi)容收集起來(lái)。通常,遠(yuǎn)程代理端(agent)只需要運(yùn)行這個(gè)組件即可。
- Indexer:日志存儲(chǔ)負(fù)責(zé)接受日志并寫(xiě)入到本地文件。
- Broker:日志hub負(fù)責(zé)鏈接多個(gè)shipper和對(duì)應(yīng)數(shù)目的indexer。
- Search and Storage允許對(duì)事件進(jìn)行搜索和存儲(chǔ)。
- Web Interface基于web的展示界面。以上組件在Logstash架構(gòu)中可以獨(dú)立部署,因此提供了很好的集群擴(kuò)展性
四、Kibana介紹
1、Kibana概述
- 一個(gè)針對(duì)Elasticsearch的開(kāi)源分析及可視化平臺(tái);
- 搜索、查看存儲(chǔ)在Elasticsearch索引中的數(shù)據(jù);
- 通過(guò)各種圖標(biāo)進(jìn)行高級(jí)數(shù)據(jù)分析及展示;
- 讓海量數(shù)據(jù)更容易理解;
- 操作簡(jiǎn)單,基于瀏覽器地用戶界面就可以快速創(chuàng)建儀表板(dashboard)實(shí)時(shí)顯示Elasticsearch查詢動(dòng)態(tài);
- 設(shè)置安裝Kibana非常簡(jiǎn)單,無(wú)需編寫(xiě)代碼,幾分鐘內(nèi)就可以完成Kibana安裝并啟動(dòng)Elasticsearch監(jiān)測(cè)。
2、Kibana主要功能
Elasticsearch無(wú)縫之集成。Kibana架構(gòu)為Elasticsearch定制,可以將任何結(jié)構(gòu)化和非結(jié)構(gòu)化數(shù)據(jù)加入Elasticsearch索引。Kibana還充分利用了Elasticsearch強(qiáng)大的搜索和分析功能。
整合數(shù)據(jù):Kibana能夠更好地處理海量數(shù)據(jù),并據(jù)此創(chuàng)建柱形圖、折線圖、散點(diǎn)圖、直方圖、餅圖和地圖。
整合數(shù)據(jù):Kibana能夠更好地處理海量數(shù)據(jù),并據(jù)此創(chuàng)建柱形圖、折線圖、散點(diǎn)圖、直方圖、餅圖和地圖。
讓更多團(tuán)隊(duì)成員受益:強(qiáng)大的數(shù)據(jù)庫(kù)可視化接口讓各業(yè)務(wù)崗位都能夠從數(shù)據(jù)集合受益。
接口靈活,分享更容易:使用Kibana可以更加方便地創(chuàng)建、保存、分享數(shù)據(jù),并將可視化數(shù)據(jù)快速交流。
配置簡(jiǎn)單:Kibana的配置和啟用非常簡(jiǎn)單,用戶體驗(yàn)非常友好。Kibana自帶Web服務(wù)器,可以快速啟動(dòng)運(yùn)行。
**可視化多數(shù)據(jù)源*8:Kibana可以非常方便地把來(lái)自Logstash、ES-Hadoop、Beats或第三方技術(shù)的數(shù)據(jù)整合到Elasticsearch,支持的第三方技術(shù)包括Apache Flume、Fluentd等。
簡(jiǎn)單數(shù)據(jù)導(dǎo)出:Kibana可以方便地導(dǎo)出感興趣的數(shù)據(jù),與其它數(shù)據(jù)集合并融合后快速建模分析,發(fā)現(xiàn)新結(jié)果。
五、ELK優(yōu)點(diǎn)
- 處理方式靈活。 Elasticsearch是全文索引,具有強(qiáng)大的搜索能力。
2.配置相對(duì)簡(jiǎn)單。 Kibana的配置非常簡(jiǎn)單,Elasticsearch則全部使用Json接口,配置也不復(fù)雜,Logstash的配置使用模塊的方式,配置也相對(duì)簡(jiǎn)單。
3.檢索性能高。 ELK架構(gòu)通??梢赃_(dá)到百億級(jí)數(shù)據(jù)的查詢秒級(jí)響應(yīng)。
4.集群線性擴(kuò)展。 Elasticsearch本身沒(méi)有單點(diǎn)的概念,自動(dòng)默認(rèn)集群模式,Elasticsearch和Logstash都可以
5.靈活擴(kuò)展。
6.頁(yè)面美觀。 Kibana的前端設(shè)計(jì)美觀,且操作簡(jiǎn)單。
六、Filebeat 簡(jiǎn)介
Filebeat由兩個(gè)主要組成部分組成:prospector(探勘者)和 harvesters(礦車(chē))。這些組件一起工作來(lái)讀取文件并將事件數(shù)據(jù)發(fā)送到指定的output。
1、prospector: 負(fù)責(zé)找到所有需要進(jìn)行讀取的數(shù)據(jù)源
2、harvesters:負(fù)責(zé)讀取單個(gè)文件的內(nèi)容,并將內(nèi)容發(fā)送到output中,負(fù)責(zé)文件的打開(kāi)和關(guān)閉。
1、Logstash
作為數(shù)據(jù)收集引擎。它支持動(dòng)態(tài)的從各種數(shù)據(jù)源搜集數(shù)據(jù),并對(duì)數(shù)據(jù)進(jìn)行過(guò)濾、分析、豐富、統(tǒng)一格式等操作,然后存儲(chǔ)到用戶指定的位置,一般會(huì)發(fā)送給 Elasticsearch。
可以添加的其它組件:
2、Filebeat
輕量級(jí)的開(kāi)源日志文件數(shù)據(jù)搜集器。通常在需要采集數(shù)據(jù)的客戶端安裝 Filebeat,并指定目錄與日志格式,F(xiàn)ilebeat 就能快速收集數(shù)據(jù),并發(fā)送給 logstash 進(jìn)行解析,或是直接發(fā)給 Elasticsearch 存儲(chǔ),性能上相比運(yùn)行于 JVM 上的 logstash 優(yōu)勢(shì)明顯,是對(duì)它的替代。常應(yīng)用于 EFLK 架構(gòu)當(dāng)中。
3、Filebeat的工作方式
啟動(dòng)Filebeat時(shí),它將啟動(dòng)一個(gè)或多個(gè)輸入,這些輸入將在為日志數(shù)據(jù)指定的位置中查找。對(duì)于Filebeat所找到的每個(gè)日志,F(xiàn)ilebeat都會(huì)啟動(dòng)收集器。每個(gè)收集器都讀取單個(gè)日志以獲取新內(nèi)容,并將新日志數(shù)據(jù)發(fā)送到libbeat,libbeat將聚集事件,并將聚集的數(shù)據(jù)發(fā)送到為Filebeat配置的輸出。
4、Filebeat工作原理
Filebeat可以保持每個(gè)文件的狀態(tài),并且頻繁地把文件狀態(tài)從注冊(cè)表里更新到磁盤(pán)。這里所說(shuō)的文件狀態(tài)是用來(lái)記錄上一次Harvster讀取文件時(shí)讀取到的位置,以保證能把全部的日志數(shù)據(jù)都讀取出來(lái),然后發(fā)送給output。如果在某一時(shí)刻,作為output的ElasticSearch或者Logstash變成了不可用,F(xiàn)ilebeat將會(huì)把最后的文件讀取位置保存下來(lái),直到output重新可用的時(shí)候,快速地恢復(fù)文件數(shù)據(jù)的讀取。在Filebaet運(yùn)行過(guò)程中,每個(gè)Prospector的狀態(tài)信息都會(huì)保存在內(nèi)存里。如果Filebeat出行了重啟,完成重啟之后,會(huì)從注冊(cè)表文件里恢復(fù)重啟之前的狀態(tài)信息,讓FIlebeat繼續(xù)從之前已知的位置開(kāi)始進(jìn)行數(shù)據(jù)讀取。
5、Filebeat用途
-
適用于集群環(huán)境下,服務(wù)多,且部署在不同機(jī)器
1、為什么要用filebeat來(lái)收集日志?
因?yàn)閘ogstash是jvm跑的,資源消耗比較大,啟動(dòng)一個(gè)logstash就需要消耗500M左右的內(nèi)存(這就是為什么logstash啟動(dòng)特別慢的原因),而filebeat只需要10來(lái)M內(nèi)存資源。常用的ELK日志采集方案中,大部分的做法就是將所有節(jié)點(diǎn)的日志內(nèi)容通過(guò)filebeat發(fā)送到logstash,logstash根據(jù)配置文件進(jìn)行過(guò)濾。然后將過(guò)濾之后的文件輸送到elasticsearch中,通過(guò)kibana去展示。
2、filebeat結(jié)合logstash帶來(lái)好處
- 通過(guò) Logstash 具有基于磁盤(pán)的自適應(yīng)緩沖系統(tǒng),該系統(tǒng)將吸收傳入的吞吐量,從而減輕 Elasticsearch 持續(xù)寫(xiě)入數(shù)據(jù)的壓力
- 從其他數(shù)據(jù)源(例如數(shù)據(jù)庫(kù),S3對(duì)象存儲(chǔ)或消息傳遞隊(duì)列)中提取
- 將數(shù)據(jù)發(fā)送到多個(gè)目的地,例如S3,HDFS(Hadoop分布式文件系統(tǒng))或?qū)懭胛募?/li>
- 使用條件數(shù)據(jù)流邏輯組成更復(fù)雜的處理管道
緩存/消息隊(duì)列(redis、kafka、RabbitMQ等):可以對(duì)高并發(fā)日志數(shù)據(jù)進(jìn)行流量削峰和緩沖,這樣的緩沖可以一定程度的保護(hù)數(shù)據(jù)不丟失,還可以對(duì)整個(gè)架構(gòu)進(jìn)行應(yīng)用解耦。
Fluentd:是一個(gè)流行的開(kāi)源數(shù)據(jù)收集器。由于 logstash 太重量級(jí)的缺點(diǎn),Logstash 性能低、資源消耗比較多等問(wèn)題,隨后就有 Fluentd 的出現(xiàn)。相比較 logstash,F(xiàn)luentd 更易用、資源消耗更少、性能更高,在數(shù)據(jù)處理上更高效可靠,受到企業(yè)歡迎,成為 logstash 的一種替代方案,常應(yīng)用于 EFK 架構(gòu)當(dāng)中。在 Kubernetes 集群中也常使用 EFK 作為日志數(shù)據(jù)收集的方案。
在 Kubernetes 集群中一般是通過(guò) DaemonSet 來(lái)運(yùn)行 Fluentd,以便它在每個(gè) Kubernetes 工作節(jié)點(diǎn)上都可以運(yùn)行一個(gè) Pod。 它通過(guò)獲取容器日志文件、過(guò)濾和轉(zhuǎn)換日志數(shù)據(jù),然后將數(shù)據(jù)傳遞到 Elasticsearch 集群,在該集群中對(duì)其進(jìn)行索引和存儲(chǔ)。
3、Filebeat和Logstash的區(qū)別
Logstash | filebeat | |
---|---|---|
內(nèi)存 | 大 | 小 |
cpu | 大 | 小 |
插件 | 多 | 多 |
功能 | 從多種輸入端采集并實(shí)時(shí)解析和轉(zhuǎn)換數(shù)據(jù)并輸出到多種輸出端 | 傳輸 |
輕重 | 相對(duì)較重 | 輕量級(jí)二進(jìn)制文件 |
過(guò)濾能力 | 強(qiáng)大的過(guò)濾能力 | 有過(guò)濾能力但是弱 |
集群 | 單節(jié)點(diǎn) | 單節(jié)點(diǎn) |
輸出到多個(gè)接收方 | 支持 | 6.0之前支持 |
二次開(kāi)發(fā)或者擴(kuò)展開(kāi)發(fā) | 難 | 易 |
進(jìn)程 | 一臺(tái)服務(wù)器只允許一個(gè)logstash進(jìn)程,掛掉之后需要手動(dòng)拉起 |
七、Kafka簡(jiǎn)介
Kafka是一種消息隊(duì)列,主要用來(lái)處理大量數(shù)據(jù)狀態(tài)下的消息隊(duì)列,一般用來(lái)做日志的處理。既然是消息隊(duì)列,那么Kafka也就擁有消息隊(duì)列的相應(yīng)的特性了。
可以在系統(tǒng)中起到“肖峰填谷”的作用,也可以用于異構(gòu)、分布式系統(tǒng)中海量數(shù)據(jù)的異步化處理。
1、為什么需要消息隊(duì)列?
主要原因是由于在高并發(fā)環(huán)境下,同步請(qǐng)求來(lái)不及處理,請(qǐng)求往往會(huì)發(fā)生阻塞。比如大量的請(qǐng)求并發(fā)訪問(wèn)數(shù)據(jù)庫(kù),導(dǎo)致行鎖表鎖,最后請(qǐng)求線程會(huì)堆積過(guò)多,從而觸發(fā) too many connection 錯(cuò)誤,引發(fā)雪崩效應(yīng)。
我們使用消息隊(duì)列,通過(guò)異步處理請(qǐng)求,從而緩解系統(tǒng)的壓力。消息隊(duì)列常應(yīng)用于異步處理,流量削峰,應(yīng)用解耦,消息通訊等場(chǎng)景。
當(dāng)前比較常見(jiàn)的 MQ 中間件有 ActiveMQ、RabbitMQ、RocketMQ、Kafka 等。
2、消息隊(duì)列的好處
1、解耦合
耦合的狀態(tài)表示當(dāng)你實(shí)現(xiàn)某個(gè)功能的時(shí)候,是直接接入當(dāng)前接口,而利用消息隊(duì)列,可以將相應(yīng)的消息發(fā)送到消息隊(duì)列,這樣的話,如果接口出了問(wèn)題,將不會(huì)影響到當(dāng)前的功能。
2、異步處理
異步處理替代了之前的同步處理,異步處理不需要讓流程走完就返回結(jié)果,可以將消息發(fā)送到消息隊(duì)列中,然后返回結(jié)果,剩下讓其他業(yè)務(wù)處理接口從消息隊(duì)列中拉取消費(fèi)處理即可。
3、流量削峰
高流量的時(shí)候,使用消息隊(duì)列作為中間件可以將流量的高峰保存在消息隊(duì)列中,從而防止了系統(tǒng)的高請(qǐng)求,減輕服務(wù)器的請(qǐng)求處理壓力。
3、Kafka的特性
高吞吐量、低延遲:kafka每秒可以處理幾十萬(wàn)條消息,它的延遲最低只有幾毫秒
可擴(kuò)展性:kafka集群支持熱擴(kuò)展
持久性、可靠性:消息被持久化到本地磁盤(pán),并且支持?jǐn)?shù)據(jù)備份防止數(shù)據(jù)丟失
容錯(cuò)性:允許集群中節(jié)點(diǎn)失?。ㄈ舾北緮?shù)量為n,則允許n-1個(gè)節(jié)點(diǎn)失敗)
高并發(fā):支持?jǐn)?shù)千個(gè)客戶端同時(shí)讀寫(xiě)
4、Kafka作為存儲(chǔ)系統(tǒng)
1、任何允許發(fā)布與消費(fèi)它們分離的消息的消息隊(duì)列實(shí)際上充當(dāng)了正在進(jìn)行的消息的存儲(chǔ)系統(tǒng)。
2、Kafka的不同之處在于它是一個(gè)非常好的存儲(chǔ)系統(tǒng)。
3、寫(xiě)入Kafka的數(shù)據(jù)將寫(xiě)入磁盤(pán)并進(jìn)行復(fù)制以實(shí)現(xiàn)容錯(cuò)。Kafka允許生產(chǎn)者等待確認(rèn),以便在完全復(fù)制之前寫(xiě)入不被認(rèn)為是完整的,并且即使寫(xiě)入的服務(wù)器失敗也保證寫(xiě)入仍然存在。
4、磁盤(pán)結(jié)構(gòu)Kafka很好地使用了規(guī)模 - 無(wú)論服務(wù)器上有50 KB還是50 TB的持久數(shù)據(jù),Kafka都會(huì)執(zhí)行相同的操作。
八、Kafka消費(fèi)模式
Kafka的消費(fèi)模式主要有兩種:
1、一種是一對(duì)一的消費(fèi),也即點(diǎn)對(duì)點(diǎn)的通信,即一個(gè)發(fā)送一個(gè)接收。
2、第二種為一對(duì)多的消費(fèi),即一個(gè)消息發(fā)送到消息隊(duì)列,消費(fèi)者根據(jù)消息隊(duì)列的訂閱拉取消息消費(fèi)。
1、一對(duì)一
消息生產(chǎn)者發(fā)布消息到Queue(隊(duì)列)隊(duì)列中,通知消費(fèi)者從隊(duì)列中拉取消息進(jìn)行消費(fèi)。消息被消費(fèi)之后則刪除,Queue支持多個(gè)消費(fèi)者,但對(duì)于一條消息而言,只有一個(gè)消費(fèi)者可以消費(fèi),即一條消息只能被一個(gè)消費(fèi)者消費(fèi)。
2、一對(duì)多
這種模式也稱為發(fā)布/訂閱模式,即利用Topic(主題)存儲(chǔ)消息,消息生產(chǎn)者將消息發(fā)布到Topic中,同時(shí)有多個(gè)消費(fèi)者訂閱此topic,消費(fèi)者可以從中消費(fèi)消息,注意發(fā)布到Topic中的消息會(huì)被多個(gè)消費(fèi)者消費(fèi),消費(fèi)者消費(fèi)數(shù)據(jù)之后,數(shù)據(jù)不會(huì)被清除,Kafka會(huì)默認(rèn)保留一段時(shí)間,然后再刪除。
九、Kafka的基礎(chǔ)架構(gòu)
Kafka像其他Mq一樣,也有自己的基礎(chǔ)架構(gòu),主要存在生產(chǎn)者Producer、Kafka集群Broker、消費(fèi)者Consumer、注冊(cè)消息Zookeeper
Producer:Producer即生產(chǎn)者,消息的產(chǎn)生者,是消息的入口。
Broker:Broker是kafka實(shí)例,每個(gè)服務(wù)器上有一個(gè)或多個(gè)kafka的實(shí)例,我們姑且認(rèn)為每個(gè)broker對(duì)應(yīng)一臺(tái)服務(wù)器。每個(gè)kafka集群內(nèi)的broker都有一個(gè)不重復(fù)的編號(hào)
Topic:消息的主題,可以理解為消息的分類(lèi),kafka的數(shù)據(jù)就保存在topic。在每個(gè)broker上都可以創(chuàng)建多個(gè)topic。
Partition:Topic的分區(qū),每個(gè)topic可以有多個(gè)分區(qū),分區(qū)的作用是做負(fù)載,提高kafka的吞吐量。同一個(gè)topic在不同的分區(qū)的數(shù)據(jù)是不重復(fù)的,partition的表現(xiàn)形式就是一個(gè)一個(gè)的文件夾!
Replication:每一個(gè)分區(qū)都有多個(gè)副本,副本的作用是做備胎。當(dāng)主分區(qū)(Leader)故障的時(shí)候會(huì)選擇一個(gè)備胎(Follower)上位,成為L(zhǎng)eader。在kafka中默認(rèn)副本的最大數(shù)量是10個(gè),且副本的數(shù)量不能大于Broker的數(shù)量,follower和leader絕對(duì)是在不同的機(jī)器,同一機(jī)器對(duì)同一個(gè)分區(qū)也只可能存放一個(gè)副本(包括自己)。
Message:每一條發(fā)送的消息主體。
Consumer:消費(fèi)者,即消息的消費(fèi)方,是消息的出口。
Consumer Group:我們可以將多個(gè)消費(fèi)組組成一個(gè)消費(fèi)者組,在kafka的設(shè)計(jì)中同一個(gè)分區(qū)的數(shù)據(jù)只能被消費(fèi)者組中的某一個(gè)消費(fèi)者消費(fèi)。同一個(gè)消費(fèi)者組的消費(fèi)者可以消費(fèi)同一個(gè)topic的不同分區(qū)的數(shù)據(jù),這也是為了提高kafka的吞吐量!
*Zookeeper:kafka集群依賴zookeeper來(lái)保存集群的的元信息,來(lái)保證系統(tǒng)的可用性。
Leader:每個(gè)分區(qū)多個(gè)副本的主角色,生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象都是Leader。
Follower:每個(gè)分區(qū)多個(gè)副本的從角色,實(shí)時(shí)的從Leader中同步數(shù)據(jù),保持和Leader數(shù)據(jù)的同步,Leader發(fā)生故障的時(shí)候,某個(gè)Follower會(huì)成為新的Leader。
簡(jiǎn)單來(lái)說(shuō):
上述一個(gè)Topic會(huì)產(chǎn)生多個(gè)分區(qū)Partition,分區(qū)中分為L(zhǎng)eader和Follower,消息一般發(fā)送到Leader,F(xiàn)ollower通過(guò)數(shù)據(jù)的同步與Leader保持同步,消費(fèi)的話也是在Leader中發(fā)生消費(fèi),如果多個(gè)消費(fèi)者,則分別消費(fèi)Leader和各個(gè)Follower中的消息,當(dāng)Leader發(fā)生故障的時(shí)候,某個(gè)Follower會(huì)成為主節(jié)點(diǎn),此時(shí)會(huì)對(duì)齊消息的偏移量。
2、工作流程
producer就是生產(chǎn)者,是數(shù)據(jù)的入口。Producer在寫(xiě)入數(shù)據(jù)的時(shí)候永遠(yuǎn)的找leader,不會(huì)直接將數(shù)據(jù)寫(xiě)入follower
3、分區(qū)的原因
1、便在集群中擴(kuò)展,每個(gè)Partition(分區(qū))可以通過(guò)調(diào)整以適應(yīng)它所在的機(jī)器,而一個(gè)topic(消息主題)又可以有多個(gè)Partition組成,因此整個(gè)集群就可以適應(yīng)任意大小的數(shù)據(jù)了;
2、可以提高并發(fā),因?yàn)榭梢砸訮artition為單位讀寫(xiě)了。
4、分區(qū)目的
producer(生產(chǎn)者)采用push模式將數(shù)據(jù)發(fā)布到broker,每條消息追加到分區(qū)中,順序?qū)懭氪疟P(pán),所以保證同一分區(qū)內(nèi)的數(shù)據(jù)是有序的。
數(shù)據(jù)會(huì)寫(xiě)入到不同的分區(qū),分區(qū)的目的是
1、方便擴(kuò)展:因?yàn)橐粋€(gè)topic可以有多個(gè)partition,所以我們可以通過(guò)擴(kuò)展機(jī)器去輕松的應(yīng)對(duì)日益增長(zhǎng)的數(shù)據(jù)量。
2、抗高并發(fā):以partition為讀寫(xiě)單位,可以多個(gè)消費(fèi)者同時(shí)消費(fèi)數(shù)據(jù),提高了消息的處理效率。
十、Kafka原則
類(lèi)似于負(fù)載均衡,當(dāng)我們向某個(gè)服務(wù)器發(fā)送請(qǐng)求的時(shí)候,服務(wù)端可能會(huì)對(duì)請(qǐng)求做一個(gè)負(fù)載,將流量分發(fā)到不同的服務(wù)器,那在kafka中,如果某個(gè)topic有多個(gè)partition,producer又怎么知道該將數(shù)據(jù)發(fā)往哪個(gè)partition呢?kafka中有幾個(gè)原則:
1、partition在寫(xiě)入的時(shí)候可以指定需要寫(xiě)入的partition,如果有指定,則寫(xiě)入對(duì)應(yīng)的partition。
2、如果沒(méi)有指定partition,但是設(shè)置了數(shù)據(jù)的key,則會(huì)根據(jù)key的值hash出一個(gè)partition。
3、如果既沒(méi)指定partition,又沒(méi)有設(shè)置key,則會(huì)輪詢選出一個(gè)partition。
保證消息不丟失是一個(gè)消息隊(duì)列中間件的基本保證,那producer在向kafka寫(xiě)入消息的時(shí)候,怎么保證消息不丟失呢?
那就是通過(guò)ACK應(yīng)答機(jī)制!在生產(chǎn)者向隊(duì)列寫(xiě)入數(shù)據(jù)的時(shí)候可以設(shè)置參數(shù)來(lái)確定是否確認(rèn)kafka接收到數(shù)據(jù),這個(gè)參數(shù)可設(shè)置的值為0、1、all。
十一、ZooKeeper簡(jiǎn)介
ZooKeeper是一種為分布式應(yīng)用所設(shè)計(jì)的高可用、高性能且一致的開(kāi)源協(xié)調(diào)服務(wù),它提供了一項(xiàng)基本服務(wù):分布式鎖服務(wù)。分布式應(yīng)用可以基于它實(shí)現(xiàn)更高級(jí)的服務(wù),實(shí)現(xiàn)諸如同步服務(wù)、配置維護(hù)和集群管理或者命名的服務(wù)。
Zookeeper服務(wù)自身組成一個(gè)集群,2n+1個(gè)(奇數(shù))服務(wù)允許n個(gè)失效,集群內(nèi)一半以上機(jī)器可用,Zookeeper就可用。
假設(shè) 3臺(tái)機(jī)器組成的集群,可以有允許一臺(tái)失效,如果有2臺(tái)失效,這個(gè)集群就不可用,1<1.5,一般的搭建zookeeper集群時(shí),以奇數(shù)臺(tái)機(jī)器來(lái)搭建。目的:是為了提高容錯(cuò)能允許多損失一臺(tái)。
十二、 zookeeper工作機(jī)制
Zookeeper從設(shè)計(jì)模式角度來(lái)理解:是一個(gè)基于觀察者模式設(shè)計(jì)的分布式服務(wù)管理框架它負(fù)責(zé)存儲(chǔ)和管理大家都關(guān)心的數(shù)據(jù),然后接受觀察者的注冊(cè),一旦這些數(shù)據(jù)的狀態(tài)發(fā)生變化,Zookeeper就將負(fù)責(zé)通知已經(jīng)在Zookeeper上注冊(cè)的那些觀察者做出相應(yīng)的反應(yīng)。也就是說(shuō) Zookeeper =文件系統(tǒng)+通知機(jī)制。
十三、zookeeper特點(diǎn)
1、Zookeeper:一個(gè)領(lǐng)導(dǎo)者(Leader),多個(gè)跟隨者(Follower)組成的集群。
2、Zookeepe集群中只要有半數(shù)以上節(jié)點(diǎn)存活,Zookeeper集群就能正常服務(wù)。所以zookeeper適合安裝奇數(shù)臺(tái)服務(wù)器。
3、全局?jǐn)?shù)據(jù)一致:每個(gè)server保存一份相同的數(shù)據(jù)副本,client無(wú)論連接到哪個(gè)Server,數(shù)據(jù)都是一致的。
4、更新請(qǐng)求順序執(zhí)行,來(lái)自同一個(gè)client的更新請(qǐng)求按其發(fā)送順序依次執(zhí)行,即先進(jìn)先出。
5、數(shù)據(jù)更新原子性,一次數(shù)據(jù)更新要么成功,要么失敗。
6、實(shí)時(shí)性,在一定時(shí)間范圍內(nèi),client能讀到最新數(shù)據(jù)。]
十四、Zookeeper數(shù)據(jù)結(jié)構(gòu)
ZooKeeper數(shù)據(jù)模型的結(jié)構(gòu)與Linux文件系統(tǒng)很類(lèi)似,整體上可以看作是一棵樹(shù),每個(gè)節(jié)點(diǎn)稱做一個(gè)ZNode。每一個(gè)ZNode默認(rèn)能夠存儲(chǔ)1MB的數(shù)據(jù),每個(gè)ZNode都可以通過(guò)其路徑唯一標(biāo)識(shí)。
十五、zookeeper應(yīng)用場(chǎng)景
提供的服務(wù)包括:統(tǒng)一命名服務(wù)、統(tǒng)一配置管理、統(tǒng)一集群管理、服務(wù)器節(jié)點(diǎn)動(dòng)態(tài)上下線、軟負(fù)載均衡等。
1、統(tǒng)一命名服務(wù)
1、在分布式環(huán)境下,經(jīng)常需要對(duì)應(yīng)用/服務(wù)進(jìn)行統(tǒng)一命名,便于識(shí)別。例如:IP不容易記住,而域名容易記住。
2、統(tǒng)一配置管理
1、分布式環(huán)境下,配置文件同步非常常見(jiàn)。一般要求一個(gè)集群中,所有節(jié)點(diǎn)的配置信息是一致的,比如Kafka集群。對(duì)配置文件修改后,希望能夠快速同步到各個(gè)節(jié)點(diǎn)上。
2、配置管理可交由ZooKeeper實(shí)現(xiàn)??蓪⑴渲眯畔?xiě)入ZooKeeper上的一個(gè)Znode。各個(gè)客戶端服務(wù)器監(jiān)聽(tīng)這個(gè)Znode。一旦Znode中的數(shù)據(jù)被修改,ZooKeeper將通知各個(gè)客戶端服務(wù)器。
3、統(tǒng)一集群管理
1、分布式環(huán)境中,實(shí)時(shí)掌握每個(gè)節(jié)點(diǎn)的狀態(tài)是必要的??筛鶕?jù)節(jié)點(diǎn)實(shí)時(shí)狀態(tài)做出一些調(diào)整。
2、ZooKeeper可以實(shí)現(xiàn)實(shí)時(shí)監(jiān)控節(jié)點(diǎn)狀態(tài)變化??蓪⒐?jié)點(diǎn)信息寫(xiě)入ZooKeeper上的一個(gè)2Node。監(jiān)聽(tīng)這個(gè)DMode可獲取它的實(shí)時(shí)狀態(tài)變化。
4、服務(wù)器動(dòng)態(tài)上下線
客戶端能實(shí)時(shí)洞察到服務(wù)器上下線的變化。
5、軟負(fù)教均衡
在Zookeeper中記錄每臺(tái)服務(wù)器的訪問(wèn)數(shù),讓訪問(wèn)數(shù)最少的服務(wù)器去處理最新的客戶端請(qǐng)求。
十六、工作原理
Zookeeper的核心是原子廣播,這個(gè)機(jī)制保證了各個(gè)Server之間的同步。實(shí)現(xiàn)這個(gè)機(jī)制的協(xié)議叫做Zab協(xié)議。
Zab協(xié)議有兩種模式,它們分別是恢復(fù)模式(選主)和廣播模式(同步)。
恢復(fù)模式:當(dāng)服務(wù)啟動(dòng)或者在領(lǐng)導(dǎo)者崩潰后,Zab就進(jìn)入了恢復(fù)模式,恢復(fù)模式不接受客戶端請(qǐng)求,當(dāng)領(lǐng)導(dǎo)者被選舉出來(lái),且大多數(shù)Server完成了和leader的狀態(tài)同步以后,恢復(fù)模式就結(jié)束了。
狀態(tài)同步保證了leader和Server具有相同的系統(tǒng)狀態(tài)。
廣播模式:一旦Leader已經(jīng)和多數(shù)的Follower進(jìn)行了狀態(tài)同步后,他就可以開(kāi)始廣播消息了,
即進(jìn)入廣播狀態(tài)。這時(shí)候當(dāng)一個(gè)Server加入ZooKeeper服務(wù)中,它會(huì)在恢復(fù)模式下啟動(dòng),發(fā)現(xiàn)Leader,并和Leader進(jìn)行狀態(tài)同步。待到同步結(jié)束,它也參與消息廣播。
ZooKeeper的廣播狀態(tài)一直到Leader崩潰了或者Leader失去了大部分的Followers支持。
十七、zookeeper選舉機(jī)制
第一次啟動(dòng)選舉機(jī)制
1、服務(wù)器1啟動(dòng),發(fā)起一次選舉。服務(wù)器1投自己一票。
此時(shí)服務(wù)器1票數(shù)一票,不夠半數(shù)以上(3票),選舉無(wú)法完成,服務(wù)器1狀態(tài)保持為L(zhǎng)OOKING;
2、服務(wù)器2啟動(dòng),再發(fā)起一次選舉。
服務(wù)器1和2分別投自己一票并交換選票信息:
此時(shí)服務(wù)器1發(fā)現(xiàn)服務(wù)器2的myid比自己目前投票推舉的(服務(wù)器1)大,
更改選票為推舉服務(wù)器2。此時(shí)服務(wù)器1票數(shù)0票,服務(wù)器2票數(shù)2票,沒(méi)有半數(shù)以上結(jié)果,選舉無(wú)法完成,服務(wù)器1,2狀態(tài)保持LOOKING
3、服務(wù)器3啟動(dòng),發(fā)起一次選舉。此時(shí)服務(wù)器1和2都會(huì)更改選票為服務(wù)器3。
此次投票結(jié)果:服務(wù)器1為0票,服務(wù)器2為0票,服務(wù)器3為3票。此時(shí)服務(wù)器3的票數(shù)已經(jīng)超過(guò)半數(shù),
服務(wù)器3當(dāng)選Leader。服務(wù)器1,2更改狀態(tài)為FOLLOWING,服務(wù)器3更改狀態(tài)為L(zhǎng)EADING;
4、服務(wù)器4啟動(dòng),發(fā)起一次選舉。此時(shí)服務(wù)器1,2,3已經(jīng)不是LooKING狀態(tài),不會(huì)更改選票信息。
交換選票信息結(jié)果:服務(wù)器3為3票,服務(wù)器4為1票。
此時(shí)服務(wù)器4服從多數(shù),更改選票信息為服務(wù)器3,并更改狀態(tài)為FOLOWING;
5、服務(wù)器5啟動(dòng),同4一樣當(dāng)小弟。
非第一次啟動(dòng)選舉機(jī)制
當(dāng)ZooKeeper集群中的一臺(tái)服務(wù)器出現(xiàn)以下兩種情況之一時(shí),就會(huì)開(kāi)始進(jìn)入Leader選舉:
服務(wù)器初始化啟動(dòng)。
服務(wù)器運(yùn)行期間無(wú)法和Leader保持連接。
而當(dāng)一臺(tái)機(jī)器進(jìn)入Leader選舉流程時(shí),當(dāng)前集群也可能會(huì)處于以下兩種狀態(tài):
集群中本來(lái)就己經(jīng)存在一個(gè)Leader。
對(duì)于已經(jīng)存在Leader的情況,機(jī)器試圖去選舉Leader時(shí),會(huì)被告知當(dāng)前服務(wù)器的Leader信息,對(duì)于該機(jī)器來(lái)說(shuō),僅僅需要和Leader機(jī)器建立連接,并進(jìn)行狀態(tài)同步即可。
**集群中確實(shí)不存在Leader。**假設(shè)ZooKeeper由5臺(tái)服務(wù)器組成,SID分別為1、2、3、4、5,ZXID分別為8、8、8、7、,并且此時(shí)sID為3的服務(wù)器是。一時(shí)刻,3和5服務(wù)器出現(xiàn)故障,因此開(kāi)始進(jìn)行Leader選舉。
選舉Leader規(guī)則:
EPOCH大的直接勝出
EPOCH相同,事務(wù)id大的勝出
事務(wù)id相同,服務(wù)器id大的勝出
SID:服務(wù)器ID。用來(lái)唯一標(biāo)識(shí)一臺(tái)ZooKeeper集群中的機(jī)器,每臺(tái)機(jī)器不能重復(fù),和myid一致。
ZXID:事務(wù)ID。ZXID是一個(gè)事務(wù)ID,用來(lái)標(biāo)識(shí)一次服務(wù)器狀態(tài)的變更。
在某一時(shí)刻,集群中的每臺(tái)機(jī)器的zxID值不一定完全一致,這和ZooKeeper服務(wù)器對(duì)于客戶端"更新請(qǐng)求"的處理邏輯速度有關(guān)。
Bpoch:每個(gè)Leader任期的代號(hào)。沒(méi)有Leader時(shí)同一輪投票過(guò)程中的邏輯時(shí)鐘值是相同的。每投完一次票這個(gè)數(shù)據(jù)就會(huì)增加
十八、ELK+Filebeat+Kafka+Zookeeper架構(gòu)
每層實(shí)現(xiàn)的功能和含義分別介紹如下:
數(shù)據(jù)采集層
數(shù)據(jù)采集層位于最左邊的業(yè)務(wù)服務(wù)器集群上,
在每個(gè)業(yè)務(wù)服務(wù)器上面安裝了filebeat做日志收集,然后把采集到的原始日志發(fā)送到Kafka+zookeeper集群上。
消息隊(duì)列層
原始日志發(fā)送到Kafka+zookeeper集群上后,會(huì)進(jìn)行集中存儲(chǔ),
此時(shí),filbeat是消息的生產(chǎn)者,存儲(chǔ)的消息可以隨時(shí)被消費(fèi)。
數(shù)據(jù)分析層
Logstash作為消費(fèi)者,會(huì)去Kafka+zookeeper集群節(jié)點(diǎn)實(shí)時(shí)拉取原始日志,
然后將獲取到的原始日志根據(jù)規(guī)則進(jìn)行分析、清洗、過(guò)濾,最后將清洗好的日志轉(zhuǎn)發(fā)至Elasticsearch集群。
數(shù)據(jù)持久化存儲(chǔ)
Elasticsearch集群在接收到logstash發(fā)送過(guò)來(lái)的數(shù)據(jù)后,
執(zhí)行寫(xiě)磁盤(pán),建索引庫(kù)等操作,最后將結(jié)構(gòu)化的數(shù)據(jù)存儲(chǔ)到Elasticsearch集群上。
數(shù)據(jù)查詢、展示層
Kibana是一個(gè)可視化的數(shù)據(jù)展示平臺(tái),當(dāng)有數(shù)據(jù)檢索請(qǐng)求時(shí),
它從Elasticsearch集群上讀取數(shù)據(jù),然后進(jìn)行可視化出圖和多維度分析。
十九 部署ELK+Filebeat+Kafka+Zookeeper
ip | 部署的服務(wù) | 主機(jī)名 |
---|---|---|
192.168.0.102 | es+kafka+zookeeper+kibana | elk1 |
192.168.0.103 | es+kafka+zookeeper | elk2 |
192.168.0.104 | kafka+zookeeper+nginx+logstash+filebeat | client |
跑es集群的虛擬機(jī)要內(nèi)存4G 處理器4核
//前期準(zhǔn)備 關(guān)閉防火墻 selinux 修改主機(jī)名 安裝java環(huán)境 配置域名解析
[root@localhost ~]# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemon
Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
Active: inactive (dead)
Docs: man:firewalld(1)
[root@localhost ~]# getenforce
Disabled
[root@elk1 ~]# vi /etc/hosts
[root@elk1 ~]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.0.102 elk1
192.168.0.103 elk2
192.168.0.104 client
[root@localhost ~]# hostnamectl set-hostname elk1
[root@localhost ~]# bash
[root@elk1 ~]#
[root@elk1 ~]# yum -y install java
[root@elk1 ~]# java -version
openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)
[root@localhost ~]# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemon
Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
Active: inactive (dead)
Docs: man:firewalld(1)
[root@localhost ~]# getenforce
Disabled
[root@elk2 ~]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.0.102 elk1
192.168.0.103 elk2
192.168.0.104 client
[root@localhost ~]# hostnamectl set-hostname elk2
[root@localhost ~]# bash
[root@elk2 ~]#
[root@elk2 ~]# yum -y install java
[root@elk2 ~]# java -version
openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)
[root@localhost ~]# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemon
Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
Active: inactive (dead)
Docs: man:firewalld(1)
[root@localhost ~]# getenforce
Disabled
[root@client ~]# cat /etc/hosts
127.0.0.1 localhost localhost.localdomain localhost4 localhost4.localdomain4
::1 localhost localhost.localdomain localhost6 localhost6.localdomain6
192.168.0.102 elk1
192.168.0.103 elk2
192.168.0.104 client
[root@localhost ~]# hostnamectl set-hostname client
[root@localhost ~]# bash
[root@client ~]#
[root@client ~]# yum -y install java
[root@client ~]# java -version
openjdk version "1.8.0_312"
OpenJDK Runtime Environment (build 1.8.0_312-b07)
OpenJDK 64-Bit Server VM (build 25.312-b07, mixed mode)
//部署elasticsearch 下面為需要下載的軟件包地址
https://phantomjs.org/download.html
https://nodejs.org/dist/v12.18.1/
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.0-x86_64.rpm
https://codeload.github.com/mobz/elasticsearch-head/zip/refs/heads/master
[root@elk1 ~]# ls /opt/
elasticsearch-7.6.0-x86_64.rpm elasticsearch-head-master.zip node-v12.18.1-linux-x64.tar.gz phantomjs-2.1.1-linux-x86_64.tar.bz2
[root@elk1 opt]# rpm -ivh elasticsearch-7.6.0-x86_64.rpm
warning: elasticsearch-7.6.0-x86_64.rpm: Header V4 RSA/SHA512 Signature, key ID d88e42b4: NOKEY
Verifying... ################################# [100%]
Preparing... ################################# [100%]
Creating elasticsearch group... OK
Creating elasticsearch user... OK
Updating / installing...
1:elasticsearch-0:7.6.0-1 ################################# [100%]
### NOT starting on installation, please execute the following statements to configure elasticsearch service to start automatically using systemd
sudo systemctl daemon-reload
sudo systemctl enable elasticsearch.service
### You can start elasticsearch service by executing
sudo systemctl start elasticsearch.service
Created elasticsearch keystore in /etc/elasticsearch
[/usr/lib/tmpfiles.d/elasticsearch.conf:1] Line references path below legacy directory /var/run/, updating /var/run/elasticsearch → /run/elasticsearch; please update the tmpfiles.d/ drop-in file accordingly.
[root@elk1 opt]# cd /etc/elasticsearch/
[root@elk1 elasticsearch]#
[root@elk1 elasticsearch]# cp elasticsearch.yml elasticsearch.yml.bak
[root@elk1 elasticsearch]# vi elasticsearch.yml
[root@elk1 elasticsearch]#
[root@elk1 elasticsearch]# grep -v "^#" /etc/elasticsearch/elasticsearch.yml
cluster.name: sjy #集群名稱,各節(jié)點(diǎn)配成相同的集群名稱。
node.name: elk1 #節(jié)點(diǎn)名稱,各節(jié)點(diǎn)配置不同
path.data: /data/elk_data # 數(shù)據(jù)存儲(chǔ)目錄。
path.logs: /var/log/elasticsearch #日志存儲(chǔ)目錄。
bootstrap.memory_lock: false #內(nèi)存鎖定,是否禁用交換。
network.host: 0.0.0.0 #綁定節(jié)點(diǎn)IP。
http.port: 9200 # rest api端口。
discovery.zen.ping.unicast.hosts: ["elk1", "elk2"] #提供其他 Elasticsearch 服務(wù)節(jié)點(diǎn)的單點(diǎn)廣播發(fā)現(xiàn)功能。
cluster.initial_master_nodes: ["elk1"] #初始主節(jié)點(diǎn)
[root@elk1 elasticsearch]# mkdir -p /data/elk_data
[root@elk1 elasticsearch]# chown elasticsearch:elasticsearch /data/elk_data/
[root@elk1 opt]# systemctl daemon-reload
[root@elk1 elasticsearch]# systemctl start elasticsearch.service
[root@elk1 elasticsearch]# systemctl status elasticsearch.service
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled)
Active: active (running) since Wed 2023-07-19 10:06:19 CST; 49s ago
Docs: http://www.elastic.co
Main PID: 12546 (java)
Tasks: 50 (limit: 23009)
Memory: 1.1G
CGroup: /system.slice/elasticsearch.service
├─12546 /usr/share/elasticsearch/jdk/bin/java -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.enco>
└─12640 /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/controller
[root@elk1 elasticsearch]# systemctl enable --now elasticsearch.service
Synchronizing state of elasticsearch.service with SysV service script with /usr/lib/systemd/systemd-sysv-install.
Executing: /usr/lib/systemd/systemd-sysv-install enable elasticsearch
Created symlink /etc/systemd/system/multi-user.target.wants/elasticsearch.service → /usr/lib/systemd/system/elasticsearch.service.
[root@elk1 elasticsearch]# ss -antl
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 128 0.0.0.0:22 0.0.0.0:*
LISTEN 0 2048 *:9200 *:*
LISTEN 0 2048 *:9300 *:*
LISTEN 0 128 [::]:22 [::]:*
[root@elk2 ~]# ls /opt/
elasticsearch-7.6.0-x86_64.rpm elasticsearch-head-master.zip node-v12.18.1-linux-x64.tar.gz phantomjs-2.1.1-linux-x86_64.tar.bz2
[root@elk2 opt]# rpm -ivh elasticsearch-7.6.0-x86_64.rpm
warning: elasticsearch-7.6.0-x86_64.rpm: Header V4 RSA/SHA512 Signature, key ID d88e42b4: NOKEY
Verifying... ################################# [100%]
Preparing... ################################# [100%]
Creating elasticsearch group... OK
Creating elasticsearch user... OK
Updating / installing...
1:elasticsearch-0:7.6.0-1 ################################# [100%]
### NOT starting on installation, please execute the following statements to configure elasticsearch service to start automatically using systemd
sudo systemctl daemon-reload
sudo systemctl enable elasticsearch.service
### You can start elasticsearch service by executing
sudo systemctl start elasticsearch.service
Created elasticsearch keystore in /etc/elasticsearch
[/usr/lib/tmpfiles.d/elasticsearch.conf:1] Line references path below legacy directory /var/run/, updating /var/run/elasticsearch → /run/elasticsearch; please update the tmpfiles.d/ drop-in file accordingly.
[root@elk2 opt]# cd /etc/elasticsearch/
[root@elk2 elasticsearch]#
[root@elk2 elasticsearch]# cp elasticsearch.yml elasticsearch.yml.bak
[root@elk2 elasticsearch]# vi elasticsearch.yml
[root@elk2 elasticsearch]# grep -v "^#" /etc/elasticsearch/elasticsearch.yml
cluster.name: sjy
node.name: elk2
path.data: /data/elk_data
path.logs: /var/log/elasticsearch
bootstrap.memory_lock: false
network.host: 0.0.0.0
http.port: 9200
discovery.zen.ping.unicast.hosts: ["elk1", "elk2"]
cluster.initial_master_nodes: ["elk1"]
[root@elk2 elasticsearch]# mkdir -p /data/elk_data
[root@elk2 elasticsearch]# chown elasticsearch:elasticsearch /data/elk_data/
[root@elk2 opt]# systemctl daemon-reload
[root@elk2 elasticsearch]# systemctl start elasticsearch.service
[root@elk2 elasticsearch]# systemctl status elasticsearch.service
● elasticsearch.service - Elasticsearch
Loaded: loaded (/usr/lib/systemd/system/elasticsearch.service; disabled; vendor preset: disabled)
Active: active (running) since Wed 2023-07-19 10:13:27 CST; 7s ago
Docs: http://www.elastic.co
Main PID: 11400 (java)
Tasks: 48 (limit: 23009)
Memory: 1.2G
CGroup: /system.slice/elasticsearch.service
├─11400 /usr/share/elasticsearch/jdk/bin/java -Des.networkaddress.cache.ttl=60 -Des.networkaddress.cache.negative.ttl=10 -XX:+AlwaysPreTouch -Xss1m -Djava.awt.headless=true -Dfile.enco>
└─11495 /usr/share/elasticsearch/modules/x-pack-ml/platform/linux-x86_64/bin/controller
Jul 19 10:13:08 elk2 systemd[1]: Starting Elasticsearch...
Jul 19 10:13:09 elk2 elasticsearch[11400]: OpenJDK 64-Bit Server VM warning: Option UseConcMarkSweepGC was deprecated in version 9.0 and will likely be removed in a future release.
Jul 19 10:13:27 elk2 systemd[1]: Started Elasticsearch.
[root@elk2 elasticsearch]# systemctl enable --now elasticsearch.service
Synchronizing state of elasticsearch.service with SysV service script with /usr/lib/systemd/systemd-sysv-install.
Executing: /usr/lib/systemd/systemd-sysv-install enable elasticsearch
Created symlink /etc/systemd/system/multi-user.target.wants/elasticsearch.service → /usr/lib/systemd/system/elasticsearch.service.
[root@elk2 elasticsearch]# ss -antl
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 128 0.0.0.0:22 0.0.0.0:*
LISTEN 0 128 [::]:22 [::]:*
LISTEN 0 2048 *:9200 *:*
LISTEN 0 2048 *:9300 *:*
//訪問(wèn)測(cè)試
//安裝 Elasticsearch-head 插件 安裝 node phantomjs 安裝 Elasticsearch-head 數(shù)據(jù)可視化工具 在elk1上操作
現(xiàn)在nodev12.8.1需要編譯安裝
在解壓目錄下執(zhí)行./configure --prefix=/usr/local/node 后執(zhí)行make && make install
下面添加環(huán)境變量配置一致
[root@elk1 opt]# ls
elasticsearch-7.6.0-x86_64.rpm elasticsearch-head-master.zip node-v12.18.1-linux-x64.tar.gz phantomjs-2.1.1-linux-x86_64.tar.bz2
[root@elk1 opt]# yum -y install unzip bzip2
[root@elk1 opt]# tar xf node-v12.18.1-linux-x64.tar.gz
[root@elk1 opt]# mv node-v12.18.1-linux-x64 /usr/local/node
[root@elk1 opt]# echo "export PATH=$PATH:/usr/local/node/bin" >> ~/.bashrc
[root@elk1 opt]# source ~/.bashrc
[root@elk1 opt]# node -v
v12.18.1
[root@elk1 opt]# tar jxvf phantomjs-2.1.1-linux-x86_64.tar.bz2 -C /usr/local/src/
[root@elk1 opt]# cd /usr/local/src/phantomjs-2.1.1-linux-x86_64/bin/
[root@elk1 bin]# cp phantomjs /usr/local/bin/
[root@elk1 opt]# unzip elasticsearch-head-master.zip
//修改為淘寶的源
[root@elk1 opt]# npm config set registry http://registry.npm.taobao.org
[root@elk1 opt]# cd elasticsearch-head-master
[root@elk1 elasticsearch-head-master]# npm install
Failed at the phantomjs-prebuilt@2.1.16 install script.
npm ERR! This is probably not a problem with npm. There is likely additional logging output above.
npm ERR! A complete log of this run can be found in:
npm ERR! /root/.npm/_logs/2023-07-19T04_16_16_954Z-debug.log
//看到這個(gè)代表成功
//修改 Elasticsearch 主配置文件
[root@elk1 elasticsearch-head-master]# vi /etc/elasticsearch/elasticsearch.yml
[root@elk1 elasticsearch-head-master]# vi Gruntfile.js
connect: {
server: {
options: {
hostname: '192.168.0.102', //添加這行
port: 9100,
base: '.',
keepalive: true
}
}
}
[root@elk1 elasticsearch-head-master]# cd _site/
[root@elk1 _site]# vi app.js
init: function(parent) {
this._super();
this.prefs = services.Preferences.instance();
this.base_uri = this.config.base_uri || this.prefs.get("app-base_uri") || "http://192.168.0.102:9200"; //修改這里
if( this.base_uri.charAt( this.base_uri.length - 1 ) !== "/" ) {
// XHR request fails if the URL is not ending with a "/"
this.base_uri += "/";
}
在每個(gè)節(jié)點(diǎn)的/etc/elasticsearch/elasticsearch.yml/配置文件最后加上
http.cors.enabled: true //這兩行 #是否允許跨源 REST 請(qǐng)求,用于允許head插件訪問(wèn)ES。
http.cors.allow-origin: "*" #允許的源地址。
//啟動(dòng)
[root@elk1 bin]# systemctl restart elasticsearch
[root@elk1 bin]# /root/elasticsearch-head-master/node_modules/grunt/bin
[root@elk1 bin]# ./grunt server
>> Local Npm module "grunt-contrib-jasmine" not found. Is it installed?
Running "connect:server" (connect) task
Waiting forever...
Started connect web server on http://192.168.0.102:9100
//另起一個(gè)終端查看 也可使用nohup ./grunt server & 讓他在后臺(tái)運(yùn)行
[root@elk1 ~]# ss -antl
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 128 0.0.0.0:22 0.0.0.0:*
LISTEN 0 511 192.168.0.102:9100 0.0.0.0:*
LISTEN 0 2048 *:9200 *:*
LISTEN 0 2048 *:9300 *:*
LISTEN 0 128 [::]:22 [::]:*
//部署kabana
https://artifacts.elastic.co/downloads/kibana/kibana-7.6.0-x86_64.rpm
[root@elk1 opt]# ls
elasticsearch-7.6.0-x86_64.rpm elasticsearch-head-master elasticsearch-head-master.zip kibana-7.6.0-x86_64.rpm node-v12.18.1-linux-x64.tar.gz phantomjs-2.1.1-linux-x86_64.tar.bz2
[root@elk1 opt]# rpm -ivh kibana-7.6.0-x86_64.rpm
[root@elk1 opt]# vi /etc/kibana/kibana.yml //修改以下內(nèi)容
server.port: 5601 #kibana服務(wù)端口,默認(rèn)5601
server.host: "192.168.0.102" #kibana主機(jī)IP地址,默認(rèn)localhost
server.name: "elk-application" #kibana訪問(wèn)域名
elasticsearch.hosts: ["http://192.168.0.102:9200"] #es地址
i18n.locale: "zh-CN" #頁(yè)面使用中文
[root@elk1 opt]# systemctl daemon-reload
[root@elk1 opt]# systemctl restart kibana
[root@elk1 opt]# systemctl enable kibana
[root@elk1 opt]# systemctl status kibana
● kibana.service - Kibana
Loaded: loaded (/etc/systemd/system/kibana.service; enabled; vendor preset: disabled)
Active: active (running) since Wed 2023-07-19 15:16:32 CST; 1min 30s ago
Main PID: 13519 (node)
Tasks: 11 (limit: 23009)
Memory: 440.2M
CGroup: /system.slice/kibana.service
└─13519 /usr/share/kibana/bin/../node/bin/node /usr/share/kibana/bin/../src/cli -c /etc/kibana/kibana.yml
Jul 19 15:16:32 elk1 systemd[1]: Started Kibana.
Jul 19 15:17:24 elk1 kibana[13519]: {"type":"log","@timestamp":"2023-07-19T07:17:24Z","tags":["info","plugins-service"],"pid":13519,"message":"Plugin \"case\" is disabled."}
[root@elk1 opt]# ss -antl
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 128 0.0.0.0:22 0.0.0.0:*
LISTEN 0 511 192.168.0.102:5601 0.0.0.0:*
LISTEN 0 511 192.168.0.102:9100 0.0.0.0:*
LISTEN 0 2048 *:9200 *:*
LISTEN 0 2048 *:9300 *:*
LISTEN 0 128 [::]:22 [::]:*
測(cè)試
//部署zookeeper
http://archive.apache.org/dist/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz
[root@elk1 opt]# ls
elasticsearch-7.6.0-x86_64.rpm elasticsearch-head-master.zip node-v12.18.1-linux-x64.tar.gz zookeeper-3.4.14.tar.gz
elasticsearch-head-master kibana-7.6.0-x86_64.rpm phantomjs-2.1.1-linux-x86_64.tar.bz2
[root@elk1 opt]# tar xf zookeeper-3.4.14.tar.gz -C /data/
[root@elk1 opt]# mv /data/zookeeper-3.4.14 /data/zookeeper
[root@elk1 opt]# mkdir /data/zookeeper/{data,logs}
[root@elk1 opt]# ls /data/zookeeper/
bin data ivy.xml logs README.md zookeeper-3.4.14.jar zookeeper-3.4.14.jar.sha1 zookeeper-docs zookeeper-recipes
build.xml dist-maven lib NOTICE.txt README_packaging.txt zookeeper-3.4.14.jar.asc zookeeper-client zookeeper-it zookeeper-server
conf ivysettings.xml LICENSE.txt pom.xml src zookeeper-3.4.14.jar.md5 zookeeper-contrib zookeeper-jute
[root@elk1 opt]# cd /data/zookeeper/conf/
[root@elk1 conf]# cp zoo_sample.cfg zoo.cfg
[root@elk1 conf]# vi zoo.cfg
[root@elk1 conf]# grep -v "^#" zoo.cfg
tickTime=2000 #ZK服務(wù)器之間或客戶端與服務(wù)器之間維持心跳的時(shí)間間隔。
initLimit=10 #允許follower(相對(duì)于Leaderer言的“客戶端”)連接并同步到Leader的初始化連接時(shí)間,以tickTime為單位。當(dāng)初始化連接時(shí)間超過(guò)該值,則表示連接失敗。
syncLimit=5 #Leader與Follower之間發(fā)送消息時(shí),請(qǐng)求和應(yīng)答時(shí)間長(zhǎng)度。如果follower在設(shè)置時(shí)間內(nèi)不能與leader通信,那么此follower將會(huì)被丟棄
dataDir=/data/zookeeper/data #ZK數(shù)據(jù)存放目錄。
DataLogDir=/data/zookeeper/logs #ZK日志存放目錄。
clientPort=2181 #客戶端連接ZK服務(wù)的端口。
server.1=192.168.0.102:2888:3888 #2888是follower與leader交換信息的端口,3888是當(dāng)leader掛了時(shí)用來(lái)執(zhí)行選舉時(shí)服務(wù)器相互通信的端口。
server.2=192.168.0.103:2888:3888
server.3=192.168.0.104:2888:3888
[root@elk1 conf]# echo 1 > /data/zookeeper/data/myid
//其余兩個(gè)節(jié)點(diǎn)配置文件一樣 id文件不同 操作也一樣
[root@elk2 opt]# tar xf zookeeper-3.4.14.tar.gz -C /data/
[root@elk2 opt]# mv /data/zookeeper-3.4.14 /data/zookeeper
[root@elk2 opt]# mkdir /data/zookeeper/{data,logs}
[root@elk2 opt]# grep -v "^#" /data/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
DataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=192.168.0.102:2888:3888
server.2=192.168.0.103:2888:3888
server.3=192.168.0.104:2888:3888
[root@elk2 opt]# echo 2 > /data/zookeeper/data/myid
[root@client opt]# mkdir /data
[root@client opt]# tar xf zookeeper-3.4.14.tar.gz -C /data/
[root@client opt]# mv /data/zookeeper-3.4.14 /data/zookeeper
[root@client opt]# mkdir /data/zookeeper/{data,logs}
[root@client opt]# grep -v "^#" /data/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper/data
DataLogDir=/data/zookeeper/logs
clientPort=2181
server.1=192.168.0.102:2888:3888
server.2=192.168.0.103:2888:3888
server.3=192.168.0.104:2888:3888
[root@client opt]# echo 3 > /data/zookeeper/data/myid
//啟動(dòng)所有節(jié)點(diǎn)
zookeeper集群必須保證有兩個(gè)節(jié)點(diǎn)存活,也就是必須同時(shí)啟動(dòng)兩個(gè)節(jié)點(diǎn),否則集群將啟動(dòng)不成功,因此要修改好配置文件后,在統(tǒng)一啟動(dòng)
[root@elk1 zookeeper]# cd bin/
[root@elk1 bin]# ls
README.txt zkCleanup.sh zkCli.cmd zkCli.sh zkEnv.cmd zkEnv.sh zkServer.cmd zkServer.sh zkTxnLogToolkit.cmd zkTxnLogToolkit.sh
[root@elk1 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@elk1 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@elk2 opt]# cd /data/zookeeper/bin/
[root@elk2 bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@elk2 bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@client opt]# cd /data/zookeeper/bin/
[root@client bin]# ./zkServer.sh start
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[root@client bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /data/zookeeper/bin/../conf/zoo.cfg
Mode: leader
//部署kafka
https://archive.apache.org/dist/kafka/2.0.0/kafka_2.11-2.0.0.tgz
[root@elk1 opt]# tar xf kafka_2.11-2.0.0.tgz -C /data/
[root@elk1 opt]# mv /data/kafka_2.11-2.0.0 /data/kafka
[root@elk1 opt]# cd /data/kafka/
[root@elk1 kafka]# grep -v "#" config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.0.102:9092
host.name=192.168.0.102
advertised.listeners=PLAINTEXT://192.168.0.102:9092
advertised.host.name=192.168.0.102
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.102:2181,192.168.0.103:2181,192.168.0.104:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
broker.id 每個(gè)server需要單獨(dú)配置broker id,如果不配置系統(tǒng)會(huì)自動(dòng)配置。
listeners 監(jiān)聽(tīng)地址,格式PLAINTEXT://IP:端口。
num.network.threads 接收和發(fā)送網(wǎng)絡(luò)信息的線程數(shù)。
num.io.threads 服務(wù)器用于處理請(qǐng)求的線程數(shù),其中可能包括磁盤(pán)I/O。
socket.send.buffer.bytes 套接字服務(wù)器使用的發(fā)送緩沖區(qū)(SO_SNDBUF)
socket.receive.buffer.bytes 套接字服務(wù)器使用的接收緩沖區(qū)(SO_RCVBUF)
socket.request.max.bytes 套接字服務(wù)器將接受的請(qǐng)求的最大大小(防止OOM)
log.dirs 日志文件目錄。
num.partitions partition數(shù)量。
num.recovery.threads.per.data.dir 在啟動(dòng)時(shí)恢復(fù)日志、關(guān)閉時(shí)刷盤(pán)日志每個(gè)數(shù)據(jù)目錄的線程的數(shù)量,默認(rèn)1。
offsets.topic.replication.factor 偏移量話題的復(fù)制因子(設(shè)置更高保證可用),為了保證有效的復(fù)制,偏移話題的復(fù)制因子是可配置的,在偏移話題的第一次請(qǐng)求的時(shí)候可用的broker的數(shù)量至少為復(fù)制因子的大小,否則要么話題創(chuàng)建失敗,要么復(fù)制因子取可用broker的數(shù)量和配置復(fù)制因子的最小值。
log.retention.hours 日志文件刪除之前保留的時(shí)間(單位小時(shí)),默認(rèn)168
log.segment.bytes 單個(gè)日志文件的大小,默認(rèn)1073741824
log.retention.check.interval.ms 檢查日志段以查看是否可以根據(jù)保留策略刪除它們的時(shí)間間隔。
zookeeper.connect ZK主機(jī)地址,如果zookeeper是集群則以逗號(hào)隔開(kāi)。
zookeeper.connection.timeout.ms 連接到Zookeeper的超時(shí)時(shí)間。
[root@elk1 kafka]# mkdir /data/kafka/data
[root@elk2 opt]# tar xf kafka_2.11-2.0.0.tgz -C /data/
[root@elk2 opt]# mv /data/kafka_2.11-2.0.0 /data/kafka
[root@elk2 opt]# vi /data/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.0.103:9092
host.name=192.168.0.103
advertised.listeners=PLAINTEXT://192.168.0.103:9092
advertised.host.name=192.168.0.103
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.102:2181,192.168.0.103:2181,192.168.0.104:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
[root@elk2 opt]# mkdir /data/kafka/data
[root@client opt]# tar xf kafka_2.11-2.0.0.tgz -C /data/
[root@client opt]# mv /data/kafka_2.11-2.0.0 /data/kafka
[root@client opt]# vi /data/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.0.104:9092
host.name=192.168.0.104
advertised.listeners=PLAINTEXT://192.168.0.104:9092
advertised.host.name=192.168.0.104
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/data
num.partitions=3
delete.topic.enable=true
auto.create.topics.enable=true
replica.fetch.max.bytes=5242880
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=3
message.max.byte=5242880
log.cleaner.enable=true
log.retention.hours=48
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.0.102:2181,192.168.0.103:2181,192.168.0.104:2181
zookeeper.connection.timeout.ms=60000
group.initial.rebalance.delay.ms=0
[root@client opt]# mkdir /data/kafka/data
//啟動(dòng)kafka
[root@elk1 kafka]# cd /data/kafka/bin/
[root@elk1 bin]# ls
connect-distributed.sh kafka-console-producer.sh kafka-log-dirs.sh kafka-run-class.sh kafka-verifiable-producer.sh zookeeper-shell.sh
connect-standalone.sh kafka-consumer-groups.sh kafka-mirror-maker.sh kafka-server-start.sh trogdor.sh
kafka-acls.sh kafka-consumer-perf-test.sh kafka-preferred-replica-election.sh kafka-server-stop.sh windows
kafka-broker-api-versions.sh kafka-delegation-tokens.sh kafka-producer-perf-test.sh kafka-streams-application-reset.sh zookeeper-security-migration.sh
kafka-configs.sh kafka-delete-records.sh kafka-reassign-partitions.sh kafka-topics.sh zookeeper-server-start.sh
kafka-console-consumer.sh kafka-dump-log.sh kafka-replica-verification.sh kafka-verifiable-consumer.sh zookeeper-server-stop.sh
[root@elk1 bin]# ./kafka-server-start.sh -daemon /data/kafka/config/server.properties
[root@elk1 bin]# ss -antl
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 128 0.0.0.0:22 0.0.0.0:*
LISTEN 0 511 192.168.0.102:5601 0.0.0.0:*
LISTEN 0 511 192.168.0.102:9100 0.0.0.0:*
LISTEN 0 50 [::ffff:192.168.0.102]:3888 *:*
LISTEN 0 2048 *:9200 *:*
LISTEN 0 2048 *:9300 *:*
LISTEN 0 128 [::]:22 [::]:*
LISTEN 0 50 *:39769 *:*
LISTEN 0 50 *:33531 *:*
LISTEN 0 50 [::ffff:192.168.0.102]:9092 *:*
LISTEN 0 50 *:2181 *:*
[root@elk2 opt]# cd /data/kafka/bin/
[root@elk2 bin]# ./kafka-server-start.sh -daemon /data/kafka/config/server.properties
[root@elk2 bin]# ss -antl
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 128 0.0.0.0:22 0.0.0.0:*
LISTEN 0 128 [::]:22 [::]:*
LISTEN 0 50 *:33723 *:*
LISTEN 0 50 *:2181 *:*
LISTEN 0 50 [::ffff:192.168.0.103]:3888 *:*
LISTEN 0 2048 *:9200 *:*
LISTEN 0 2048 *:9300 *:*
LISTEN 0 50 *:44597 *:*
[root@client opt]# cd /data/kafka/bin/
[root@client bin]# ./kafka-server-start.sh -daemon /data/kafka/config/server.properties
[root@client bin]# ss -antl
State Recv-Q Send-Q Local Address:Port Peer Address:Port Process
LISTEN 0 128 0.0.0.0:22 0.0.0.0:*
LISTEN 0 50 [::ffff:192.168.0.104]:3888 *:*
LISTEN 0 128 [::]:22 [::]:*
LISTEN 0 50 *:43359 *:*
LISTEN 0 50 *:38111 *:*
LISTEN 0 50 *:2181 *:*
LISTEN 0 50 [::ffff:192.168.0.104]:2888 *:*
//測(cè)試kafka與zookeeper連接
//創(chuàng)建一個(gè)topic
[root@elk1 bin]# ./kafka-topics.sh --create --zookeeper 192.168.0.102:2181,192.168.0.103:2181,192.168.0.104:2181 --replication-factor 1 --partitions 1 --topic testpic
Created topic "testpic".
//查看topic
[root@elk1 bin]# ./kafka-topics.sh --list --zookeeper 192.168.0.104:2181,192.168.0.103:2181,192.168.0.104:2181
testpic
//查看topic的描述信息
[root@elk1 bin]# ./kafka-topics.sh --describe --zookeeper 192.168.0.102:2181,192.168.0.103:2181,192.168.0.104:2181 --topic testpic
Topic:testpic PartitionCount:1 ReplicationFactor:1 Configs:
Topic: testpic Partition: 0 Leader: 1 Replicas: 1 Isr: 1
//使用kafka-console-producer控制臺(tái)生產(chǎn)數(shù)據(jù)
[root@elk1 bin]# ./kafka-console-producer.sh --broker-list 192.168.0.102:9092,192.168.0.103:9092,192.168.0.104:9092 --topic testpic
>test1
>yes
>test2
>test3
>test4
>test5
>test6
>test7
>test8
>test9
//使用kafka-console-consumer控制臺(tái)消費(fèi)數(shù)據(jù)
[root@elk1 bin]# ./kafka-console-consumer.sh --bootstrap-server 192.168.0.102:9092,192.168.0.103:9092,192.168.0.104:9092 --topic testpic --from-beginning
test1
yes
test2
test3
test4
test5
test6
test7
test8
test9
//刪除一個(gè)topic
[root@elk1 bin]# ./kafka-topics.sh --delete --zookeeper 192.168.0.102:2181 --topic testpic
Topic testpic is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
//配置filebeat收集nginx、tomcat日志存儲(chǔ)到kafka中 安裝配置nginx
[root@client ~]# yum -y install nginx
[root@client ~]# vi /etc/nginx/nginx.conf
http {
log_format main '{"時(shí)間":"$time_iso8601",'
'"客戶端外網(wǎng)地址":"$http_x_forwarded_for",'
'"客戶端內(nèi)網(wǎng)地址":"$remote_addr",'
'"狀態(tài)碼":$status,'
'"傳輸流量":$body_bytes_sent,'
'"跳轉(zhuǎn)來(lái)源":"$http_referer",'
'"URL":"$request",'
'"瀏覽器":"$http_user_agent",'
'"請(qǐng)求響應(yīng)時(shí)間":$request_time,'
'"后端地址":"$upstream_addr"}';
access_log /var/log/nginx/access.log main;
[root@client ~]# systemctl start nginx
[root@client ~]# systemctl enable nginx
Created symlink /etc/systemd/system/multi-user.target.wants/nginx.service → /usr/lib/systemd/system/nginx.service.
[root@client ~]# curl 127.0.0.1
[root@client ~]# tail /var/log/nginx/access.log
{"時(shí)間":"2023-07-19T17:53:02+08:00","客戶端外網(wǎng)地址":"-","客戶端內(nèi)網(wǎng)地址":"127.0.0.1","狀態(tài)碼":200,"傳輸流量":4057,"跳轉(zhuǎn)來(lái)源":"-","URL":"GET / HTTP/1.1","瀏覽器":"curl/7.61.1","請(qǐng)求響應(yīng)時(shí)間":0.000,"后端地址":"-"}
//安裝tomcat
https://downloads.apache.org/tomcat/tomcat-8/v8.5.90/bin/apache-tomcat-8.5.90.tar.gz
[root@client opt]# tar xf apache-tomcat-8.5.90.tar.gz -C /data/
[root@client opt]# mv /data/apache-tomcat-8.5.90 /data/tomcat
[root@client opt]# /data/tomcat/bin/startup.sh
Using CATALINA_BASE: /data/tomcat
Using CATALINA_HOME: /data/tomcat
Using CATALINA_TMPDIR: /data/tomcat/temp
Using JRE_HOME: /usr
Using CLASSPATH: /data/tomcat/bin/bootstrap.jar:/data/tomcat/bin/tomcat-juli.jar
Using CATALINA_OPTS:
Tomcat started.
//安裝filebeat
https://artifacts.elastic.co/downloads/beats/filebeat/filebeat-7.6.0-x86_64.rpm
https://artifacts.elastic.co/downloads/logstash/logstash-7.6.0.rpm
[root@client opt]# rpm -ivh filebeat-7.6.0-x86_64.rpm
warning: filebeat-7.6.0-x86_64.rpm: Header V4 RSA/SHA512 Signature, key ID d88e42b4: NOKEY
Verifying... ################################# [100%]
Preparing... ################################# [100%]
Updating / installing...
1:filebeat-7.6.0-1 ################################# [100%]
[root@client opt]# vi /etc/filebeat/filebeat.yml
filebeat.inputs:
- type: log #類(lèi)型為log
enabled: true
paths: #指定日志所在的路徑
- /var/log/nginx/access.log
json.keys_under_root: true #支持json格式的日志輸出
json.overwriite_keys: true
fields: #在日志中增加一個(gè)字段,字段為log_topic,值為nginx_access,logstash根據(jù)帶有這個(gè)字段的日志存儲(chǔ)到指定的es索引庫(kù)
log_topic: nginx-access
tail_files: true #開(kāi)啟日志監(jiān)控,從日志的最后一行開(kāi)始收集
- type: log
enabled: true
paths:
- /data/tomcat/logs/catalina.out
multiline.pattern: '^20' #收集tomcat錯(cuò)誤日志,從第一個(gè)20到下一個(gè)20之間的日志整合在一行中顯示
multiline.negate: true
multiline.match: after
fields:
log_topic: tomcat-cata
tail_files: true
output.kafka: #輸出到kafka系統(tǒng)
enabled: true
hosts: ["192.168.0.102:9092","192.168.0.103:9092","192.168.0.104:9092"] #kafka的地址
topic: '%{[fields][log_topic]}' #指定將日志存儲(chǔ)到kafka集群的哪個(gè)topic中,這里的topic值是引用在inputs中定義的fields,通過(guò)這種方式可以將不同路徑的日志分別存儲(chǔ)到不同的topic中
partition.round_robin:
reachable_only: false
required_acks: 1
compression: gzip
max_message_bytes: 1000000
[root@client opt]# systemctl start filebeat
[root@client opt]# systemctl enable filebeat
Synchronizing state of filebeat.service with SysV service script with /usr/lib/systemd/systemd-sysv-install.
Executing: /usr/lib/systemd/systemd-sysv-install enable filebeat
Created symlink /etc/systemd/system/multi-user.target.wants/filebeat.service → /usr/lib/systemd/system/filebeat.service.
[root@client opt]# systemctl status filebeat
● filebeat.service - Filebeat sends log files to Logstash or directly to Elasticsearch.
Loaded: loaded (/usr/lib/systemd/system/filebeat.service; enabled; vendor preset: disabled)
Active: active (running) since Wed 2023-07-19 18:01:48 CST; 22s ago
Docs: https://www.elastic.co/products/beats/filebeat
Main PID: 13616 (filebeat)
Tasks: 9 (limit: 10931)
Memory: 13.5M
CGroup: /system.slice/filebeat.service
└─13616 /usr/share/filebeat/bin/filebeat -e -c /etc/filebeat/filebeat.yml -path.home /usr/share/filebeat -path.config /etc/filebeat -path.data /var/lib/filebeat -path.logs /var/log/fi>
//產(chǎn)生程序日志數(shù)據(jù)觀察數(shù)據(jù)是否存儲(chǔ)kafka
[root@client opt]# yum -y install httpd-tools-2.4.37-41.module_el8.5.0+977+5653bbea.x86_64
[root@client opt]# ab -n 1000 -c 100 http://127.0.0.1/index.html
[root@client opt]# /data/tomcat/bin/shutdown.sh
[root@client opt]# /data/tomcat/bin/startup.sh
[root@elk1 bin]# ./kafka-topics.sh --list --zookeeper 192.168.0.102:2181,192.168.0.103:2181,192.168.0.104:2181
__consumer_offsets
nginx-access
tomcat-cata
//nginx-access以及tomcat-cata的topic已經(jīng)創(chuàng)建成功
//觀察kafka日志
[root@elk1 kafka]# tail -f logs/kafkaServer.out
[2023-07-19 18:05:49,763] INFO [ReplicaAlterLogDirsManager on broker 1] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
[2023-07-19 18:05:52,983] INFO Updated PartitionLeaderEpoch. New: {epoch:0, offset:0}, Current: {epoch:-1, offset:-1} for Partition: nginx-access-1. Cache now contains 0 entries. (kafka.server.epoch.LeaderEpochFileCache)
[2023-07-19 18:06:09,784] INFO [ReplicaFetcherManager on broker 1] Removed fetcher for partitions tomcat-cata-2 (kafka.server.ReplicaFetcherManager)
[2023-07-19 18:06:09,958] INFO [Log partition=tomcat-cata-2, dir=/data/kafka/data] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)
[2023-07-19 18:06:09,958] INFO [Log partition=tomcat-cata-2, dir=/data/kafka/data] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 2 ms (kafka.log.Log)
[2023-07-19 18:06:09,960] INFO Created log for partition tomcat-cata-2 in /data/kafka/data with properties {compression.type -> producer, message.format.version -> 2.0-IV1, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 172800000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)
[2023-07-19 18:06:09,965] INFO [Partition tomcat-cata-2 broker=1] No checkpointed highwatermark is found for partition tomcat-cata-2 (kafka.cluster.Partition)
[2023-07-19 18:06:09,965] INFO Replica loaded for partition tomcat-cata-2 with initial high watermark 0 (kafka.cluster.Replica)
[2023-07-19 18:06:09,965] INFO [Partition tomcat-cata-2 broker=1] tomcat-cata-2 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)
[2023-07-19 18:06:09,967] INFO [ReplicaAlterLogDirsManager on broker 1] Added fetcher for partitions List() (kafka.server.ReplicaAlterLogDirsManager)
//配置logstash從kafka中讀取數(shù)據(jù)并存儲(chǔ)到es集群
部署logstash,配置logstash從kafka中讀取topic數(shù)據(jù)并存儲(chǔ)es集群
//部署logstash
[root@client opt]# rpm -ivh logstash-7.6.0.rpm
warning: logstash-7.6.0.rpm: Header V4 RSA/SHA512 Signature, key ID d88e42b4: NOKEY
Verifying... ################################# [100%]
Preparing... ################################# [100%]
Updating / installing...
1:logstash-1:7.6.0-1 ################################# [100%]
Using provided startup.options file: /etc/logstash/startup.options
/usr/share/logstash/vendor/bundle/jruby/2.5.0/gems/pleaserun-0.0.30/lib/pleaserun/platform/base.rb:112: warning: constant ::Fixnum is deprecated
Successfully created system startup script for Logstash
//配置logstash從kafka讀取數(shù)據(jù)存儲(chǔ)到es集群
[root@client opt]# vi /etc/logstash/conf.d/in_kafka_to_es.conf
[root@client opt]# cat /etc/logstash/conf.d/in_kafka_to_es.conf
#從kafka中讀取日志數(shù)據(jù)
input { #數(shù)據(jù)源端
kafka { #類(lèi)型為kafka
bootstrap_servers => ["192.168.0.102:9092,192.168.0.103:9092,192.168.0.104:9092"] #kafka集群地址
topics => ["nginx-access","tomcat-cata"] #要讀取那些kafka topics
codec => "json" #處理json格式的數(shù)據(jù)
auto_offset_reset => "latest" #只消費(fèi)最新的kafka數(shù)據(jù)
}
}
#處理數(shù)據(jù),去掉沒(méi)用的字段
filter {
if[fields][log_topic] == "nginx-access" { #如果log_topic字段為nginx-access則進(jìn)行以下數(shù)據(jù)處理
json { #json格式數(shù)據(jù)處理
source => "message" #source等于message的
remove_field => ["@version","path","beat","input","log","offset","prospector","source","tags"] #刪除指定的字段
}
mutate { #修改數(shù)據(jù)
remove_field => ["_index","_id","_type","_version","_score","referer","agent"] #刪除沒(méi)用的字段
}
}
if[fields][log_topic] == "tomcat-cata" { #如果log_topic字段為tomcat-cata
grok { #解析格式
match => {
"message" => "(?<時(shí)間>20[0-9]{2}-[0-9]{2}-[0-9]{2} [0-9]{2}:[0-9]{2}:[0-9]{2}) \[(?<線程名稱>[^\s]{0,})\] (?<日志等級(jí)>\w+) (?<類(lèi)名稱>[^\s]{0,}) (?<日志詳情>[\W\w]+)" #將message的值增加上一些格式
}
}
mutate { #修改數(shù)據(jù)
remove_field => ["_index","_id","_type","_version","_score","referer","agent"] #刪除沒(méi)用的字段
}
}
}
#數(shù)據(jù)處理后存儲(chǔ)es集群
output { #目標(biāo)端
if[fields][log_topic] == "nginx-access" { #如果log_topic的字段值為nginx-access就存到下面的es集群里
elasticsearch {
action => "index" #類(lèi)型為索引
hosts => ["192.168.0.102:9200","192.168.0.103:9200"] #es集群地址
index => "nginx-access-%{+YYYY.MM.dd}" #存儲(chǔ)到es集群的哪個(gè)索引里
codec => "json" #處理json格式的解析
}
}
if[fields][log_topic] == "tomcat-cata" { #如果log_topic的字段值為tomcat-cata就存到下面的es集群里
elasticsearch {
action => "index" #類(lèi)型為索引
hosts => ["192.168.0.102:9200","192.168.0.103:9200"] #es集群地址
index => "tomcat-cata-%{+YYYY.MM.dd}" #存儲(chǔ)到es集群的哪個(gè)索引里
codec => "json" #處理json格式的解析
}
}
}
[root@client logstash]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf
//啟動(dòng)logstash并觀察日志
/etc/logstash/conf.d/in_kafka_to_es.conf
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
[INFO ] 2023-07-20 09:15:38.121 [Agent thread] agent - Pipelines running {:count=>1, :running_pipelines=>[:main], :non_running_pipelines=>[]}
[INFO ] 2023-07-20 09:15:38.320 [[main]<kafka] AppInfoParser - Kafka version: 2.3.0
[INFO ] 2023-07-20 09:15:38.320 [[main]<kafka] AppInfoParser - Kafka commitId: fc1aaa116b661c8a
[INFO ] 2023-07-20 09:15:38.320 [[main]<kafka] AppInfoParser - Kafka startTimeMs: 1689815738309
[INFO ] 2023-07-20 09:15:38.429 [Ruby-0-Thread-14: :1] KafkaConsumer - [Consumer clientId=logstash-0, groupId=logstash] Subscribed to topic(s): nginx-access, tomcat-cata
[INFO ] 2023-07-20 09:15:40.725 [Api Webserver] agent - Successfully started Logstash API endpoint {:port=>9600}
[INFO ] 2023-07-20 09:15:41.301 [Ruby-0-Thread-14: :1] Metadata - [Consumer clientId=logstash-0, groupId=logstash] Cluster ID: Y3evIgStTDCDwhstOK0HEw
[INFO ] 2023-07-20 09:15:41.314 [Ruby-0-Thread-14: :1] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Discovered group coordinator 192.168.0.104:9092 (id: 2147483644 rack: null)
[INFO ] 2023-07-20 09:15:41.361 [Ruby-0-Thread-14: :1] ConsumerCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Revoking previously assigned partitions []
[INFO ] 2023-07-20 09:15:41.361 [Ruby-0-Thread-14: :1] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] (Re-)joining group
[INFO ] 2023-07-20 09:15:44.785 [Ruby-0-Thread-14: :1] AbstractCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Successfully joined group with generation 1
[INFO ] 2023-07-20 09:15:44.796 [Ruby-0-Thread-14: :1] ConsumerCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Setting newly assigned partitions: tomcat-cata-0, tomcat-cata-1, tomcat-cata-2, nginx-access-0, nginx-access-1, nginx-access-2
[INFO ] 2023-07-20 09:15:45.016 [Ruby-0-Thread-14: :1] ConsumerCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Found no committed offset for partition tomcat-cata-0
[INFO ] 2023-07-20 09:15:45.016 [Ruby-0-Thread-14: :1] ConsumerCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Found no committed offset for partition tomcat-cata-1
[INFO ] 2023-07-20 09:15:45.016 [Ruby-0-Thread-14: :1] ConsumerCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Found no committed offset for partition tomcat-cata-2
[INFO ] 2023-07-20 09:15:45.016 [Ruby-0-Thread-14: :1] ConsumerCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Found no committed offset for partition nginx-access-0
[INFO ] 2023-07-20 09:15:45.016 [Ruby-0-Thread-14: :1] ConsumerCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Found no committed offset for partition nginx-access-1
[INFO ] 2023-07-20 09:15:45.016 [Ruby-0-Thread-14: :1] ConsumerCoordinator - [Consumer clientId=logstash-0, groupId=logstash] Found no committed offset for partition nginx-access-2
[INFO ] 2023-07-20 09:15:45.084 [Ruby-0-Thread-14: :1] SubscriptionState - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition tomcat-cata-0 to offset 1.
[INFO ] 2023-07-20 09:15:45.084 [Ruby-0-Thread-14: :1] SubscriptionState - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition nginx-access-2 to offset 333.
[INFO ] 2023-07-20 09:15:45.299 [Ruby-0-Thread-14: :1] SubscriptionState - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition tomcat-cata-2 to offset 0.
[INFO ] 2023-07-20 09:15:45.299 [Ruby-0-Thread-14: :1] SubscriptionState - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition nginx-access-1 to offset 333.
[INFO ] 2023-07-20 09:15:46.512 [Ruby-0-Thread-14: :1] SubscriptionState - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition tomcat-cata-1 to offset 0.
[INFO ] 2023-07-20 09:15:46.512 [Ruby-0-Thread-14: :1] SubscriptionState - [Consumer clientId=logstash-0, groupId=logstash] Resetting offset for partition nginx-access-0 to offset 334.
查看elasticsearch集群是否增加了對(duì)應(yīng)的索引庫(kù)
在kibana上關(guān)聯(lián)elasticsearch索引庫(kù)瀏覽日志數(shù)據(jù)
填寫(xiě)索引名
采用通配符的方式
添加一個(gè)時(shí)間篩選字段
創(chuàng)建成功文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-600593.html
同樣的方式創(chuàng)建tomcat-cata索引
查詢nginx-access索引日志數(shù)據(jù)
查詢tomcat-cata索引日志數(shù)據(jù)
kafka zookeeper logstash的service文件文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-600593.html
[Unit]
Description=Kafka server daemon
Wants=network.target
[Service]
Type=forking
Environment="PATH=/usr/local/sbin/:/usr/local/bin:/usr/bin:/sbin:/bin:/usr/bin/java" //根據(jù)自己安裝的java配寫(xiě)路徑
ExecStart=/data/kafka/bin/kafka-server-start.sh -daemon /data/kafka/config/server.properties
ExecStop=/data/kafka/bin/kafka-server-stop.sh
ExecReload=/bin/kill -s HUP $MATNPID
PrivateTmp=true
[Install]
WantedBy=multi-user.target
[Unit]
Description=Zookeeper server daemon
Wants=network.target
[Service]
Type=forking
Environment="PATH=/usr/local/sbin/:/usr/local/bin:/usr/bin:/sbin:/bin:/usr/bin/java"
ExecStart=/data/zookeeper/bin/zkServer.sh start
ExecStop=/data/zookeeper/bin/zkServer.sh stop
ExecReload=/data/zookeeper/bin/zkServer.sh restart
[Install]
WantedBy=multi-user.target
[Unit]
Description=Logstash server daemon
Wants=network.target
[Service]
Type=forking
Environment="PATH=/usr/local/sbin/:/usr/local/bin:/usr/bin:/sbin:/bin:/usr/bin/java"
ExecStart=/usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/in_kafka_to_es.conf
ExecReload=/bin/kill -s HUP $MATNPID
PrivateTmp=true
[Install]
WantedBy=multi-user.target
到了這里,關(guān)于部署ELK+Kafka+Filebeat日志收集分析系統(tǒng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!