国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

基于Flume+spark+Flask的分布式實(shí)時(shí)日志分析與入侵檢測(cè)系統(tǒng)

這篇具有很好參考價(jià)值的文章主要介紹了基于Flume+spark+Flask的分布式實(shí)時(shí)日志分析與入侵檢測(cè)系統(tǒng)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

完整項(xiàng)目地址:https://download.csdn.net/download/lijunhcn/88463174

基于Flume+spark+Flask的分布式實(shí)時(shí)日志分析與入侵檢測(cè)系統(tǒng)

簡(jiǎn)介

LogVision是一個(gè)整合了web日志聚合、分發(fā)、實(shí)時(shí)分析、入侵檢測(cè)、數(shù)據(jù)存儲(chǔ)與可視化的日志分析解決方案。聚合采用Apache Flume,分發(fā)采用Apache Kafka,實(shí)時(shí)處理采用Spark Streaming,入侵檢測(cè)采用Spark MLlib,數(shù)據(jù)存儲(chǔ)使用HDFS與Redis,可視化采用Flask、SocketIO、Echarts、Bootstrap。

本文下述的使用方法均面向單機(jī)偽分布式環(huán)境,你可以根據(jù)需求進(jìn)行配置上的調(diào)整以適應(yīng)分布式部署。

項(xiàng)目結(jié)構(gòu)

  • flask:Flask Web后端
  • spark:日志分析與入侵檢測(cè)的實(shí)現(xiàn)
  • flume:Flume配置文件
  • log_gen:模擬日志生成器
  • datasets:測(cè)試日志數(shù)據(jù)集
  • images:README的圖片

依賴(lài)與版本

  • 編譯與Web端需要用到的:
    • Java 8, Scala 2.11.12, Python 3.8 (包依賴(lài)見(jiàn)requirements), sbt 1.3.8
  • 計(jì)算環(huán)境中需要用到的:
    • Java 8, Apache Flume 1.9.0, Kafka 2.4, Spark 2.4.5, ZooKeeper 3.5.7, Hadoop 2.9.2, Redis 5.0.8

使用說(shuō)明

在開(kāi)始之前,你需要修改源碼或配置文件中的IP為你自己的地址。具體涉及到flume配置文件、Spark主程序、Flask Web后端。

編譯Spark應(yīng)用

在安裝好Java8與Scala11的前提下,在spark目錄下,初始化sbt

sbt

退出sbt shell并使用sbt-assembly對(duì)Spark項(xiàng)目進(jìn)行編譯打包:

sbt assembly

然后將生成的jar包重命名為logvision.jar

環(huán)境準(zhǔn)備

你需要一個(gè)偽分布式環(huán)境(測(cè)試環(huán)境為CentOS 7),并完成了所有對(duì)應(yīng)版本組件依賴(lài)的配置與運(yùn)行。
使用flume目錄下的standalone.conf啟動(dòng)一個(gè)Flume Agent。
datasets文件夾中的learning-datasets提交如下路徑:

/home/logv/learning-datasets

datasets文件夾中的access_log提交如下路徑:

/home/logv/access_log
入侵檢測(cè)模型訓(xùn)練與測(cè)試

提交jar包至Spark集群并執(zhí)行入侵檢測(cè)模型的生成與測(cè)試:

spark-submit --class learning logvision.jar

你將可以看到如下結(jié)果:

兩個(gè)表格分別代表正常與異常數(shù)據(jù)集的入侵檢測(cè)結(jié)果,下面四個(gè)表格可用于判斷識(shí)別準(zhǔn)確率。如圖中所示250條正常測(cè)試數(shù)據(jù)被檢測(cè)為250條正常,識(shí)別率100%;250條異常測(cè)試數(shù)據(jù)被檢測(cè)為240條異常,10條正常,準(zhǔn)確率96%。

啟動(dòng)可視化后端

flask目錄下執(zhí)行如下命令,下載依賴(lài)包:

pip3 install -r requirements.txt

啟動(dòng)Flask Web:

python3 app.py
啟動(dòng)實(shí)時(shí)日志生成器

log_gen中的實(shí)時(shí)日志生成器可根據(jù)傳入?yún)?shù)(每次寫(xiě)入行數(shù)、寫(xiě)入間隔時(shí)間)將樣本日志中的特定行塊追加至目標(biāo)日志中,以模擬實(shí)時(shí)日志的生成過(guò)程,供后續(xù)實(shí)時(shí)處理。

