作者:來自 Elastic?Carly Richmond
來自不健康應(yīng)用程序服務(wù)的重復(fù)事件使日志搜索變得棘手。 查看如何使用 Logstash、Beats 和 Elastic Agent 處理重復(fù)項。
使用 Elasticsearch 進行日志重復(fù)數(shù)據(jù)刪除
SRE 每天都會被來自嘈雜應(yīng)用程序的大量日志淹沒。 Frederick P. Brooks 在他的開創(chuàng)性著作《人月神話》中說,“所有程序員都是樂觀主義者”。 這種樂觀態(tài)度體現(xiàn)在軟件工程師沒有采取控制措施來阻止其應(yīng)用程序在異常故障情況下發(fā)送連續(xù)日志。 在擁有集中式日志記錄平臺的大型組織中,大量事件被吸收到日志記錄平臺中,并占用大量存儲量和處理計算。 在人員方面,這讓 SRE 感到不知所措,并遭受警報疲勞,因為他們被消息浪潮吞沒,有點像這樣:
構(gòu)建包括微服務(wù)和關(guān)鍵應(yīng)用程序在內(nèi)的軟件的開發(fā)人員有責(zé)任確保他們不發(fā)送重復(fù)的日志事件,并且在正確的級別發(fā)送正確的日志事件。 然而,使用第三方解決方案或維護老化服務(wù)等情況意味著我們不能始終保證采用負責(zé)任的日志記錄實踐。 即使我們按照本文中關(guān)于從傳入日志事件中修剪字段的內(nèi)容刪除不必要的字段,我們?nèi)匀淮嬖诖鎯Υ罅恐貜?fù)事件的問題。 在這里,我們討論從有問題的服務(wù)中識別重復(fù)日志的挑戰(zhàn),以及如何使用 Elastic Beats、Logstash 和 Elastic Agent 刪除重復(fù)數(shù)據(jù)。
什么是重復(fù)日志條目?
在深入研究防止這些重復(fù)項進入你的日志平臺的各種方法之前,我們需要了解什么是重復(fù)項。 在我之前擔(dān)任軟件工程師時,我負責(zé)開發(fā)和維護龐大的微服務(wù)生態(tài)系統(tǒng)。 有些人考慮過重試邏輯,在一段時間后,將正常關(guān)閉服務(wù)并觸發(fā)適當(dāng)?shù)木瘓蟆?然而,并非所有服務(wù)都是為了妥善處理這些情況而構(gòu)建的。 服務(wù)配置錯誤也會導(dǎo)致事件重復(fù)。 無意中將生產(chǎn)日志級別從 WARN 更改為 TRACE 可能會導(dǎo)致日志記錄平臺必須處理更嚴重的事件量。
Elasticsearch 會自動為攝取的每個文檔生成唯一 ID,除非文檔在攝取時包含 _id 字段。 因此,如果你的服務(wù)發(fā)送重復(fù)的警報,你將面臨將同一事件存儲為具有不同 ID 的多個文檔的風(fēng)險。 另一個原因可能是用于日志收集的工具的重試機制。 一個值得注意的例子是 Filebeat,其中丟失的連接或關(guān)閉可能會導(dǎo)致 Filebeat 的重試機制重新發(fā)送事件,直到輸出確認收到事件。
Elasticsearch 中出現(xiàn)重復(fù)的原因是什么?
當(dāng) Filebeat 輸出被阻止時,F(xiàn)ilebeat 中的重試機制會嘗試重新發(fā)送事件,直到輸出確認這些事件。 如果輸出接收到事件,但無法確認它們,則數(shù)據(jù)可能會多次發(fā)送到輸出。 由于文檔 ID 通常由 Elasticsearch 在接收到來自 Beats 的數(shù)據(jù)后設(shè)置,因此重復(fù)事件將被索引為新文檔。
工具概述
在本博客中,我們將研究四種 Elastic 工具中可用的工具:
- Logstash 是一個免費、開放的 ETL 管道工具,允許您在無數(shù)源之間攝取、轉(zhuǎn)換和輸出數(shù)據(jù),包括從 Elasticsearch 攝取和輸出。 這些示例將用于展示 Elasticsearch 在生成帶有和不帶有特定 ID 的日志時的不同行為。更多描述關(guān)于如何使用 Logstash 的文章請閱讀 “Logstash:Logstash 入門教程 (一)”。
- Beats 是一系列輕量級托運程序,使我們不僅可以將給定源的事件攝取到 Elasticsearch 中,還可以將事件攝取到其他輸出中,包括 Kafka、Redis 或 Logstash。更多描述如何使用 Beats 的文章請詳細閱讀文章 “Beats:Beats 入門教程 (一)”。
- Ingest pipeline 允許將轉(zhuǎn)換和豐富應(yīng)用于攝取到 Elasticsearch 中的文檔。 這就像將 Logstash 的 filter 部分直接運行到 Elasticsearch 中,而不需要運行其他服務(wù)。 可以在 Stack Management > Ingest Pipelines?頁面中或通過 _ingest API 創(chuàng)建新管道,如文檔中所述。
- Elastic Agent 是一個單一代理,可以在你的主機上執(zhí)行,并使用各種受支持的集成將日志、指標和安全數(shù)據(jù)從多個服務(wù)和基礎(chǔ)設(shè)施發(fā)送到 Elasticsearch。 無論重復(fù)的原因是什么,Elastic 生態(tài)系統(tǒng)中都有幾種可能的行動方案。
沒有指定 ID 的攝取
默認方法是忽略并攝取所有事件。 當(dāng)文檔上未指定 ID 時,Elasticsearch 將為收到的每個文檔自動生成一個新 ID。 讓我們舉一個簡單的例子,可以在這個 GitHub 存儲庫中找到,使用簡單的 Express HTTP 服務(wù)器。 服務(wù)器在運行時公開一個端點,返回一條日志消息:
{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}
使用 Logstash,我們可以每 60 秒輪詢一次端點 http://locahost:3000/ 并將結(jié)果發(fā)送到 Elasticsearch。 我們的 logstash.conf 如下所示:
logstash.conf
input {
http_poller {
urls => {
simple_server => "http://localhost:3000"
}
request_timeout => 60
schedule => { cron => "* * * * * UTC"}
codec => "json"
}
}
output {
elasticsearch {
cloud_id => "${ELASTIC_CLOUD_ID}"
cloud_auth => "${ELASTIC_CLOUD_AUTH}"
index => "my-logstash-index"
}
}
Logstash 將推送每個事件,并且事件上沒有任何 ID,Elasticsearch 將生成一個新的 _id 字段作為每個文檔的唯一標識符:
GET my-logstash-index/_search
{
"took": 0,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 11,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "my-logstash-index",
"_id": "-j83XYsBOwNNS8Sc0Bja",
"_score": 1,
"_source": {
"@version": "1",
"event": {
"transaction_id": 1,
"original": """{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}""",
"data_set": "my-logging-app"
},
"message": "WARN: Unable to get an interesting response",
"@timestamp": "2023-10-23T15:47:00.528205Z"
}
},
{
"_index": "my-logstash-index",
"_id": "NT84XYsBOwNNS8ScuRlO",
"_score": 1,
"_source": {
"@version": "1",
"event": {
"transaction_id": 1,
"original": """{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}""",
"data_set": "my-logging-app"
},
"message": "WARN: Unable to get an interesting response",
"@timestamp": "2023-10-23T15:48:00.314262Z"
}
},
// Other documents omitted
]
}
}
此行為對于 Beats、Ingest Pipelines 和 Elastic Agent 來說是一致的,因為它們將發(fā)送收到的所有事件,無需額外配置。
自帶?ID (Bring Your Own - ID)
使用現(xiàn)有 ID 為每個事件指定唯一 ID 會繞過上一節(jié)中討論的 Elasticsearch ID 生成步驟。 提取已存在此屬性的文檔將導(dǎo)致 Elasticsearch 檢查索引中是否存在具有此 ID 的文檔,如果存在則更新該文檔。 這確實會產(chǎn)生開銷,因為需要搜索索引以檢查是否存在具有相同 _id 的文檔。 擴展上面的 Logstash 示例,可以通過在 Elasticsearch 輸出插件中指定 document_id 選項來指定 Logstash 中的文檔 ID 值,該插件將用于將事件提取到 Elasticsearch 中:
# http_poller configuration omitted
output {
elasticsearch {
cloud_id => "${ELASTIC_CLOUD_ID}"
cloud_auth => "${ELASTIC_CLOUD_AUTH}"
index => "my-unique-logstash-index"
document_id => "%{[event][transaction_id]}"
}
}
這會將 _id 字段的值設(shè)置為 event.transaction_id 的值。 在我們的例子中,這意味著新文檔將在攝取時替換現(xiàn)有文檔,因為兩個文檔的 _id 均為 1:
GET my-unique-logstash-index/_search
{
"took": 48,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "my-unique-logstash-index",
"_id": "1",
"_score": 1,
"_source": {
"@timestamp": "2023-10-23T16:33:00.358585Z",
"message": "WARN: Unable to get an interesting response",
"@version": "1",
"event": {
"original": """{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}""",
"data_set": "my-logging-app",
"transaction_id": 1
}
}
}
]
}
}
根據(jù)工具的不同,可以通過多種方式指定 ID,這將在后續(xù)部分中進一步討論。
Beats
對于 JSON 文檔(這是許多日志源的常見格式),如果你的事件確實有一個有用且有意義的 ID,可以用作文檔的唯一 ID 并防止重復(fù)條目,請使用decode_json_fields 處理器或 json .document_ID 輸入設(shè)置,如文檔中建議的那樣。 當(dāng)消息中的 JSON 字段中存在自然鍵時,此方法優(yōu)于生成密鑰。 這兩種設(shè)置均顯示在以下示例中:
filebeat.inputs:
- type: filestream
id: my-logging-app
paths:
- /var/tmp/other.log
- /var/log/*.log
json.document_id: "event.transaction_id"
# Alternative approach using decode_json_fields processor
processors:
- decode_json_fields:
document_id: "event.transaction_id"
fields: ["message"]
max_depth: 1
target: ""
更多關(guān)于如何使用 decode_json_fields,請參考 “Beats:使用 Filebeat 導(dǎo)入 JSON 格式的日志文件” 及 “Beats: 使用 Filebeat 進行日志結(jié)構(gòu)化 - Python”。
攝取管道
在這種情況下,可以使用 set processor 結(jié)合 copy_from 選項來設(shè)置 ID,以將值從唯一字段傳輸?shù)?Elasticsearch @metadata._id 屬性:
PUT _ingest/pipeline/test-pipeline
{
"processors": [
{
"set": {
"field": "_id",
"copy_from": "transaction_id"
}
}
]
}
Elastic Agent
Elastic Agent 有類似的方法,你可以使用 copy_fields 處理器將值復(fù)制到集成中的 @metadata._id 屬性:
- copy_fields:
fields:
- from: transaction_id
to: @metadata._id
fail_on_error: true
ignore_missing: true
當(dāng) fail_on_error 設(shè)置為 true 時,將通過恢復(fù)故障處理器應(yīng)用的更改來返回到先前的狀態(tài)。 同時,當(dāng) ignore_missing 設(shè)置為 false 時,只會對字段不存在的文檔觸發(fā)失敗。
自動生成的 ID
Logstash
使用事件字段子集上的指紋識別(fingerprinting)等技術(shù)生成唯一 ID。 通過對一組字段進行哈希處理,會生成一個唯一值,當(dāng)匹配時,將導(dǎo)致在 Elasticsearch 中攝取時更新原始文檔。 正如這篇關(guān)于使用 Logstash 處理重復(fù)項的文章特別概述的那樣,fingerprint filter plugin 可以配置為使用指定的哈希算法生成 ID 以字段 @metadata.fingerprint:
filter {
fingerprint {
source => ["event.start_date", "event.data_set", "message"]
target => "[@metadata][fingerprint]"
method => "SHA256"
}
}
output {
elasticsearch {
hosts => "my-elastic-cluster.com"
document_id => "%{[@metadata][fingerprint]}"
}
}
你可以詳細參考文章 “Logstash:運用 Elastic Stack 分析 CSDN 閱讀量” 來更進一步了解如何使用。
如果未指定,將使用默認哈希算法 SHA256 對組合 |event.start_date|start_date_value|event.data_set|data_set_value|message|message_value| 進行哈希處理。 如果我們想使用其他允許的算法選項之一,可以使用 method 選項指定。 這將導(dǎo)致 Elasticsearch 更新與生成的 _id 匹配的文檔:
GET my-fingerprinted-logstash-index/_search
{
"took": 8,
"timed_out": false,
"_shards": {
"total": 1,
"successful": 1,
"skipped": 0,
"failed": 0
},
"hits": {
"total": {
"value": 1,
"relation": "eq"
},
"max_score": 1,
"hits": [
{
"_index": "my-fingerprinted-logstash-index",
"_id": "b2faceea91b83a610bf64ac2b12e3d3b95527dc229118d8f819cdfaa4ba98af1",
"_score": 1,
"_source": {
"@timestamp": "2023-10-23T16:46:00.772480Z",
"message": "WARN: Unable to get an interesting response",
"@version": "1",
"event": {
"original": """{"event":{"transaction_id":1,"data_set":"my-logging-app"},"message":"WARN: Unable to get an interesting response"}""",
"data_set": "my-logging-app",
"transaction_id": 1
}
}
}
]
}
}
如果你的事件沒有單個有意義的標識字段,并且你愿意承擔(dān)生成 ID 的處理開銷,或者不同事件解析為相同生成的哈希時可能發(fā)生沖突,那么這可能是一個有用的選項。 類似的功能可用于其他工具,如后續(xù)部分所述。
Beats
Beats 和 Elastic Agent 的 add_id processor 將允許生成唯一的 Elasticsearch 兼容 ID。 默認情況下,該值將存儲在 @metadata._id 字段中,該字段是 Elasticsearch 文檔的 ID 字段。
filebeat.inputs:
- type: filestream
ID: my-logging-app
paths:
- /var/tmp/other.log
- /var/log/*.log
json.document_ID: "event.transaction_id"
processors:
- add_ID: ~
target_field: @metadata._id
或者,fingerprint processor生成由 | 操作符分隔的指定字段名稱和值對串聯(lián)而成的散列值。
filebeat.inputs:
- type: filestream
ID: my-logging-app
paths:
- /var/tmp/other.log
- /var/log/*.log
processors:
- fingerprint:
fields: ["event.start_date", "event.data_set", "message"]
target_field: "@metadata._id"
method: "sha256"
ignore_missing: false
在上面的示例中,默認哈希算法 sha256 將用于對組合 |event.start_date|start_date_value|event.data_set|data_set_value|message|message_value| 進行哈希處理。 如果我們想使用其他允許的算法選項之一,可以使用 method 選項指定。 錯誤處理也是一個重要的考慮因素,ignore_missing 選項可以提供幫助。 例如,如果給定文檔中不存在 event.start_date 字段,則當(dāng) ignore_missing 設(shè)置為 false 時將引發(fā)錯誤。 如果沒有顯式設(shè)置 ignore_missing,這是默認實現(xiàn),但通常通過將該值指定為 true 來忽略錯誤。
Elastic Agent
就像 Beats 一樣,Elastic Agent 有一個 add_id processor,可用于生成唯一 ID,如果未指定 target_field 屬性,則默認為 @metadata._id:
- add_id:
target_field: "@metadata._id"
另外,fingerprint processor 也可在 Elastic Agent 中使用,并且可應(yīng)用于包含高級配置部分(包括處理器選項)的任何集成部分。 處理器邏輯如下所示:
- fingerprint:
fields: ["event.start_date", "event.data_set", "message"]
target_field: "@metadata._id"
ignore_missing: false
method: "sha256"
以 Kafka 集成為例,上面的處理器片段可以應(yīng)用在高級配置部分的處理器段中,用于從 Kafka 代理收集日志:
就像 Beats 一樣,被哈希的值是由字段名稱和字段值串聯(lián)而成,并用 | 分隔。 例如 |field1|value1|field2|value2|。 然而,就像在 Beats 中一樣,與在 Logstash 中不同,盡管支持相同的編碼算法,但 method 值是小寫的。
攝取管道
在這里,我們將展示使用 _ingest API 創(chuàng)建帶有 fingerprint processor 的管道的示例請求。 請注意以下配置與我們的 Beats 處理器的相似之處:
PUT _ingest/pipeline/my-logging-app-pipeline
{
"description": "Event and field dropping for my-logging-app",
"processors": [
{
"fingerprint": {
fields: ["event.start_date", "event.data_set", "message"]
target_field: "@metadata._id"
ignore_missing: false
method: "SHA-256"
}
}
]
}
聚合事件
如果使用的工具支持的話,基于公共字段將事件聚合在一起是另一種選擇。 事件聚合需要權(quán)衡,因為該工具需要在內(nèi)存中保留多個事件來執(zhí)行聚合,而不是立即將事件轉(zhuǎn)發(fā)到輸出。 因此,Elastic 生態(tài)系統(tǒng)中唯一支持事件聚合的工具是 Logstash。
要在 Logstash 中實現(xiàn)基于聚合的方法,請使用 aggregation plugin。 在我們的例子中,不太可能發(fā)送特定的結(jié)束事件來區(qū)分重復(fù)項,這意味著需要按照以下示例指定 timeout 來控制批處理過程:
filter {
grok {
match => [ "message", %{NOTSPACE:event.start_date} "%{LOGLEVEL:loglevel} - %{NOTSPACE:user_ID} - %{GREEDYDATA:message}" ]
}
aggregate {
task_ID => "%{event.start_date}%{loglevel}%{user_ID}"
code => "map['error_count'] ||= 0; map['error_count'] += 1;"
push_map_as_event_on_timeout => true
timeout_task_ID_field => "user_id"
timeout => 600
timeout_tags => ['_aggregatetimeout']
timeout_code => "event.set('has_multiple_occurrences', event.get('error_count') > 1)"
}
}
上面的示例將在 600 秒或 10 分鐘后發(fā)送事件,并向事件添加 error_count 和 has_multiple_occurrences 屬性以指示聚合事件。 Push_map_as_event_on_timeout 選項將確保在每次超時時推送聚合結(jié)果,從而允許你減少警報量。 在確定數(shù)據(jù)的超時時,請考慮你的數(shù)據(jù)量并選擇盡可能低的超時,因為 Logstash 會將事件保留在內(nèi)存中,直到超時到期并推送聚合事件。
結(jié)論
日志量峰值可能會很快淹沒日志平臺和希望維護可靠應(yīng)用程序的 SRE 工程師。 我們已經(jīng)討論了使用 Elastic Beats、Logstash(可在此 GitHub 存儲庫中找到)和 Elastic Agent 處理重復(fù)事件的幾種方法。
當(dāng)使用 fingerprint processor 通過哈希算法生成 ID 或執(zhí)行聚合時,請仔細考慮用于平衡防止洪水和混淆指向你生態(tài)系統(tǒng)中大規(guī)模問題的合法流的屬性。 這兩種方法都有一定的開銷,要么是生成 ID 的處理,要么是存儲符合聚合條件的文檔的內(nèi)存開銷。
選擇一個選項實際上取決于您考慮重復(fù)的事件和性能權(quán)衡。 如前所述,當(dāng)你指定 ID 時,Elasticsearch 需要在將文檔添加到索引之前檢查是否存在與該 ID 匹配的文檔。 這會導(dǎo)致執(zhí)行 _id 存在檢查的攝取略有延遲。
使用哈希算法生成 ID 會增加額外的處理時間,因為在比較和可能攝取之前需要為每個事件生成 ID。 選擇不指定 ID 會繞過此檢查,因為 Elastic 將為你生成 ID,但會導(dǎo)致存儲所有事件,從而增加你的存儲空間。
放棄完整事件是一種合法的做法,本文未涉及。 如果您想刪除日志條目以減少數(shù)量,請查看這篇關(guān)于從傳入日志事件中修剪字段的文章。
如果此處未列出你最喜歡的重復(fù)事件刪除方法,請告訴我們!
更多閱讀:
-
Beats:如何避免重復(fù)的導(dǎo)入數(shù)據(jù)
-
Elasticsearch:消除 Elasticsearch 中的重復(fù)數(shù)據(jù)文章來源:http://www.zghlxwxcb.cn/news/detail-817035.html
原文:Log deduplication with Elasticsearch | Elastic Blog文章來源地址http://www.zghlxwxcb.cn/news/detail-817035.html
到了這里,關(guān)于使用 Elasticsearch 進行日志重復(fù)數(shù)據(jù)刪除的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!