用戶域登錄各窗口匯總表
- 主要任務(wù):從kafka頁面日志主題讀取數(shù)據(jù),統(tǒng)計
- 七日回流用戶:之前活躍的用戶,有一段時間不活躍了,之后又開始活躍,稱為回流用戶
- 當(dāng)日獨立用戶數(shù):同一個用戶當(dāng)天重復(fù)登錄,只算作一個獨立用戶。
思路分析
- 讀取kafka頁面主題數(shù)據(jù)
- 轉(zhuǎn)換數(shù)據(jù)結(jié)構(gòu):
String -> JSONObject
- 過濾數(shù)據(jù),uid不為null
- 登錄的兩種情況
- 用戶打開應(yīng)用后自動登錄
- 用戶打印應(yīng)用后沒有登錄,瀏覽后跳轉(zhuǎn)到登錄頁面
- 過濾條件:
- uid不為null且last_page_id is null
- last_page_id = login
- 登錄的兩種情況
- 設(shè)置水位線
- 按照uid分組
- 統(tǒng)計回流用戶數(shù)和獨立用戶數(shù)
- 開窗聚合
- 寫入doris
具體實現(xiàn)
- 設(shè)置端口、并行度、消費者組、kafka主題
- 讀取dwd頁面主題數(shù)據(jù)
-stream.print()
- 對數(shù)據(jù)進(jìn)行清洗過濾:uid不為空
-
stream.flatMap()
使用flatMap過濾 -
new FlatMapFunction<>(){}
在該方法內(nèi)部轉(zhuǎn)換為JSONObject
, 并且獲取uid和lastPageId, try-catch這段代碼 - 判斷是否滿足思路分析中的條件,如果中途發(fā)生異常,直接catch后打印到控制臺清理掉即可。
-
- 先注冊水位線
jsonObjStream.assignTimestampAndWatermark
-
new SerializableTimestampAssigner<>
, 提取數(shù)據(jù)中的ts
- 按照uid分組
-
stream.keyby()
按照uid進(jìn)行分組
-
- 判斷獨立用戶和回流用戶
- 創(chuàng)建
UserLoginBean
, 使用狀態(tài)保存用戶的登錄信息 - 在open方法中,
getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt",String.class))
創(chuàng)建狀態(tài)記錄用戶上一次的登錄時間 - 在
processElement
方法中比較當(dāng)前登錄的日期和狀態(tài)存儲的日期- 如果
lastLoginDt==null
是新用戶 - 如果不為空,判斷上次登錄時間和當(dāng)前時間的差值是否大于7天;如果大于7天,說明是回流用戶。
- 如果小于7天,還需要判斷上次登錄時間是否是今天,如果不是今天,則說明該用戶本次是獨立用戶。
- 如果
- 創(chuàng)建
- 開窗聚合
- 使用滾動窗口開窗聚合
- 在
reduce
算子中寫聚合邏輯 - 在
process
算子中獲取窗口信息
- 寫入doris
- 創(chuàng)建
doris sink
,寫出到doris
- 創(chuàng)建
核心代碼
public static void main(String[] args) {
new DwsUserUserLoginWindow().start(10024,4,"dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);
}
@Override
public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
//1.讀取dwd頁面數(shù)據(jù)
//stream.print();
//2. 對數(shù)據(jù)進(jìn)行清洗過濾
SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);
//3. 注冊水位線
SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);
//4. 按照uid分組
KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);
//5. 判斷獨立用戶和回流用戶
SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);
//processedStream.print();
//開窗聚合
SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);
//reducedStream.print();
//寫入Doris
reducedStream.map(new DorisMapFunction<>())
.sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));
}
[gitee倉庫地址:(https://gitee.com/langpaian/gmall2023-realtime)文章來源地址http://www.zghlxwxcb.cn/news/detail-773577.html
文章來源:http://www.zghlxwxcb.cn/news/detail-773577.html
到了這里,關(guān)于Flink實時電商數(shù)倉(八)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!