java log_gen [日志源] [目標(biāo)文件] [每次追加的行數(shù)] [時(shí)間間隔(秒)]

提交至環(huán)境,編譯并運(yùn)行,每2秒將/home/logv/access_log文件中的5行追加至/home/logSrc中:

javac log_gen.java
java log_gen /home/logv/access_log /home/logSrc 5 2
啟動(dòng)分析任務(wù)

提交jar包至Spark集群并執(zhí)行實(shí)時(shí)分析任務(wù):

spark-submit --class streaming logvision.jar
查看可視化結(jié)果

至此你已經(jīng)完成了后端組件的配置,通過(guò)瀏覽器訪問(wèn)Web端主機(jī)的5000端口可以查看到實(shí)時(shí)日志分析的可視化結(jié)果:
歡迎界面:

部分源碼:文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-795624.html

# coding=utf-8
import ast
import time
from kafka import KafkaConsumer
import redis
import requests

from threading import Lock, Thread
from flask import Flask, render_template, session, request
from flask_socketio import SocketIO, emit

async_mode = None
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app, async_mode=async_mode)

thread = None
thread_lock = Lock()

# 配置項(xiàng)目
time_interval = 1
kafka_bootstrap_servers = "10.0.0.222:9092"
redis_con_pool = redis.ConnectionPool(host='10.0.0.222', port=6379, decode_responses=True)


# 頁(yè)面路由與對(duì)應(yīng)頁(yè)面的ws接口
# 系統(tǒng)時(shí)間
@socketio.on('connect', namespace='/sys_time')
def sys_time():
    def loop():
        while True:
            socketio.sleep(time_interval)
            current_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
            socketio.emit('sys_time',
                          {'data': current_time},
                          namespace='/sys_time')

    socketio.start_background_task(target=loop)


# 歡迎頁(yè)面
@app.route('/')
@app.route('/welcome')
def welcome():
    return render_template('index.html', async_mode=socketio.async_mode)


# 實(shí)時(shí)日志流
@socketio.on('connect', namespace='/log_stream')
def log_stream():
    def loop():
        socketio.sleep(time_interval)
        consumer = KafkaConsumer("raw_log", bootstrap_servers=kafka_bootstrap_servers)
        cache = ""
        for msg in consumer:
            cache += bytes.decode(msg.value) + "\n"
            if len(cache.split("\n")) == 25:
                socketio.emit('log_stream',
                              {'data': cache},
                              namespace='/log_stream')
                cache = ""

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)日志分析頁(yè)面
@app.route('/analysis')
def analysis():
    return render_template('analysis.html', async_mode=socketio.async_mode)


