1、背景
日志是非常重要的信息資源。它們記錄了應(yīng)用程序的運(yùn)行狀態(tài)、錯誤和異常情況,幫助我們了解系統(tǒng)的健康狀況以及發(fā)現(xiàn)潛在的問題。為了高效地管理和分析日志數(shù)據(jù),許多組織采用了Elasticsearch、Logstash和Kibana(ELK)堆棧作為日志收集和分析的解決方案。
開發(fā)一個實時監(jiān)控和告警腳本,專門用于監(jiān)控ELK平臺中的錯誤日志,并及時發(fā)送告警通知給相關(guān)人員。該系統(tǒng)將通過掃描Elasticsearch中的日志數(shù)據(jù),篩選出等級為ERROR的錯誤日志,并根據(jù)預(yù)設(shè)的告警規(guī)則進(jìn)行處理。
2、目的
使用Python從Elasticsearch中查詢特定級別為ERROR的錯誤日志,并通過釘釘機(jī)器人實現(xiàn)告警聚合和發(fā)送,以提高錯誤日志的處理效率和及時響應(yīng)能力。
為什么開發(fā)這個腳本?
因為目前我們這邊沒有監(jiān)控日志的信息,出現(xiàn)問題不能及時發(fā)現(xiàn) 和預(yù)知
優(yōu)勢
1、消息進(jìn)行聚合,每個項目的多條告警信息,匯總一條發(fā)送。突破釘釘機(jī)器人每分鐘只能發(fā)送20條的限制
2、告警信息you太多的重復(fù),進(jìn)行去重處理,添加告警次數(shù)發(fā)送。防止被釘釘限流
3、原理
- 使用Python的Elasticsearch庫連接到Elasticsearch集群。
- 構(gòu)建Elasticsearch查詢DSL(領(lǐng)域?qū)S谜Z言),過濾出級別為ERROR的日志記錄。
- 執(zhí)行查詢并獲取結(jié)果。
- 對查詢結(jié)果進(jìn)行聚合,統(tǒng)計每個項目的錯誤次數(shù)。
- 根據(jù)聚合結(jié)果,生成告警消息的Markdown格式內(nèi)容。
- 使用釘釘機(jī)器人發(fā)送告警消息到指定的釘釘群。
4、流程
- 導(dǎo)入必要的Python庫,包括
elasticsearch
和requests
。 - 創(chuàng)建Elasticsearch連接,指定Elasticsearch集群的主機(jī)和端口。
- 構(gòu)建Elasticsearch查詢DSL,設(shè)置查詢條件為日志級別為ERROR。
- 執(zhí)行查詢,獲取查詢結(jié)果。
- 對查詢結(jié)果進(jìn)行處理,聚合每個項目的錯誤次數(shù)。
- 根據(jù)聚合結(jié)果生成告警消息的Markdown內(nèi)容。
- 使用釘釘機(jī)器人API發(fā)送告警消息到指定的釘釘群。
5、實現(xiàn)代碼
# -*- coding: utf-8 -*-
# @Time : 2023/6/17 18:11
# @Author : 南宮乘風(fēng)
# @Email : 1794748404@qq.com
# @File : all_es.py
# @Software: PyCharm
from collections import Counter
from datetime import datetime, timedelta
import requests
from elasticsearch import Elasticsearch
from monitor.es_ding import send_pretty_message
# Elasticsearch客戶端實例
es = Elasticsearch(hosts=['http://172.18.xxx.xxxx:9200'], http_auth=('elastic', 'xxxxx'),
sniff_on_start=True, # 連接前測試
sniff_on_connection_fail=True, # 節(jié)點無響應(yīng)時刷新節(jié)點
sniff_timeout=300, # 設(shè)置超時時間
headers={'Content-Type': 'application/json'})
def format_timestamp(timestamp):
"""格式化時間為Elasticsearch接受的字符串格式"""
return timestamp.strftime("%Y-%m-%d %H:%M:%S")
def search_errors():
"""執(zhí)行查詢,獲取錯誤日志數(shù)據(jù)"""
current_time = datetime.now()
one_minute_ago = current_time - timedelta(minutes=10)
current_time_str = format_timestamp(current_time)
one_minute_ago_str = format_timestamp(one_minute_ago)
index = 'app-prod-*' # 替換為實際的索引名稱
query = {
"query": {
"bool": {
"filter": [
{
"range": {
"@timestamp": {
"gte": one_minute_ago_str,
"lt": current_time_str,
"format": "yyyy-MM-dd HH:mm:ss",
"time_zone": "+08:00"
}
}
},
{
"match": {
"loglevel": "ERROR" #匹配項目錯誤等級
}
},
{
"bool": {
"must_not": [
{
"match": {
"projectname": "fox-data-spiderman" # 需要屏蔽的項目
}
}
]
}
}
]
}
},
"_source": [ ## 輸出的字段
"date",
"projectname",
"threadname",
"msg"
],
"from": 0,
"size": 10000, # 返回查詢的條數(shù)
}
result = es.search(index=index, body=query)
total_documents = result["hits"]["total"]["value"]
print(f"總共匹配到 {total_documents} 條文檔")
result = result['hits']['hits']
all_result = []
for i in result:
all_result.append(i['_source'])
msg_counter = Counter(d['msg'] for d in all_result if 'msg' in d)
results = []
for d in all_result:
if 'msg' in d and d['msg'] in msg_counter:
count = msg_counter[d['msg']]
del msg_counter[d['msg']]
d['count'] = count
d['msg'] = d['msg'][:100] + ('...' if len(d['msg']) > 100 else '')
results.append(d)
return results
def aggregate_errors(results):
"""按項目名稱聚合錯誤日志"""
aggregated_data = {}
for d in results:
projectname = d.get('projectname')
if projectname:
if projectname not in aggregated_data:
aggregated_data[projectname] = []
aggregated_data[projectname].append({'date': d.get('date'), 'msg': d.get('msg'), 'count': d.get('count')})
return aggregated_data
def generate_summary(projectname, messages):
"""生成Markdown格式的消息摘要"""
markdown_text = f'### {projectname} \n\n'
for message in messages:
markdown_text += f"**時間:** {message['date']}\n\n"
markdown_text += f"**告警次數(shù):** <font color='red'><b>{message['count']}</b></font>\n\n"
markdown_text += f"{message['msg']}\n\n---\n\n"
return markdown_text
def send_message_summary(projectname, messages):
"""發(fā)送摘要消息給釘釘機(jī)器人"""
summary = generate_summary(projectname, messages)
data = {
'msgtype': 'markdown',
'markdown': {
'title': f'{projectname}消息告警',
'text': summary
}
}
webhook_url = 'https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxx' # 替換為實際的Webhook URL
response = requests.post(webhook_url, json=data)
if response.status_code == 200:
print('消息發(fā)送成功')
else:
print('消息發(fā)送失敗')
if __name__ == '__main__':
errors = search_errors()
aggregated_errors = aggregate_errors(errors)
for projectname, messages in aggregated_errors.items():
print(f"{projectname}:")
print(messages)
6、Crontab添加定時任務(wù)
也可以用采用:Jenkins與GitLab的定時任務(wù)工作流程
https://blog.csdn.net/heian_99/article/details/131164591?spm=1001.2014.3001.5501
#日志
*/2 * * * * cd /python_app/elasticsearch; /opt/anaconda3/envs/py38/bin/python -u es_monitor.py >> es_error_info.log 2>&1
該定時任務(wù)的含義是每隔2分鐘執(zhí)行一次指定目錄下的 es_monitor.py 腳本,并將輸出信息追加到 es_error_info.log 文件中。這樣可以定期監(jiān)控 Elasticsearch 的錯誤日志,并記錄相關(guān)信息以便后續(xù)查看和分析。文章來源:http://www.zghlxwxcb.cn/news/detail-506846.html
7、總結(jié)
本博客,為我們構(gòu)建了一個完整的應(yīng)用日志監(jiān)控和告警系統(tǒng),通過ELK技術(shù)棧和釘釘機(jī)器人的結(jié)合,使得我們能夠及時發(fā)現(xiàn)和處理應(yīng)用中的錯誤,提高了團(tuán)隊的工作效率和系統(tǒng)的穩(wěn)定性。文章來源地址http://www.zghlxwxcb.cn/news/detail-506846.html
到了這里,關(guān)于提高錯誤日志處理效率!使用Python和釘釘機(jī)器人實現(xiàn)自動告警聚合的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!