国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink SQL編程實戰(zhàn) (熱門商品TOP N)

這篇具有很好參考價值的文章主要介紹了大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink SQL編程實戰(zhàn) (熱門商品TOP N)。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、需求描述

每隔30min 統(tǒng)計最近 1hour的熱門商品 top3, 并把統(tǒng)計的結(jié)果寫入到mysql中。

二、需求分析

  • 1.統(tǒng)計每個商品的點擊量, 開窗
  • 2.分組窗口分組
  • 3.over窗口

三、需求實現(xiàn)

3.1、創(chuàng)建數(shù)據(jù)源示例

input/UserBehavior.csv

543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
873335,1256540,1451783,pv,1511658000
429984,4625350,2355072,pv,1511658000
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000

3.2、創(chuàng)建目標(biāo)表

CREATE DATABASE flink_sql; //創(chuàng)建flink_sql庫
USE flink_sql;
DROP TABLE IF EXISTS `hot_item`;
CREATE TABLE `hot_item` (
  `w_end` timestamp NOT NULL,
  `item_id` bigint(20) NOT NULL,
  `item_count` bigint(20) NOT NULL,
  `rk` bigint(20) NOT NULL,
  PRIMARY KEY (`w_end`,`rk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3.3、導(dǎo)入JDBC Connector依賴

<!-- 導(dǎo)入JDBC Connector依賴 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

3.4、代碼實現(xiàn)

package com.atguigu.flink.java.chapter_12;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/1/31 9:11
 */
public class Flink01_HotItem_TopN {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);


        // 使用sql從文件讀取數(shù)據(jù)
        tenv.executeSql(
            "create table user_behavior(" +
                "   user_id bigint, " +
                "   item_id bigint, " +
                "   category_id int, " +
                "   behavior string, " +
                "   ts bigint, " +
                "   event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " +
                "   watermark for event_time as  event_time - interval '5' second " +
                ")with(" +
                "   'connector'='filesystem', " +
                "   'path'='input/UserBehavior.csv', " +
                "   'format'='csv')"
        );

        // 每隔 10m 統(tǒng)計一次最近 1h 的熱門商品 top

        // 1. 計算每每個窗口內(nèi)每個商品的點擊量
        Table t1 = tenv
            .sqlQuery(
                "select " +
                    "   item_id, " +
                    "   hop_end(event_time, interval '10' minute, interval '1' hour) w_end," +
                    "   count(*) item_count " +
                    "from user_behavior " +
                    "where behavior='pv' " +
                    "group by hop(event_time, interval '10' minute, interval '1' hour), item_id"
            );
        tenv.createTemporaryView("t1", t1);
        // 2. 按照窗口開窗, 對商品點擊量進行排名
        Table t2 = tenv.sqlQuery(
            "select " +
                "   *," +
                "   row_number() over(partition by w_end order by item_count desc) rk " +
                "from t1"
        );
        tenv.createTemporaryView("t2", t2);

        // 3. 取 top3
        Table t3 = tenv.sqlQuery(
            "select " +
                "   item_id, w_end, item_count, rk " +
                "from t2 " +
                "where rk<=3"
        );

        // 4. 數(shù)據(jù)寫入到mysql
        // 4.1 創(chuàng)建輸出表
        tenv.executeSql("create table hot_item(" +
                            "   item_id bigint, " +
                            "   w_end timestamp(3), " +
                            "   item_count bigint, " +
                            "   rk bigint, " +
                            "   PRIMARY KEY (w_end, rk) NOT ENFORCED)" +
                            "with(" +
                            "   'connector' = 'jdbc', " +
                            "   'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " +
                            "   'table-name' = 'hot_item', " +
                            "   'username' = 'root', " +
                            "   'password' = 'aaaaaa' " +
                            ")");
        // 4.2 寫入到輸出表
        t3.executeInsert("hot_item");
    }
}

執(zhí)行結(jié)果:
大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink SQL編程實戰(zhàn) (熱門商品TOP N),大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK,大數(shù)據(jù),flink,sql

四、總結(jié)

Flink 使用 OVER 窗口條件和過濾條件相結(jié)合以進行 Top-N 查詢。利用 OVER 窗口的 PARTITION BY 子句的功能,F(xiàn)link 還支持逐組 Top-N 。 例如,每個類別中實時銷量最高的前五種產(chǎn)品。批處理表和流處理表都支持基于SQL的 Top-N 查詢。
流處理模式需注意: TopN 查詢的結(jié)果會帶有更新。 Flink SQL 會根據(jù)排序鍵對輸入的流進行排序;若 top N 的記錄發(fā)生了變化,變化的部分會以撤銷、更新記錄的形式發(fā)送到下游。 推薦使用一個支持更新的存儲作為 Top-N 查詢的 sink 。另外,若 top N 記錄需要存儲到外部存儲,則結(jié)果表需要擁有與 Top-N 查詢相同的唯一鍵。文章來源地址http://www.zghlxwxcb.cn/news/detail-728667.html

到了這里,關(guān)于大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink SQL編程實戰(zhàn) (熱門商品TOP N)的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 大數(shù)據(jù)開發(fā)之Spark(累加器、廣播變量、Top10熱門品類實戰(zhàn))

    大數(shù)據(jù)開發(fā)之Spark(累加器、廣播變量、Top10熱門品類實戰(zhàn))

    累加器:分布式共享只寫變量。(executor和executor之間不能讀數(shù)據(jù)) 累加器用來把executor端變量信息聚合到driver端。在driver中定義的一個變量,在executor端的每個task都會得到這個變量的一份新的副本,每個task更新這些副本的值后,傳回driver端進行合并計算。 1、累加器使用 1)

    2024年01月24日
    瀏覽(22)
  • 實戰(zhàn)Java springboot 采用Flink CDC操作SQL Server數(shù)據(jù)庫獲取增量變更數(shù)據(jù)

    目錄 前言: 1、springboot引入依賴: 2、yml配置文件 3、創(chuàng)建SQL server CDC變更數(shù)據(jù)監(jiān)聽器 4、反序列化數(shù)據(jù),轉(zhuǎn)為變更JSON對象 5、CDC 數(shù)據(jù)實體類 6、自定義ApplicationContextUtil 7、自定義sink 交由spring管理,處理變更數(shù)據(jù) ? ? ? ? 我的場景是從SQL Server數(shù)據(jù)庫獲取指定表的增量數(shù)據(jù),查

    2024年02月10日
    瀏覽(24)
  • flink sql 實戰(zhàn)實例 及延伸問題:聚合/數(shù)據(jù)傾斜/DAU/Hive流批一體 等

    flink sql 實戰(zhàn)實例 及延伸問題:聚合/數(shù)據(jù)傾斜/DAU/Hive流批一體 等

    ? 需求:上游是一個 kafka 數(shù)據(jù)源,數(shù)據(jù)內(nèi)容是用戶 QQ 等級變化明細(xì)數(shù)據(jù)(time,uid,level)。需要你求出當(dāng)前每個等級的用戶數(shù)。 ? 需求:數(shù)據(jù)源:用戶心跳日志(uid,time,type)。計算分 Android,iOS 的 DAU,最晚一分鐘輸出一次當(dāng)日零點累計到當(dāng)前的結(jié)果。 經(jīng)過測試 在fl

    2024年02月22日
    瀏覽(34)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink窗口函數(shù)

    前面指定了窗口的分配器, 接著我們需要來指定如何計算, 這事由window function來負(fù)責(zé). 一旦窗口關(guān)閉, window function 去計算處理窗口中的每個元素. window function 可以是ReduceFunction,AggregateFunction,or ProcessWindowFunction中的任意一種. ReduceFunction,AggregateFunction更加高效, 原因就是Flink可以對

    2024年02月11日
    瀏覽(37)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink營銷對賬

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink營銷對賬

    在電商網(wǎng)站中,訂單的支付作為直接與營銷收入掛鉤的一環(huán),在業(yè)務(wù)流程中非常重要。對于訂單而言,為了正確控制業(yè)務(wù)流程,也為了增加用戶的支付意愿,網(wǎng)站一般會設(shè)置一個支付失效時間,超過一段時間不支付的訂單就會被取消。另外,對于訂單的支付,我們還應(yīng)保證用

    2024年02月11日
    瀏覽(20)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink 容錯機制

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink 容錯機制

    在分布式架構(gòu)中,當(dāng)某個節(jié)點出現(xiàn)故障,其他節(jié)點基本不受影響。在 Flink 中,有一套完整的容錯機制,最重要就是檢查點(checkpoint)。 在流處理中,我們可以用存檔讀檔的思路,把之前的計算結(jié)果做個保存,這樣重啟之后就可以繼續(xù)處理新數(shù)據(jù)、而不需要重新計算了。所以

    2024年02月07日
    瀏覽(22)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink

    具體版本根據(jù)實際情況確定 參見大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Redis 安裝與使用 可以根據(jù)要寫入的redis的不同數(shù)據(jù)類型進行調(diào)整

    2024年02月13日
    瀏覽(16)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink定時器

    基于處理時間或者事件時間處理過一個元素之后, 注冊一個定時器, 然后指定的時間執(zhí)行. Context和OnTimerContext所持有的TimerService對象擁有以下方法: currentProcessingTime(): Long 返回當(dāng)前處理時間 currentWatermark(): Long 返回當(dāng)前watermark的時間戳 registerProcessingTimeTimer(timestamp: Long): Unit 會注

    2024年02月10日
    瀏覽(20)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink惡意登錄監(jiān)控

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink惡意登錄監(jiān)控

    對于網(wǎng)站而言,用戶登錄并不是頻繁的業(yè)務(wù)操作。如果一個用戶短時間內(nèi)頻繁登錄失敗,就有可能是出現(xiàn)了程序的惡意攻擊,比如密碼暴力破解。 因此我們考慮,應(yīng)該對用戶的登錄失敗動作進行統(tǒng)計,具體來說,如果同一用戶(可以是不同IP)在2秒之內(nèi)連續(xù)兩次登錄失敗,就

    2024年02月07日
    瀏覽(14)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時間滾動動窗口

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時間滾動動窗口

    在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。在這種情況下,我們必須定義一個窗口,用來收集

    2024年02月11日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包