# 實(shí)時(shí)計(jì)數(shù)器
@socketio.on('connect', namespace='/count_board')
def count_board():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrange("statcode", 0, 40, withscores=True)

            # 總請(qǐng)求數(shù)(日志行數(shù))
            host_count = redis_con.zscore("line", "count")

            # 成功請(qǐng)求數(shù)(狀態(tài)碼屬于normal的個(gè)數(shù))
            normal = ["200", "201", "202", "203", "204", "205", "206", "207"]
            success_count = 0
            for i in res:
                if i[0] in normal:
                    success_count += int(i[1])

            # 其他請(qǐng)求數(shù)(其他狀態(tài)碼個(gè)數(shù))
            other_count = 0
            for i in res:
                other_count += int(i[1])
            other_count -= success_count

            # 訪客數(shù)(不同的IP個(gè)數(shù))
            visitor_count = redis_con.zcard("host")

            # 資源數(shù)(不同的url個(gè)數(shù))
            url_count = redis_con.zcard("url")

            # 流量大?。╞ytes的和,MB)
            traffic_sum = int(redis_con.zscore("traffic", "sum"))

            # 日志大?。∕B)
            log_size = int(redis_con.zscore("size", "sum"))

            socketio.emit('count_board',
                          {'host_count': host_count,
                           'success_count': success_count,
                           'other_count': other_count,
                           'visitor_count': visitor_count,
                           'url_count': url_count,
                           'traffic_sum': traffic_sum,
                           'log_size': log_size},
                          namespace='/count_board')

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)熱門(mén)位置
@socketio.on('connect', namespace='/hot_geo')
def hot_geo():
    def loop():
        while True:
            socketio.sleep(2)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            data = []

            for i in res:
                # 調(diào)用接口獲取地理坐標(biāo)
                req = requests.get("http://api.map.baidu.com/location/ip",
                                   {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                    'ip': i[0],
                                    'coor': 'bd09ll'})
                body = eval(req.text)

                # 僅顯示境內(nèi)定位
                if body['status'] == 0:
                    coor_x = body['content']['point']['x']
                    coor_y = body['content']['point']['y']

                    data.append({"name": i[0], "value": [coor_x, coor_y, i[1]]})

            socketio.emit('hot_geo',
                          {'data': data},
                          namespace='/hot_geo')

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)熱門(mén)資源排名
@socketio.on('connect', namespace='/hot_url')
def hot_url():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("url", 0, 9, withscores=True)
            data = []
            no = 1

            for i in res:
                data.append({"no": no, "url": i[0], "count": i[1]})
                no += 1

            socketio.emit('hot_url',
                          {'data': data},
                          namespace='/hot_url')

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)熱門(mén)IP排名
@socketio.on('connect', namespace='/hot_ip')
def hot_ip():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 13, withscores=True)
            data = []
            no = 1

            for i in res:
                # 調(diào)用接口獲取地理坐標(biāo)
                req = requests.get("http://api.map.baidu.com/location/ip",
                                   {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                    'ip': i[0],
                                    'coor': 'bd09ll'})
                body = eval(req.text)

                # 僅顯示境內(nèi)定位
                if body['status'] == 0:
                    address = body['content']['address']

                    data.append({"no": no, "ip": i[0], "address": address, "count": i[1]})
                    no += 1

            socketio.emit('hot_ip',
                          {'data': data},
                          namespace='/hot_ip')

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)狀態(tài)碼比例
@socketio.on('connect', namespace='/status_code_pie')
def status_code_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("statcode", 0, 100, withscores=True)
            data = []
            legend = []

            for i in res:
                if i[0] != 'foo':
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])

            socketio.emit('status_code_pie',
                          {'legend': legend, 'data': data},
                          namespace='/status_code_pie')

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)請(qǐng)求方式比例
@socketio.on('connect', namespace='/req_method_pie')
def req_method_pie():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("reqmt", 0, 100, withscores=True)
            data = []
            legend = []

            for i in res:
                if i[0] != 'foo':
                    data.append({"value": i[1], "name": i[0]})
                    legend.append(i[0])

            socketio.emit('req_method_pie',
                          {'legend': legend, 'data': data},
                          namespace='/req_method_pie')

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)請(qǐng)求計(jì)數(shù)(按時(shí)間順序)
@socketio.on('connect', namespace='/req_count_timeline')
def req_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = dict(redis_con.zrange("datetime", 0, 10000000, withscores=True))
            data = []
            date = []

            # 按時(shí)間排序
            for i in sorted(res):
                datetime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(i) / 1000))
                data.append(res[i])
                date.append(datetime)

            socketio.emit('req_count_timeline',
                          {"data": data, "date": date},
                          namespace='/req_count_timeline')

    socketio.start_background_task(target=loop)


# IP請(qǐng)求數(shù)排序
@socketio.on('connect', namespace='/ip_ranking')
def timestamp_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = redis_con.zrevrange("host", 0, 50, withscores=True)
            ip = []
            count = []

            for i in res:
                ip.append(i[0])
                count.append(i[1])

            socketio.emit('ip_ranking',
                          {"ip": ip, "count": count},
                          namespace='/ip_ranking')

    socketio.start_background_task(target=loop)


@app.route('/id')
def id():
    return render_template("id.html", async_mode=socketio.async_mode)


# 異常請(qǐng)求計(jì)數(shù)
@socketio.on('connect', namespace='/bad_count')
def bad_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = int(redis_con.zscore("bad", "bad"))

            socketio.emit('bad_count',
                          {"data": res},
                          namespace='/bad_count')

    socketio.start_background_task(target=loop)


# 正常請(qǐng)求計(jì)數(shù)
@socketio.on('connect', namespace='/good_count')
def bad_count():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            res = int(redis_con.zscore("good", "good"))

            socketio.emit('good_count',
                          {"data": res},
                          namespace='/good_count')

    socketio.start_background_task(target=loop)


