国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink實時電商數(shù)倉(八)

這篇具有很好參考價值的文章主要介紹了Flink實時電商數(shù)倉(八)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

用戶域登錄各窗口匯總表

  • 主要任務(wù):從kafka頁面日志主題讀取數(shù)據(jù),統(tǒng)計
    • 七日回流用戶:之前活躍的用戶,有一段時間不活躍了,之后又開始活躍,稱為回流用戶
    • 當(dāng)日獨立用戶數(shù):同一個用戶當(dāng)天重復(fù)登錄,只算作一個獨立用戶。

思路分析

  1. 讀取kafka頁面主題數(shù)據(jù)
  2. 轉(zhuǎn)換數(shù)據(jù)結(jié)構(gòu):String -> JSONObject
  3. 過濾數(shù)據(jù),uid不為null
    • 登錄的兩種情況
      • 用戶打開應(yīng)用后自動登錄
      • 用戶打印應(yīng)用后沒有登錄,瀏覽后跳轉(zhuǎn)到登錄頁面
    • 過濾條件:
      • uid不為null且last_page_id is null
      • last_page_id = login
  4. 設(shè)置水位線
  5. 按照uid分組
  6. 統(tǒng)計回流用戶數(shù)和獨立用戶數(shù)
  7. 開窗聚合
  8. 寫入doris

具體實現(xiàn)

  1. 設(shè)置端口、并行度、消費者組、kafka主題
  2. 讀取dwd頁面主題數(shù)據(jù)
    - stream.print()
  3. 對數(shù)據(jù)進(jìn)行清洗過濾:uid不為空
    • stream.flatMap()使用flatMap過濾
    • new FlatMapFunction<>(){}在該方法內(nèi)部轉(zhuǎn)換為JSONObject, 并且獲取uid和lastPageId, try-catch這段代碼
    • 判斷是否滿足思路分析中的條件,如果中途發(fā)生異常,直接catch后打印到控制臺清理掉即可。
  4. 先注冊水位線
    • jsonObjStream.assignTimestampAndWatermark
    • new SerializableTimestampAssigner<>, 提取數(shù)據(jù)中的ts
  5. 按照uid分組
    • stream.keyby()按照uid進(jìn)行分組
  6. 判斷獨立用戶和回流用戶
    • 創(chuàng)建UserLoginBean, 使用狀態(tài)保存用戶的登錄信息
    • 在open方法中,getRuntimeContext().getState(new ValueStateDescriptor<>("last_login_dt",String.class))創(chuàng)建狀態(tài)記錄用戶上一次的登錄時間
    • processElement方法中比較當(dāng)前登錄的日期和狀態(tài)存儲的日期
      • 如果lastLoginDt==null是新用戶
      • 如果不為空,判斷上次登錄時間和當(dāng)前時間的差值是否大于7天;如果大于7天,說明是回流用戶。
      • 如果小于7天,還需要判斷上次登錄時間是否是今天,如果不是今天,則說明該用戶本次是獨立用戶。
  7. 開窗聚合
    • 使用滾動窗口開窗聚合
    • reduce算子中寫聚合邏輯
    • process算子中獲取窗口信息
  8. 寫入doris
    • 創(chuàng)建doris sink,寫出到doris

核心代碼

public static void main(String[] args) {
        new DwsUserUserLoginWindow().start(10024,4,"dws_user_user_login_window", Constant.TOPIC_DWD_TRAFFIC_PAGE);
    }

    @Override
    public void handle(StreamExecutionEnvironment env, DataStreamSource<String> stream) {
        //1.讀取dwd頁面數(shù)據(jù)
        //stream.print();

        //2. 對數(shù)據(jù)進(jìn)行清洗過濾
        SingleOutputStreamOperator<JSONObject> jsonObjStream = etl(stream);

        //3. 注冊水位線
        SingleOutputStreamOperator<JSONObject> withWatermarkStream = addWatermark(jsonObjStream);

        //4. 按照uid分組
        KeyedStream<JSONObject, String> keyedStream = getKeyedStream(withWatermarkStream);

        //5. 判斷獨立用戶和回流用戶
        SingleOutputStreamOperator<UserLoginBean> processedStream = getUserLoginBeanStream(keyedStream);

        //processedStream.print();

        //開窗聚合
        SingleOutputStreamOperator<UserLoginBean> reducedStream = getReducedStream(processedStream);

        //reducedStream.print();

        //寫入Doris
        reducedStream.map(new DorisMapFunction<>())
                .sinkTo(FlinkSinkUtil.getDorisSink(Constant.DWS_USER_USER_LOGIN_WINDOW));


    }

