Full project download: https://download.csdn.net/download/lijunhcn/88463174
A Distributed Real-Time Log Analysis and Intrusion Detection System Based on Flume, Spark, and Flask
Introduction
LogVision is a log-analysis solution that integrates web log aggregation, distribution, real-time analysis, intrusion detection, storage, and visualization. Aggregation is handled by Apache Flume, distribution by Apache Kafka, real-time processing by Spark Streaming, intrusion detection by Spark MLlib, storage by HDFS and Redis, and visualization by Flask, Socket.IO, ECharts, and Bootstrap.
The instructions below assume a single-machine pseudo-distributed environment; you can adjust the configuration to suit a fully distributed deployment.
Project structure
- flask: Flask web backend
- spark: log analysis and intrusion detection
- flume: Flume configuration files
- log_gen: simulated log generator
- datasets: test log datasets
- images: images for the README
Dependencies and versions
- For building and for the web frontend:
  - Java 8, Scala 2.11.12, Python 3.8 (package dependencies in requirements), sbt 1.3.8
- For the compute environment:
  - Java 8, Apache Flume 1.9.0, Kafka 2.4, Spark 2.4.5, ZooKeeper 3.5.7, Hadoop 2.9.2, Redis 5.0.8
Usage
Before starting, replace the IP addresses in the source and configuration files with your own. This affects the Flume configuration file, the Spark main program, and the Flask web backend.
Building the Spark application
With Java 8 and Scala 2.11 installed, initialize sbt in the spark directory:
sbt
Exit the sbt shell, then compile and package the Spark project with sbt-assembly:
sbt assembly
Rename the resulting jar to logvision.jar.
Preparing the environment
You need a pseudo-distributed environment (the test environment is CentOS 7) with all of the component versions listed above configured and running.
Start a Flume agent using standalone.conf in the flume directory.
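standalone.conf itself is not reproduced in this article. For orientation only, a minimal single-agent configuration matching this pipeline (an exec source tailing the generated log file into a Kafka sink on the raw_log topic used by the backend) might look like the sketch below; the agent name and component names are illustrative assumptions, not the project's actual file:

```properties
# Hypothetical single-agent Flume 1.9 config: tail a log file into Kafka
a1.sources = r1
a1.channels = c1
a1.sinks = k1

# Exec source tailing the simulated real-time log
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/logSrc
a1.sources.r1.channels = c1

# In-memory channel
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000

# Kafka sink feeding the raw_log topic consumed by Spark and Flask
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = raw_log
a1.sinks.k1.kafka.bootstrap.servers = 10.0.0.222:9092
a1.sinks.k1.channel = c1
```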
Place learning-datasets from the datasets folder at the following path:
/home/logv/learning-datasets
Place access_log from the datasets folder at the following path:
/home/logv/access_log
Training and testing the intrusion-detection model
Submit the jar to the Spark cluster to build and test the intrusion-detection model:
spark-submit --class learning logvision.jar
You should see results like the following:
The first two tables show the detection results on the normal and anomalous datasets; the four tables below them can be used to judge detection accuracy. In the screenshot, all 250 normal test records are detected as normal (100% accuracy), while of the 250 anomalous test records, 240 are detected as anomalous and 10 as normal (96% accuracy).
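The percentages above are simple ratios of correctly classified records; a quick check of the arithmetic:

```python
# Check the detection-rate arithmetic quoted above
def accuracy_pct(correct, total):
    return correct * 100 / total

print(accuracy_pct(250, 250))  # normal test set -> 100.0
print(accuracy_pct(240, 250))  # anomalous test set -> 96.0
```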
Starting the visualization backend
In the flask directory, install the dependencies:
pip3 install -r requirements.txt
Start the Flask web app:
python3 app.py
Starting the real-time log generator
The generator in log_gen appends blocks of lines from a sample log to a target log, with the block size and interval given as arguments, simulating real-time log generation for the downstream processing:
java log_gen [source log] [target file] [lines per append] [interval (seconds)]
Copy it to the environment, compile, and run. The following appends 5 lines from /home/logv/access_log to /home/logSrc every 2 seconds:
javac log_gen.java
java log_gen /home/logv/access_log /home/logSrc 5 2
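The Java source of log_gen is not shown in this article. As a sketch of the same idea, the core loop (append a fixed-size block of source lines to the target on each tick) could look like this in Python; the function names are illustrative, not the project's API:

```python
import time

def append_block(src_lines, dst_path, n, offset):
    """Append up to n lines from src_lines, starting at offset, to dst_path.
    Returns the next offset, wrapping past the end of the source."""
    block = src_lines[offset:offset + n]
    with open(dst_path, "a") as dst:
        dst.writelines(block)
    return (offset + n) % max(len(src_lines), 1)

def run(src_path, dst_path, n, interval_s):
    """Replay src_path into dst_path, n lines every interval_s seconds."""
    with open(src_path) as src:
        lines = src.readlines()
    offset = 0
    while True:
        offset = append_block(lines, dst_path, n, offset)
        time.sleep(interval_s)
```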
Starting the analysis job
Submit the jar to the Spark cluster and start the real-time analysis job:
spark-submit --class streaming logvision.jar
Viewing the visualization
The backend is now fully set up. Open port 5000 of the web host in a browser to see the real-time log analysis dashboard:
Welcome page:
Part of the source code:
# coding=utf-8
import ast
import time
from kafka import KafkaConsumer
import redis
import requests
from threading import Lock, Thread
from flask import Flask, render_template, session, request
from flask_socketio import SocketIO, emit

async_mode = None
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)
thread = None
thread_lock = Lock()

# Configuration
time_interval = 1
kafka_bootstrap_servers = "10.0.0.222:9092"
redis_con_pool = redis.ConnectionPool(host='10.0.0.222', port=6379, decode_responses=True)
# Page routes and their corresponding WebSocket endpoints

# System time
@socketio.on('connect', namespace='/sys_time')
def sys_time():
    def loop():
        while True:
            socketio.sleep(time_interval)
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            socketio.emit('sys_time',
                          {'data': current_time},
                          namespace='/sys_time')
    socketio.start_background_task(target=loop)


# Welcome page
@app.route('/')
@app.route('/welcome')
def welcome():
    return render_template('index.html', async_mode=socketio.async_mode)
# Real-time log stream
@socketio.on('connect', namespace='/log_stream')
def log_stream():
    def loop():
        socketio.sleep(time_interval)
        consumer = KafkaConsumer("raw_log", bootstrap_servers=kafka_bootstrap_servers)
        cache = ""
        for msg in consumer:
            cache += bytes.decode(msg.value) + "\n"
            # Push the accumulated lines to the client in blocks of 24
            if len(cache.split("\n")) == 25:
                socketio.emit('log_stream',
                              {'data': cache},
                              namespace='/log_stream')
                cache = ""
    socketio.start_background_task(target=loop)


# Real-time log analysis page
@app.route('/analysis')
def analysis():
    return render_template('analysis.html', async_mode=socketio.async_mode)
# Real-time counters
@socketio.on('connect', namespace='/count_board')
def count_board():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrange("statcode", 0, 40, withscores=True)
            # Total requests (log line count)
            host_count = redis_con.zscore("line", "count")
            # Successful requests (status codes in the "normal" 2xx set)
            normal = ["200", "201", "202", "203", "204", "205", "206", "207"]
            success_count = 0
            for i in res:
                if i[0] in normal:
                    success_count += int(i[1])
            # Other requests (all remaining status codes)
            other_count = 0
            for i in res:
                other_count += int(i[1])
            other_count -= success_count
            # Visitors (distinct IPs)
            visitor_count = redis_con.zcard("host")
            # Resources (distinct URLs)
            url_count = redis_con.zcard("url")
            # Traffic (sum of bytes, in MB)
            traffic_sum = int(redis_con.zscore("traffic", "sum"))
            # Log size (MB)
            log_size = int(redis_con.zscore("size", "sum"))
            socketio.emit('count_board',
                          {'host_count': host_count,
                           'success_count': success_count,
                           'other_count': other_count,
                           'visitor_count': visitor_count,
                           'url_count': url_count,
                           'traffic_sum': traffic_sum,
                           'log_size': log_size},
                          namespace='/count_board')
    socketio.start_background_task(target=loop)
# Real-time hot locations
@socketio.on('connect', namespace='/hot_geo')
def hot_geo():
    def loop():
        while True:
            socketio.sleep(2)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            data = []
            for i in res:
                # Resolve the IP to geo coordinates via the Baidu Map API
                req = requests.get("http://api.map.baidu.com/location/ip",
                                   {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                    'ip': i[0],
                                    'coor': 'bd09ll'})
                body = req.json()  # parse the JSON response (avoids eval on untrusted text)
                # Only keep locations resolved inside China (status == 0)
                if body['status'] == 0:
                    coor_x = body['content']['point']['x']
                    coor_y = body['content']['point']['y']
                    data.append({"name": i[0], "value": [coor_x, coor_y, i[1]]})
            socketio.emit('hot_geo',
                          {'data': data},
                          namespace='/hot_geo')
    socketio.start_background_task(target=loop)
# Real-time top resources
@socketio.on('connect', namespace='/hot_url')
def hot_url():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("url", 0, 9, withscores=True)
            data = []
            no = 1
            for i in res:
                data.append({"no": no, "url": i[0], "count": i[1]})
                no += 1
            socketio.emit('hot_url',
                          {'data': data},
                          namespace='/hot_url')
    socketio.start_background_task(target=loop)


# Real-time top IPs
@socketio.on('connect', namespace='/hot_ip')
def hot_ip():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 13, withscores=True)
            data = []
            no = 1
            for i in res:
                # Resolve the IP to an address via the Baidu Map API
                req = requests.get("http://api.map.baidu.com/location/ip",
                                   {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                    'ip': i[0],
                                    'coor': 'bd09ll'})
                body = req.json()
                # Only keep locations resolved inside China (status == 0)
                if body['status'] == 0:
                    address = body['content']['address']
                    data.append({"no": no, "ip": i[0], "address": address, "count": i[1]})
                    no += 1
            socketio.emit('hot_ip',
                          {'data': data},
                          namespace='/hot_ip')
    socketio.start_background_task(target=loop)
# Real-time status-code distribution
@socketio.on('connect', namespace='/status_code_pie')
def status_code_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("statcode", 0, 100, withscores=True)
            data = []
            legend = []
            for i in res:
                if i[0] != 'foo':  # skip the placeholder member
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])
            socketio.emit('status_code_pie',
                          {'legend': legend, 'data': data},
                          namespace='/status_code_pie')
    socketio.start_background_task(target=loop)


# Real-time request-method distribution
@socketio.on('connect', namespace='/req_method_pie')
def req_method_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("reqmt", 0, 100, withscores=True)
            data = []
            legend = []
            for i in res:
                if i[0] != 'foo':  # skip the placeholder member
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])
            socketio.emit('req_method_pie',
                          {'legend': legend, 'data': data},
                          namespace='/req_method_pie')
    socketio.start_background_task(target=loop)
# Real-time request count over time
@socketio.on('connect', namespace='/req_count_timeline')
def req_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = dict(redis_con.zrange("datetime", 0, 10000000, withscores=True))
            data = []
            date = []
            # Sort by timestamp
            for i in sorted(res):
                datetime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i) / 1000))
                data.append(res[i])
                date.append(datetime)
            socketio.emit('req_count_timeline',
                          {"data": data, "date": date},
                          namespace='/req_count_timeline')
    socketio.start_background_task(target=loop)


# Request count ranked per IP
@socketio.on('connect', namespace='/ip_ranking')
def ip_ranking():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            ip = []
            count = []
            for i in res:
                ip.append(i[0])
                count.append(i[1])
            socketio.emit('ip_ranking',
                          {"ip": ip, "count": count},
                          namespace='/ip_ranking')
    socketio.start_background_task(target=loop)


@app.route('/id')
def id():
    return render_template("id.html", async_mode=socketio.async_mode)
# Anomalous request count
@socketio.on('connect', namespace='/bad_count')
def bad_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = int(redis_con.zscore("bad", "bad"))
            socketio.emit('bad_count',
                          {"data": res},
                          namespace='/bad_count')
    socketio.start_background_task(target=loop)


# Normal request count
@socketio.on('connect', namespace='/good_count')
def good_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = int(redis_con.zscore("good", "good"))
            socketio.emit('good_count',
                          {"data": res},
                          namespace='/good_count')
    socketio.start_background_task(target=loop)
# Geo markers for normal requests
@socketio.on('connect', namespace='/good_geo')
def good_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []
            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # Resolve the IP to geo coordinates via the Baidu Map API
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = req.json()
                        # Only keep locations resolved inside China (status == 0)
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))
                            data.append({"name": record['host'], "value": [coor_x, coor_y,
                                                                           record['url'],
                                                                           datetime,
                                                                           record['req_method'],
                                                                           record['protocol'],
                                                                           record['status_code']]})
                socketio.emit('good_geo',
                              {"data": data},
                              namespace='/good_geo')
    socketio.start_background_task(target=loop)


# Geo markers for anomalous requests
@socketio.on('connect', namespace='/bad_geo')
def bad_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []
            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # Resolve the IP to geo coordinates via the Baidu Map API
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = req.json()
                        # Only keep locations resolved inside China (status == 0)
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))
                            data.append({"name": record['host'], "value": [coor_x, coor_y,
                                                                           record['url'],
                                                                           datetime,
                                                                           record['req_method'],
                                                                           record['protocol'],
                                                                           record['status_code']]})
                socketio.emit('bad_geo',
                              {"data": data},
                              namespace='/bad_geo')
    socketio.start_background_task(target=loop)
# Real-time intrusion classification counts over time
@socketio.on('connect', namespace='/url_cate_count_timeline')
def url_cate_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            good_res = dict(redis_con.zrange("goodts", 0, 10000000, withscores=True))
            bad_res = dict(redis_con.zrange("badts", 0, 10000000, withscores=True))
            # Take the union of the normal and anomalous timestamps, sort it,
            # then build the matching normal/anomalous counts
            date = []
            date_ts = []
            good_date = []
            bad_date = []
            good_data = []
            bad_data = []
            # Union and sort
            for i in good_res:
                good_date.append(i)
            for j in bad_res:
                bad_date.append(j)
            for k in sorted(list(set(good_date) | set(bad_date))):
                date_ts.append(k)
            # Build the count series for each timestamp
            for t in date_ts:
                if t in good_res:
                    good_data.append(good_res[t])
                else:
                    good_data.append(0)
                if t in bad_res:
                    bad_data.append(bad_res[t])
                else:
                    bad_data.append(0)
            # Convert timestamps to datetime strings
            for ts in date_ts:
                date.append(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(ts) / 1000)))
            socketio.emit('url_cate_count_timeline',
                          {"date": date, "good_data": good_data, "bad_data": bad_data},
                          namespace='/url_cate_count_timeline')
    socketio.start_background_task(target=loop)
# Real-time anomalous request overview
@socketio.on('connect', namespace='/bad_detail')
def bad_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []
            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # Resolve the IP to an address via the Baidu Map API
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = req.json()
                        # Only keep locations resolved inside China (status == 0)
                        if body['status'] == 0:
                            address = body['content']['address']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))
                            data.append({"host": record['host'], "address": address, "url": record['url'],
                                         "datetime": datetime, "req_method": record['req_method'],
                                         "protocol": record['protocol'], "status_code": record['status_code'],
                                         "pred": record['prediction'], 'prob': record['probability']['values']})
                socketio.emit('bad_detail',
                              {"data": data},
                              namespace='/bad_detail')
    socketio.start_background_task(target=loop)


# Real-time normal request overview
@socketio.on('connect', namespace='/good_detail')
def good_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []
            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # Resolve the IP to an address via the Baidu Map API
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = req.json()
                        # Only keep locations resolved inside China (status == 0)
                        if body['status'] == 0:
                            address = body['content']['address']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))
                            data.append({"host": record['host'], "address": address, "url": record['url'],
                                         "datetime": datetime, "req_method": record['req_method'],
                                         "protocol": record['protocol'], "status_code": record['status_code'],
                                         "pred": record['prediction'], 'prob': record['probability']['values']})
                socketio.emit('good_detail',
                              {"data": data},
                              namespace='/good_detail')
    socketio.start_background_task(target=loop)
@app.route('/about')
def about():
    return render_template("about.html", async_mode=socketio.async_mode)


if __name__ == '__main__':
    socketio.run(app, host="0.0.0.0", port=5000, debug=True)
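To make the counter logic in the count_board handler above concrete, here is the same success/other split applied to sample (status_code, count) pairs of the shape that zrange returns; the numbers are made up for illustration:

```python
# The count_board split on hypothetical zrange output: (status_code, count) pairs
res = [("200", 120.0), ("206", 10.0), ("304", 40.0), ("404", 25.0), ("500", 5.0)]
normal = ["200", "201", "202", "203", "204", "205", "206", "207"]

# Successful requests: sum of the scores of all 2xx members
success_count = sum(int(score) for code, score in res if code in normal)
# Other requests: everything else
other_count = sum(int(score) for _, score in res) - success_count

print(success_count, other_count)  # 130 70
```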
到了這里,關(guān)于基于Flume+spark+Flask的分布式實(shí)時(shí)日志分析與入侵檢測(cè)系統(tǒng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!