# 正常請(qǐng)求地理標(biāo)記
@socketio.on('connect', namespace='/good_geo')
def good_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # 調(diào)用接口獲取地理坐標(biāo)
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = eval(req.text)
                        # 僅顯示境內(nèi)定位
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"name": record['host'], "value": [coor_x, coor_y,
                                                                           record['url'],
                                                                           datetime,
                                                                           record['req_method'],
                                                                           record['protocol'],
                                                                           record['status_code']]})
                            socketio.emit('good_geo',
                                          {"data": data},
                                          namespace='/good_geo')

    socketio.start_background_task(target=loop)


# 異常請(qǐng)求地理標(biāo)記
@socketio.on('connect', namespace='/bad_geo')
def bad_geo():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # 調(diào)用接口獲取地理坐標(biāo)
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = eval(req.text)
                        # 僅顯示境內(nèi)定位
                        if body['status'] == 0:
                            coor_x = body['content']['point']['x']
                            coor_y = body['content']['point']['y']
                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"name": record['host'], "value": [coor_x, coor_y,
                                                                           record['url'],
                                                                           datetime,
                                                                           record['req_method'],
                                                                           record['protocol'],
                                                                           record['status_code']]})
                            socketio.emit('bad_geo',
                                          {"data": data},
                                          namespace='/bad_geo')

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)入侵分類(lèi)計(jì)數(shù)(按時(shí)間順序)
@socketio.on('connect', namespace='/url_cate_count_timeline')
def url_cate_count_timeline():
    def loop():
        while True:
            socketio.sleep(time_interval)
            redis_con = redis.Redis(connection_pool=redis_con_pool)
            good_res = dict(redis_con.zrange("goodts", 0, 10000000, withscores=True))
            bad_res = dict(redis_con.zrange("badts", 0, 10000000, withscores=True))

            # 求正常和異常結(jié)果的時(shí)間戳的并集,并排序。再生成對(duì)應(yīng)的正常和異常計(jì)數(shù)
            date = []
            date_ts = []
            good_date = []
            bad_date = []

            good_data = []
            bad_data = []
            # 求并集并排序
            for i in good_res:
                good_date.append(i)
            for j in bad_res:
                bad_date.append(j)
            for k in sorted(list(set(good_date) | set(bad_date))):
                date_ts.append(k)

            # 生成對(duì)應(yīng)的計(jì)數(shù)
            for t in date_ts:
                if t in good_res:
                    good_data.append(good_res[t])
                else:
                    good_data.append(0)
                if t in bad_res:
                    bad_data.append(bad_res[t])
                else:
                    bad_data.append(0)
            # 時(shí)間戳轉(zhuǎn)字符串
            for ts in date_ts:
                date.append(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(ts) / 1000)))

            socketio.emit('url_cate_count_timeline',
                          {"date": date, "good_data": good_data, "bad_data": bad_data},
                          namespace='/url_cate_count_timeline')

    socketio.start_background_task(target=loop)


# 實(shí)時(shí)異常請(qǐng)求概覽
@socketio.on('connect', namespace='/bad_detail')
def bad_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("bad_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # 調(diào)用接口獲取地理坐標(biāo)
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = eval(req.text)
                        # 僅顯示境內(nèi)定位
                        if body['status'] == 0:
                            address = body['content']['address']

                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"host": record['host'], "address": address, "url": record['url'],
                                         "datetime": datetime, "req_method": record['req_method'],
                                         "protocol": record['protocol'], "status_code": record['status_code'],
                                         "pred": record['prediction'], 'prob': record['probability']['values']})

                            socketio.emit('bad_detail',
                                          {"data": data},
                                          namespace='/bad_detail')
    socketio.start_background_task(target=loop)


# 實(shí)時(shí)正常請(qǐng)求概覽
@socketio.on('connect', namespace='/good_detail')
def good_detail():
    def loop():
        while True:
            socketio.sleep(time_interval)
            consumer = KafkaConsumer("good_result", bootstrap_servers=kafka_bootstrap_servers)
            data = []

            for msg in consumer:
                result = ast.literal_eval(bytes.decode(msg.value))
                for record in result:
                    if record['host'] != "foo":
                        # 調(diào)用接口獲取地理坐標(biāo)
                        req = requests.get("http://api.map.baidu.com/location/ip",
                                           {'ak': '0jKbOcwqK7dGZiYIhSai5rsxTnQZ4UQt',
                                            'ip': record['host'],
                                            'coor': 'bd09ll'})
                        body = eval(req.text)
                        # 僅顯示境內(nèi)定位
                        if body['status'] == 0:
                            address = body['content']['address']

                            datetime = time.strftime("%Y-%m-%d %H:%M:%S",
                                                     time.localtime(int(record['timestamp']) / 1000))

                            data.append({"host": record['host'], "address": address, "url": record['url'],
                                         "datetime": datetime, "req_method": record['req_method'],
                                         "protocol": record['protocol'], "status_code": record['status_code'],
                                         "pred": record['prediction'], 'prob': record['probability']['values']})

                            socketio.emit('good_detail',
                                          {"data": data},
                                          namespace='/good_detail')
    socketio.start_background_task(target=loop)


