common模塊回顧
- app
- BaseApp: 作為其他子模塊中使用Flink - StreamAPI的父類,實現(xiàn)了StreamAPI中的通用邏輯,在其他子模塊中只需編寫關于數(shù)據(jù)處理的核心邏輯。
- BaseSQLApp: 作為其他子模塊中使用Flink- SQLAPI的父類。在里面設置了使用SQL API的環(huán)境、并行度、檢查點等固定邏輯。
- bean:存放其他子模塊中使用到的javaBean對象,因為如果一直使用jsonObject對象調(diào)用數(shù)據(jù)的話,需要使用類似
getString("字段名")
的方式,沒有直接使用javaBean對象那么方便。 - constant
- 存儲字符串常量
- 為了保證一致性,如果某個常量修改時,只需在這里修改即可對整個項目進行修改
- function
- DorisMapFunction:將javaBean對象轉(zhuǎn)換為對應的json字符串對象,并且將駝峰式命名方式修改為蛇形命名方式。便于寫入doris。
- util
- DateFormateUtil
- FlinkSinkUtil
- FlinkSourceUtil
- HBaseUtil
- IkUtil
- JdbcUtil
- SQLUtil
- getUpsertKafakaSQL: 一定要聲明主鍵,支持撤回流
- getDorisSinkSQL: 用于寫入Doris
dim層回顧
- Flink-cdc監(jiān)控mysql中的維度配置表
- 將監(jiān)控的數(shù)據(jù)流做成廣播流
- 將廣播流和讀取數(shù)據(jù)的主流進行connect
- 主流數(shù)據(jù)根據(jù)廣播流的配置信息進行分流,注意需要先提前緩存一次配置表信息
- 達到動態(tài)拆分數(shù)據(jù)表的效果
dwd層FlinkSQL回顧
- 注意join時會將所有數(shù)據(jù)都存儲到內(nèi)存中,需要考慮設置TTL
- 大表join小表時,可以考慮使用lookup join
- 如果數(shù)據(jù)流有明確的先后關系時,考慮使用Interval join
在支付成功模塊,由于訂單詳情表處理時已經(jīng)存在撤回流,但支付成功模塊也是使用left join方式調(diào)用訂單詳情數(shù)據(jù),會導致產(chǎn)生兩次撤回流。在后續(xù)dws層處理時,要注意對數(shù)據(jù)進行去重過濾。
dws層回顧
- 如何判斷使用FlinkSQL還是StreamAPI
- 如果比較標準化, 比如簡單的開窗聚合,一般使用FlinkSQL
- 如果需要使用狀態(tài)處理數(shù)據(jù),比如判斷是否為獨立用戶,使用StreamAPI
交易域sku粒度訂單下單各窗口匯總
-
需求分析:從Kafka訂單明細主題讀取數(shù)據(jù),過濾null數(shù)據(jù)并按照唯一鍵對數(shù)據(jù)去重,按照SKU維度分組,統(tǒng)計原始金額、活動減免金額、優(yōu)惠券減免金額和訂單金額,并關聯(lián)維度信息,將數(shù)據(jù)寫入Doris交易域SKU粒度下單各窗口匯總表
-
思路分析:文章來源:http://www.zghlxwxcb.cn/news/detail-773574.html
- 方案一:按照訂單ID進行分組,根據(jù)業(yè)務邏輯設置定時器取最后一個數(shù)據(jù)進行發(fā)送
- 方案二:將度量值存放到狀態(tài)中,每次新數(shù)據(jù)到達時,將新的度量值減去狀態(tài)中的度量值
-
具體實現(xiàn)文章來源地址http://www.zghlxwxcb.cn/news/detail-773574.html
- 因為需要使用狀態(tài),故使用BaseApp; 設置端口號10029,并發(fā)度4,消費者組為類名,消費者主題名稱為dwd訂單詳情
- 讀取dwd下單主題數(shù)據(jù),
stream.print()
- 過濾清洗:
- 去掉null數(shù)據(jù),
stream.flatMap(new FlatMapFunction<>())
- ts: 水位線,不能為空;進行位數(shù)的修正,如果是10位的,使用
jsonObj.put("ts", ts*1000)
- id: keyby的關鍵字,不能為空
- sku_id: group by的粒度關鍵字,也不能為空
- 去掉null數(shù)據(jù),
- 添加水位線
- 網(wǎng)絡延遲5L
- 添加數(shù)據(jù)的泛型,提取數(shù)據(jù)中的ts,作為水位線(注意觀察ts的位數(shù),需要為13位,毫秒級)
- 修正度量值,轉(zhuǎn)換數(shù)據(jù)結構
- 使用id關鍵字進行分組
- 使用process算子中的狀態(tài)來進行處理
stream.process(new KeyedProcessFunction<>)
,返回值為對應的javabean對象 - 在狀態(tài)中存儲上一次的度量值大小,只保存30秒
- 在
processElement()
方法中獲取狀態(tài)中的度量值,使用前需要判空,如果為空設置為0,之后才能進行數(shù)值計算。 - 創(chuàng)建對應的bean對象,度量值都減去狀態(tài)中的度量值和更新狀態(tài)中當前的度量值
- 分組開窗聚合
- 使用skuId進行keyby
- 分組后使用
window
算子進行開窗,設置窗口時間,注意Time屬于org.apache.flink.streaming.api.windowing.time.Time.seconds()
- 使用reduce算子進行聚合計算, 聚合時需要累積所有度量值
-
new ProcessWindowFunction()
獲取窗口信息, startTime, EndTime, curTime, 獲取到后寫入javaBean對象中
- 關聯(lián)維度信息
- 先分組聚合再關聯(lián)維度信息的原因:關聯(lián)維度信息需要join操作,是很耗費性能的大操作。先聚合數(shù)據(jù)能大幅度減少數(shù)據(jù)量。
- 啟動HBase,查看對的sku_info表中是否存儲著對應的維度信息
- 獲取外部連接,需要使用生命周期方法(open,close在整個算子執(zhí)行過程中只運行一次);對應的關聯(lián)維度信息,即RichMapFunction()
- 在
map
方法中使用HBase的API讀取表格數(shù)據(jù),使用讀取到的字段補全原本的信息
- 創(chuàng)建HBase的API:讀取表格數(shù)據(jù) get
- 獲取table
- 創(chuàng)建get對象
- 調(diào)用get方法
- 獲取數(shù)據(jù)寫入jsonObj
- 寫出到Doris
維度關聯(lián)優(yōu)化
- 旁路緩存:獨立緩存服務有(redis, memcache).
- 使用旁路緩存時要注意保持數(shù)據(jù)的一致性,如果數(shù)據(jù)發(fā)生修改和刪除,直接刪除redis中的數(shù)據(jù)。
同步+旁路緩存模式
- 引入Jedis相關依賴
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
</dependency>
- 創(chuàng)建Redis工具類RedisUtil
- 在RichMapFunction中的open和close方法中獲取和關閉HBase和Redisd的連接。
- 拼接對應的redisRowKey
- 讀取Redis緩存的數(shù)據(jù),jsonObj的字符串
- 判斷redis讀取到的數(shù)據(jù)是否為空
- 沒有數(shù)據(jù):需要讀取HBase;
jsonObj = HBaseUtil.getCells()
, 讀取到數(shù)據(jù)后,使用jedis.setex()
存儲到redis - redis有緩存,直接返回
- 沒有數(shù)據(jù):需要讀取HBase;
- 進行維度關聯(lián)
Dim層寫入HBase修正
- 在dim層將數(shù)據(jù)寫入HBase時,需要同時獲取Redis的連接。
- 判斷redis中的緩存是否發(fā)生變化
- 判斷數(shù)據(jù)類型是修改或刪除時,刪除Redis中對應的數(shù)據(jù)
- 拼寫數(shù)據(jù)的rowkey
- 使用
jedis.del(rediskey)
來刪除
到了這里,關于Flink實時電商數(shù)倉(十)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!