0 前言
?? 這兩年開始畢業(yè)設(shè)計和畢業(yè)答辯的要求和難度不斷提升,傳統(tǒng)的畢設(shè)題目缺少創(chuàng)新和亮點,往往達不到畢業(yè)答辯的要求,這兩年不斷有學弟學妹告訴學長自己做的項目系統(tǒng)達不到老師的要求。
為了大家能夠順利以及最少的精力通過畢設(shè),學長分享優(yōu)質(zhì)畢業(yè)設(shè)計項目,今天要分享的是
?? flink大數(shù)據(jù)淘寶用戶行為數(shù)據(jù)實時分析與可視化
??學長這里給一個題目綜合評分(每項滿分5分)
- 難度系數(shù):3分
- 工作量:3分
- 創(chuàng)新點:4分
1、環(huán)境準備
1.1 flink 下載相關(guān) jar 包
flink-sql 連接外部系統(tǒng)時,需要依賴特定的 jar 包,所以需要事先把這些 jar 包準備好。說明與下載入口
本項目使用到了以下的 jar 包 ,下載后直接放在了 flink/lib 里面。
需要注意的是 flink-sql 執(zhí)行時,是轉(zhuǎn)化為 flink-job 提交到集群執(zhí)行的,所以 flink 集群的每一臺機器都要添加以下的 jar 包。
外部 | 版本 | jar |
---|---|---|
kafka | 4.1 | flink-sql-connector-kafka_2.11-1.10.2.jar flink-json-1.10.2-sql-jar.jar |
elasticsearch | 7.6 | flink-sql-connector-elasticsearch7_2.11-1.10.2.jar |
mysql | 5.7 | flink-jdbc_2.11-1.10.2.jar mysql-connector-java-8.0.11.jar |
1.2 生成 kafka 數(shù)據(jù)
用戶行為數(shù)據(jù)來源: 阿里云天池公開數(shù)據(jù)集
網(wǎng)盤:https://pan.baidu.com/s/1wDVQpRV7giIlLJJgRZAInQ 提取碼:gja5
商品類目緯度數(shù)據(jù)來源: category.sql
數(shù)據(jù)生成器:datagen.py
有了數(shù)據(jù)文件之后,使用 python 讀取文件數(shù)據(jù),然后并發(fā)寫入到 kafka。
修改生成器中的 kafka 地址配置,然后運行 以下命令,開始不斷往 kafka 寫數(shù)據(jù)
# 5000 并發(fā)
nohup python3 datagen.py 5000 &
1.3 開發(fā)前的三個小 tip
-
生成器往 kafka 寫數(shù)據(jù),會自動創(chuàng)建主題,無需事先創(chuàng)建
-
flink 往 elasticsearch 寫數(shù)據(jù),會自動創(chuàng)建索引,無需事先創(chuàng)建
-
Kibana 使用索引模式從 Elasticsearch 索引中檢索數(shù)據(jù),以實現(xiàn)諸如可視化等功能。
使用的邏輯為:創(chuàng)建索引模式 》Discover (發(fā)現(xiàn)) 查看索引數(shù)據(jù) 》visualize(可視化)創(chuàng)建可視化圖表》dashboards(儀表板)創(chuàng)建大屏,即匯總多個可視化的圖表
2、flink-sql 客戶端編寫運行 sql
# 進入 flink-sql 客戶端, 需要指定剛剛下載的 jar 包目錄
./bin/sql-client.sh embedded -l lib
2.1 創(chuàng)建 kafka 數(shù)據(jù)源表
-- 創(chuàng)建 kafka 表, 讀取 kafka 數(shù)據(jù)
CREATE TABLE user_behavior (
user_id BIGINT,
item_id BIGINT,
category_id BIGINT,
behavior STRING,
ts TIMESTAMP(3),
proctime as PROCTIME(),
WATERMARK FOR ts as ts - INTERVAL '5' SECOND
) WITH (
'connector.type' = 'kafka',
'connector.version' = 'universal',
'connector.topic' = 'user_behavior',
'connector.startup-mode' = 'earliest-offset',
'connector.properties.zookeeper.connect' = '172.16.122.24:2181',
'connector.properties.bootstrap.servers' = '172.16.122.17:9092',
'format.type' = 'json'
);
SELECT * FROM user_behavior;
2.2 指標統(tǒng)計:每小時成交量
2.2.1 創(chuàng)建 es 結(jié)果表, 存放每小時的成交量
CREATE TABLE buy_cnt_per_hour (
hour_of_day BIGINT,
buy_cnt BIGINT
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '7',
'connector.hosts' = 'http://172.16.122.13:9200',
'connector.index' = 'buy_cnt_per_hour',
'connector.document-type' = 'user_behavior',
'connector.bulk-flush.max-actions' = '1',
'update-mode' = 'append',
'format.type' = 'json'
);
2.2.2 執(zhí)行 sql ,統(tǒng)計每小時的成交量
INSERT INTO buy_cnt_per_hour
SELECT HOUR(TUMBLE_START(ts, INTERVAL '1' HOUR)), COUNT(*)
FROM user_behavior
WHERE behavior = 'buy'
GROUP BY TUMBLE(ts, INTERVAL '1' HOUR);
2.3 指標統(tǒng)計:每10分鐘累計獨立用戶數(shù)
2.3.1 創(chuàng)建 es 結(jié)果表,存放每10分鐘累計獨立用戶數(shù)
CREATE TABLE cumulative_uv (
time_str STRING,
uv BIGINT
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '7',
'connector.hosts' = 'http://172.16.122.13:9200',
'connector.index' = 'cumulative_uv',
'connector.document-type' = 'user_behavior',
'update-mode' = 'upsert',
'format.type' = 'json'
);
2.3.2 創(chuàng)建視圖
CREATE VIEW uv_per_10min AS
SELECT
MAX(SUBSTR(DATE_FORMAT(ts, 'HH:mm'),1,4) || '0') OVER w AS time_str,
COUNT(DISTINCT user_id) OVER w AS uv
FROM user_behavior
WINDOW w AS (ORDER BY proctime ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW);
2.3.3 執(zhí)行 sql ,統(tǒng)計每10分鐘的累計獨立用戶數(shù)
INSERT INTO cumulative_uv
SELECT time_str, MAX(uv)
FROM uv_per_10min
GROUP BY time_str;
2.4 指標統(tǒng)計:商品類目銷量排行
2.4.1 創(chuàng)建商品類目維表
先在 mysql 創(chuàng)建一張商品類目的維表,然后配置 flink 讀取 mysql。
CREATE TABLE category_dim (
sub_category_id BIGINT,
parent_category_name STRING
) WITH (
'connector.type' = 'jdbc',
'connector.url' = 'jdbc:mysql://172.16.122.25:3306/flink',
'connector.table' = 'category',
'connector.driver' = 'com.mysql.jdbc.Driver',
'connector.username' = 'root',
'connector.password' = 'root',
'connector.lookup.cache.max-rows' = '5000',
'connector.lookup.cache.ttl' = '10min'
);
2.4.1 創(chuàng)建 es 結(jié)果表,存放商品類目排行表
CREATE TABLE top_category (
category_name STRING,
buy_cnt BIGINT
) WITH (
'connector.type' = 'elasticsearch',
'connector.version' = '7',
'connector.hosts' = 'http://172.16.122.13:9200',
'connector.index' = 'top_category',
'connector.document-type' = 'user_behavior',
'update-mode' = 'upsert',
'format.type' = 'json'
);
2.4.2 創(chuàng)建視圖
CREATE VIEW rich_user_behavior AS
SELECT U.user_id, U.item_id, U.behavior, C.parent_category_name as category_name
FROM user_behavior AS U LEFT JOIN category_dim FOR SYSTEM_TIME AS OF U.proctime AS C
ON U.category_id = C.sub_category_id;
2.4.3 執(zhí)行 sql , 統(tǒng)計商品類目銷量排行
INSERT INTO top_category
SELECT category_name, COUNT(*) buy_cnt
FROM rich_user_behavior
WHERE behavior = 'buy'
GROUP BY category_name;
3、最終效果與體驗心得
3.1 最終效果
整個開發(fā)過程,只用到了 flink-sql ,無需寫 java 或者其它代碼,就完成了這樣一個實時報表。
3.2 體驗心得
3.2.1 執(zhí)行
-
flink-sql 的 ddl 語句不會觸發(fā) flink-job , 同時創(chuàng)建的表、視圖僅在會話級別有效。
-
對于連接表的 insert、select 等操作,則會觸發(fā)相應的流 job, 并自動提交到 flink 集群,無限地運行下去,直到主動取消或者 job 報錯。
-
flink-sql 客戶端關(guān)閉后,對于已經(jīng)提交到 flink 集群的 job 不會有任何影響。
本次開發(fā),執(zhí)行了 3 個 insert , 因此打開 flink 集群面板,可以看到有 3 個無限的流 job 。即使 kafka 數(shù)據(jù)全部寫入完畢,關(guān)閉 flink-sql 客戶端,這個 3 個 job 都不會停止。
3.2.2 存儲
-
flnik 本身不存儲業(yè)務數(shù)據(jù),只作為流批一體的引擎存在,所以主要的用法為讀取外部系統(tǒng)的數(shù)據(jù),處理后,再寫到外部系統(tǒng)。文章來源:http://www.zghlxwxcb.cn/news/detail-812057.html
-
flink 本身的元數(shù)據(jù),包括表、函數(shù)等,默認情況下只是存放在內(nèi)存里面,所以僅會話級別有效。但是,似乎可以存儲到 Hive Metastore 中,關(guān)于這一點就留到以后再實踐。文章來源地址http://www.zghlxwxcb.cn/news/detail-812057.html
4 最后
到了這里,關(guān)于大數(shù)據(jù)畢設(shè)分享 flink大數(shù)據(jù)淘寶用戶行為數(shù)據(jù)實時分析與可視化的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!