@app.route('/about')
def about():
    return render_template("about.html", async_mode=socketio.async_mode)


if __name__ == '__main__':
    socketio.run(app, host="0.0.0.0", port=5000, debug=True)

到了這里,關(guān)于基于Flume+spark+Flask的分布式實(shí)時(shí)日志分析與入侵檢測(cè)系統(tǒng)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶(hù)投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 基于文心一言AI大模型,編寫(xiě)一段python3程序以獲取華為分布式塊存儲(chǔ)REST接口的實(shí)時(shí)數(shù)據(jù)

    基于文心一言AI大模型,編寫(xiě)一段python3程序以獲取華為分布式塊存儲(chǔ)REST接口的實(shí)時(shí)數(shù)據(jù)

    本文嘗試基于文心一言AI大模型,編寫(xiě)一段python3程序以獲取華為分布式塊存儲(chǔ)REST接口的實(shí)時(shí)數(shù)據(jù)。 一、用文心一言AI大模型將需求轉(zhuǎn)化為樣例代碼 1、第一次對(duì)話(huà):“python3寫(xiě)一段從rest服務(wù)器獲取數(shù)據(jù)的樣例代碼” 同時(shí)生成了以下注解? 這段代碼首先定義了一個(gè)函數(shù)? get_da

    2024年02月03日
    瀏覽(26)
  • 分布式計(jì)算框架:Spark、Dask、Ray