[gitee倉庫地址:(https://gitee.com/langpaian/gmall2023-realtime)文章來源地址http://www.zghlxwxcb.cn/news/detail-773577.html

到了這里,關(guān)于Flink實時電商數(shù)倉(八)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Flink CDC實時同步PG數(shù)據(jù)庫

    JDK:1.8 Flink:1.16.2 Scala:2.11 Hadoop:3.1.3 github地址:https://github.com/rockets0421/FlinkCDC-PG.git? 1、更改配置文件postgresql.conf # 更改wal日志方式為logical wal_level = logical # minimal, replica, or logical # 更改solts最大數(shù)量(默認(rèn)值為10),flink-cdc默認(rèn)一張表占用一個slots max_replication_slots = 20 # m

    2024年02月13日
    瀏覽(35)
  • [大數(shù)據(jù) Flink,Java實現(xiàn)不同數(shù)據(jù)庫實時數(shù)據(jù)同步過程]

    目錄 ??前言: ??實現(xiàn)Mysql同步Es的過程包括以下步驟: ??配置Mysql數(shù)據(jù)庫連接 ??在Flink的配置文件中,添加Mysql數(shù)據(jù)庫的連接信息。可以在flink-conf.yaml文件中添加如下配置: ??在Flink程序中,使用JDBCInputFormat來連接Mysql數(shù)據(jù)庫,并定義查詢語句,獲取需要同步的數(shù)據(jù)。具體代

    2024年02月10日
    瀏覽(22)
  • 實時Flink的數(shù)據(jù)庫與Kafka集成優(yōu)化案例

    在現(xiàn)代數(shù)據(jù)處理系統(tǒng)中,實時數(shù)據(jù)處理和分析是至關(guān)重要的。Apache Flink是一個流處理框架,可以用于實時數(shù)據(jù)處理和分析。在許多場景下,F(xiàn)link需要與數(shù)據(jù)庫和Kafka等消息系統(tǒng)進(jìn)行集成,以實現(xiàn)更高效的數(shù)據(jù)處理。本文將討論Flink與數(shù)據(jù)庫和Kafka集成的優(yōu)化案例,并提供實際示

    2024年02月20日
    瀏覽(29)
  • Flink 實時數(shù)倉 (一) --------- 數(shù)據(jù)采集層

    Flink 實時數(shù)倉 (一) --------- 數(shù)據(jù)采集層

    1. 普通實時計算與實時數(shù)倉比較 普通的實時計算優(yōu)先考慮時效性,所以從數(shù)據(jù)源采集經(jīng)過實時計算直接得到結(jié)果。如此做時效性更好,但是弊端是由于計算過程中的中間結(jié)果沒有沉淀下來,所以當(dāng)面對大量實時需求的時候,計算的復(fù)用性較差,開發(fā)成本隨著需求增加直線上升

    2024年02月06日
    瀏覽(29)
  • OceanBase X Flink 基于原生分布式數(shù)據(jù)庫構(gòu)建實時計算解決方案

    OceanBase X Flink 基于原生分布式數(shù)據(jù)庫構(gòu)建實時計算解決方案

    摘要:本文整理自 OceanBase 架構(gòu)師周躍躍,在 Flink Forward Asia 2022 實時湖倉專場的分享。本篇內(nèi)容主要分為四個部分: 分布式數(shù)據(jù)庫 OceanBase 關(guān)鍵技術(shù)解讀 生態(tài)對接以及典型應(yīng)用場景 OceanBase X Flink 在游戲行業(yè)實踐 未來展望 點擊查看原文視頻 演講PPT 作為一款歷經(jīng) 12 年的純自研

    2024年02月13日
    瀏覽(26)
  • Flink+Doris 實時數(shù)倉

    Flink+Doris 實時數(shù)倉

    Doris基本原理 Doris基本架構(gòu)非常簡單,只有FE(Frontend)、BE(Backend)兩種角色,不依賴任何外部組件,對部署和運維非常友好。架構(gòu)圖如下 可以 看到Doris 的數(shù)倉架構(gòu)十分簡潔,不依賴 Hadoop 生態(tài)組件,構(gòu)建及運維成本較低。 FE(Frontend)以 Java 語言為主,主要功能職責(zé): 接收用戶

    2024年02月07日
    瀏覽(20)
  • Flink CDC和Flink SQL構(gòu)建實時數(shù)倉Flink寫入Doris

    Flink CDC和Flink SQL構(gòu)建實時數(shù)倉Flink寫入Doris

    軟件環(huán)境 Flink1.13.3 Scala 2.12 doris 0.14 一、MySQL 開啟binlog日志、創(chuàng)建用戶 1.開啟bin log MySQL 8.0默認(rèn)開啟了binlog,可以通過代碼show variables like \\\"%log_bin%\\\";查詢是否開啟了,show variables like \\\"%server_id%\\\";查詢服務(wù)器ID。 上圖分別顯示了bin long是否開啟以及bin log所在的位置。 2.創(chuàng)建用戶 C

    2024年02月02日
    瀏覽(17)
  • 實時數(shù)倉|基于Flink1.11的SQL構(gòu)建實時數(shù)倉探索實踐

    實時數(shù)倉主要是為了解決傳統(tǒng)數(shù)倉數(shù)據(jù)時效性低的問題,實時數(shù)倉通常會用在實時的 OLAP 分析、實時的數(shù)據(jù)看板、業(yè)務(wù)指標(biāo)實時監(jiān)控等場景。雖然關(guān)于實時數(shù)倉的架構(gòu)及技術(shù)選型與傳統(tǒng)的離線數(shù)倉會存在差異,但是關(guān)于數(shù)倉建設(shè)的基本方法論是一致的。本文會分享基于 Flink

    2024年02月16日
    瀏覽(22)
  • Flink實時數(shù)倉同步:拉鏈表實戰(zhàn)詳解

    Flink實時數(shù)倉同步:拉鏈表實戰(zhàn)詳解

    在大數(shù)據(jù)領(lǐng)域,業(yè)務(wù)數(shù)據(jù)通常最初存儲在關(guān)系型數(shù)據(jù)庫,例如MySQL。然而,為了滿足日常分析和報表等需求,大數(shù)據(jù)平臺會采用多種不同的存儲方式來容納這些業(yè)務(wù)數(shù)據(jù)。這些存儲方式包括離線倉庫、實時倉庫等,根據(jù)不同的業(yè)務(wù)需求和數(shù)據(jù)特性進(jìn)行選擇。 舉例來說,假設(shè)業(yè)

    2024年01月20日
    瀏覽(45)
  • flink 實時數(shù)倉構(gòu)建與開發(fā)[記錄一些坑]

    flink 實時數(shù)倉構(gòu)建與開發(fā)[記錄一些坑]

    1、業(yè)務(wù)庫使用pg數(shù)據(jù)庫, 業(yè)務(wù)數(shù)據(jù)可以改動任意時間段數(shù)據(jù) 2、監(jiān)聽采集業(yè)務(wù)庫數(shù)據(jù),實時捕捉業(yè)務(wù)庫數(shù)據(jù)變更,同時實時變更目標(biāo)表和報表數(shù)據(jù) 實時數(shù)據(jù)流圖與分層設(shè)計說明 1、debezium采集pg庫表數(shù)據(jù)同步到kafka 【kafka模式】 2、flink 消費kafka寫入pg或kafka 【upset-kafka,新版k

    2024年02月16日
    瀏覽(29)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包