分布式計(jì)算哪家強(qiáng):Spark、Dask、Ray

    分布式計(jì)算框架:Spark、Dask、Ray 分布式計(jì)算哪家強(qiáng):Spark、Dask、Ray

    目錄 什么是分布式計(jì)算 分布式計(jì)算哪家強(qiáng):Spark、Dask、Ray 2 選擇正確的框架 2.1 Spark 2.2 Dask 2.3 Ray 分布式計(jì)算是一種計(jì)算方法,和集中式計(jì)算是相對(duì)的。 隨著計(jì)算技術(shù)的發(fā)展, 有些應(yīng)用需要非常巨大的計(jì)算能力才能完成,如果采用集中式計(jì)算,需要耗費(fèi)相當(dāng)長(zhǎng)的時(shí)間來(lái)完成

    2024年02月11日
    瀏覽(102)
  • Spark單機(jī)偽分布式環(huán)境搭建、完全分布式環(huán)境搭建、Spark-on-yarn模式搭建

    Spark單機(jī)偽分布式環(huán)境搭建、完全分布式環(huán)境搭建、Spark-on-yarn模式搭建

    搭建Spark需要先配置好scala環(huán)境。三種Spark環(huán)境搭建互不關(guān)聯(lián),都是從零開(kāi)始搭建。 如果將文章中的配置文件修改內(nèi)容復(fù)制粘貼的話(huà),所有配置文件添加的內(nèi)容后面的注釋記得刪除,可能會(huì)報(bào)錯(cuò)。保險(xiǎn)一點(diǎn)刪除最好。 上傳安裝包解壓并重命名 rz上傳 如果沒(méi)有安裝rz可以使用命

    2024年02月06日
    瀏覽(106)
  • spark分布式解壓工具

    ? spark解壓縮工具,目前支持tar、gz、zip、bz2、7z壓縮格式,默認(rèn)解壓到當(dāng)前路下,也支持自定義的解壓輸出路徑。另外支持多種提交模式,進(jìn)行解壓任務(wù),可通過(guò)自定義配置文件,作為spark任務(wù)的資源設(shè)定 2.1 使用hadoop的FileSystem類(lèi),對(duì)tos文件的進(jìn)行讀取、查找、寫(xiě)入等操作

    2024年02月02日
    瀏覽(96)
  • 分布式內(nèi)存計(jì)算Spark環(huán)境部署與分布式內(nèi)存計(jì)算Flink環(huán)境部署

    分布式內(nèi)存計(jì)算Spark環(huán)境部署與分布式內(nèi)存計(jì)算Flink環(huán)境部署

    目錄 分布式內(nèi)存計(jì)算Spark環(huán)境部署 1.? 簡(jiǎn)介 2.? 安裝 2.1【node1執(zhí)行】下載并解壓 2.2【node1執(zhí)行】修改配置文件名稱(chēng) 2.3【node1執(zhí)行】修改配置文件,spark-env.sh 2.4 【node1執(zhí)行】修改配置文件,slaves 2.5【node1執(zhí)行】分發(fā) 2.6【node2、node3執(zhí)行】設(shè)置軟鏈接 2.7【node1執(zhí)行】啟動(dòng)Spark集群

    2024年02月08日
    瀏覽(126)
  • Spark分布式內(nèi)存計(jì)算框架

    Spark分布式內(nèi)存計(jì)算框架

    目錄 一、Spark簡(jiǎn)介 (一)定義 (二)Spark和MapReduce區(qū)別 (三)Spark歷史 (四)Spark特點(diǎn) 二、Spark生態(tài)系統(tǒng) 三、Spark運(yùn)行架構(gòu) (一)基本概念 (二)架構(gòu)設(shè)計(jì) (三)Spark運(yùn)行基本流程 四、Spark編程模型 (一)核心數(shù)據(jù)結(jié)構(gòu)RDD (二)RDD上的操作 (三)RDD的特性 (四)RDD 的持

    2024年02月04日
    瀏覽(106)
  • Spark彈性分布式數(shù)據(jù)集

    Spark彈性分布式數(shù)據(jù)集

    1. Spark RDD是什么 RDD(Resilient Distributed Dataset,彈性分布式數(shù)據(jù)集)是一個(gè)不可變的分布式對(duì)象集合,是Spark中最基本的數(shù)據(jù)抽象。在代碼中RDD是一個(gè)抽象類(lèi),代表一個(gè)彈性的、不可變、可分區(qū)、里面的元素可并行計(jì)算的集合。 每個(gè)RDD都被分為多個(gè)分區(qū),這些分區(qū)運(yùn)行在集群中

    2024年02月13日
    瀏覽(95)
  • 分布式計(jì)算MapReduce | Spark實(shí)驗(yàn)

    分布式計(jì)算MapReduce | Spark實(shí)驗(yàn)

    題目1 輸入文件為學(xué)生成績(jī)信息,包含了必修課與選修課成績(jī),格式如下: 班級(jí)1, 姓名1, 科目1, 必修, 成績(jī)1 br (注: br 為換行符) 班級(jí)2, 姓名2, 科目1, 必修, 成績(jī)2 br 班級(jí)1, 姓名1, 科目2, 選修, 成績(jī)3 br ………., ………, ………, ………, ……… br 編寫(xiě)兩個(gè)Hadoop平臺(tái)上的MapRed

    2024年02月08日
    瀏覽(91)
  • 分布式搭建(hadoop+hive+spark)

    hadoop-master 192.168.43.141 hadoop-slave1 192.168.43.142 hadoop-slave2 192.168.43.143 鏈接:https://pan.baidu.com/s/1OwKLvZAaw8AtVaO_c6mvtw?pwd=1234 提取碼:1234 MYSQL5.6:wget http://repo.mysql.com/mysql-community-release-el6-5.noarch.rpm Scale:wget https://downloads.lightbend.com/scala/2.12.4/scala-2.12.4.tgz

    2024年02月12日
    瀏覽(21)
  • 【Spark分布式內(nèi)存計(jì)算框架——Spark 基礎(chǔ)環(huán)境】1. Spark框架概述

    【Spark分布式內(nèi)存計(jì)算框架——Spark 基礎(chǔ)環(huán)境】1. Spark框架概述

    第一章 說(shuō)明 整個(gè)Spark 框架分為如下7個(gè)部分,總的來(lái)說(shuō)分為Spark 基礎(chǔ)環(huán)境、Spark 離線分析和Spark實(shí)時(shí)分析三個(gè)大的方面,如下圖所示: 第一方面、Spark 基礎(chǔ)環(huán)境 主要講述Spark框架安裝部署及開(kāi)發(fā)運(yùn)行,如何在本地模式和集群模式運(yùn)行,使用spark-shell及IDEA開(kāi)發(fā)應(yīng)用程序,測(cè)試及

    2024年02月11日
    瀏覽(92)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包