1. Requirement Description
Every 30 minutes, compute the top 3 most popular items over the last hour and write the results to MySQL.
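2. Requirement Analysis
- 1. Count the clicks (pv records) of each item within each window
- 2. Use a group window (a sliding HOP window) to group the records
- 3. Use an OVER window to rank the items within each window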
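The three steps above can be sketched in plain Java on in-memory data, without Flink. This is only an illustration under stated assumptions: the class name `TopNSketch`, the `{windowEnd, itemId}` input encoding, and the tie-break by item id are hypothetical and not part of the original job (SQL `row_number()` does not define an order for ties).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of the original job.
final class TopNSketch {

    /**
     * Each click is {windowEnd, itemId}.
     * Returns windowEnd -> rows of {itemId, count, rank}, at most n rows per window.
     */
    static Map<Long, List<long[]>> topN(List<long[]> clicks, int n) {
        // Steps 1 and 2: count clicks per (window, item).
        Map<Long, Map<Long, Long>> counts = new TreeMap<>();
        for (long[] click : clicks) {
            counts.computeIfAbsent(click[0], w -> new TreeMap<>())
                  .merge(click[1], 1L, Long::sum);
        }

        // Step 3: rank the items inside each window and keep the first n.
        Map<Long, List<long[]>> result = new TreeMap<>();
        for (Map.Entry<Long, Map<Long, Long>> window : counts.entrySet()) {
            List<Map.Entry<Long, Long>> items = new ArrayList<>(window.getValue().entrySet());
            // Higher count first; ties broken by smaller item id (an assumption).
            items.sort((a, b) -> {
                int byCount = Long.compare(b.getValue(), a.getValue());
                return byCount != 0 ? byCount : Long.compare(a.getKey(), b.getKey());
            });
            List<long[]> rows = new ArrayList<>();
            for (int i = 0; i < items.size() && i < n; i++) {
                rows.add(new long[] {items.get(i).getKey(), items.get(i).getValue(), i + 1});
            }
            result.put(window.getKey(), rows);
        }
        return result;
    }
}
```

In the real job, Flink SQL performs the same grouping and ranking incrementally on a stream rather than on a finished list.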
3. Implementation
3.1 Sample source data
input/UserBehavior.csv
543462,1715,1464116,pv,1511658000
662867,2244074,1575622,pv,1511658000
561558,3611281,965809,pv,1511658000
894923,3076029,1879194,pv,1511658000
834377,4541270,3738615,pv,1511658000
315321,942195,4339722,pv,1511658000
625915,1162383,570735,pv,1511658000
578814,176722,982926,pv,1511658000
873335,1256540,1451783,pv,1511658000
429984,4625350,2355072,pv,1511658000
866796,534083,4203730,pv,1511658000
937166,321683,2355072,pv,1511658000
156905,2901727,3001296,pv,1511658000
758810,5109495,1575622,pv,1511658000
107304,111477,4173315,pv,1511658000
452437,3255022,5099474,pv,1511658000
813974,1332724,2520771,buy,1511658000
524395,3887779,2366905,pv,1511658000
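Each line has five comma-separated fields which, going by the table definition in section 3.4, are user_id, item_id, category_id, behavior and ts (epoch seconds). As a minimal sketch of how one line maps to those fields, the hypothetical class `UserBehaviorLine` below (not part of the original job) parses a line and converts `ts` to an instant in UTC. The wall-clock value Flink derives may differ, since it depends on the configured time zone.

```java
import java.time.Instant;

// Hypothetical helper for illustration only.
final class UserBehaviorLine {
    final long userId;
    final long itemId;
    final int categoryId;
    final String behavior;
    final long ts; // epoch seconds

    UserBehaviorLine(long userId, long itemId, int categoryId, String behavior, long ts) {
        this.userId = userId;
        this.itemId = itemId;
        this.categoryId = categoryId;
        this.behavior = behavior;
        this.ts = ts;
    }

    /** Parses one CSV line in the field order user_id,item_id,category_id,behavior,ts. */
    static UserBehaviorLine parse(String line) {
        String[] f = line.split(",");
        if (f.length != 5) {
            throw new IllegalArgumentException("expected 5 fields: " + line);
        }
        return new UserBehaviorLine(
                Long.parseLong(f[0].trim()),
                Long.parseLong(f[1].trim()),
                Integer.parseInt(f[2].trim()),
                f[3].trim(),
                Long.parseLong(f[4].trim()));
    }

    /** Event time as a UTC instant. */
    Instant eventTime() {
        return Instant.ofEpochSecond(ts);
    }
}
```

Note that only one of the sample lines has behavior `buy`; the job later filters on `pv`, so that line does not contribute to any count.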
3.2 Create the target table
CREATE DATABASE flink_sql; -- create the flink_sql database
USE flink_sql;
DROP TABLE IF EXISTS `hot_item`;
CREATE TABLE `hot_item` (
  `w_end` timestamp NOT NULL,
  `item_id` bigint(20) NOT NULL,
  `item_count` bigint(20) NOT NULL,
  `rk` bigint(20) NOT NULL,
  PRIMARY KEY (`w_end`,`rk`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
3.3 Add the JDBC connector dependency
<!-- JDBC connector dependency -->
<!-- A MySQL JDBC driver (for example mysql:mysql-connector-java) must also be on the classpath -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
3.4 Code implementation
package com.atguigu.flink.java.chapter_12;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @Author lizhenchao@atguigu.cn
 * @Date 2021/1/31 9:11
 */
public class Flink01_HotItem_TopN {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        // Read the data from a file using SQL; event_time is derived from ts (epoch seconds)
        tenv.executeSql(
            "create table user_behavior(" +
                "   user_id bigint, " +
                "   item_id bigint, " +
                "   category_id int, " +
                "   behavior string, " +
                "   ts bigint, " +
                "   event_time as to_timestamp(from_unixtime(ts, 'yyyy-MM-dd HH:mm:ss')), " +
                "   watermark for event_time as event_time - interval '5' second " +
                ")with(" +
                "   'connector'='filesystem', " +
                "   'path'='input/UserBehavior.csv', " +
                "   'format'='csv')"
        );
        // Every 30 minutes, compute the top items over the last hour
        // 1. Count the clicks of each item within each window
        Table t1 = tenv
            .sqlQuery(
                "select " +
                    "   item_id, " +
                    "   hop_end(event_time, interval '30' minute, interval '1' hour) w_end," +
                    "   count(*) item_count " +
                    "from user_behavior " +
                    "where behavior='pv' " +
                    "group by hop(event_time, interval '30' minute, interval '1' hour), item_id"
            );
        tenv.createTemporaryView("t1", t1);
        // 2. Partition by window end and rank the items by click count
        Table t2 = tenv.sqlQuery(
            "select " +
                "   *," +
                "   row_number() over(partition by w_end order by item_count desc) rk " +
                "from t1"
        );
        tenv.createTemporaryView("t2", t2);
        // 3. Keep the top 3
        Table t3 = tenv.sqlQuery(
            "select " +
                "   item_id, w_end, item_count, rk " +
                "from t2 " +
                "where rk<=3"
        );
        // 4. Write the data to MySQL
        // 4.1 Create the output table
        tenv.executeSql("create table hot_item(" +
            "   item_id bigint, " +
            "   w_end timestamp(3), " +
            "   item_count bigint, " +
            "   rk bigint, " +
            "   PRIMARY KEY (w_end, rk) NOT ENFORCED)" +
            "with(" +
            "   'connector' = 'jdbc', " +
            "   'url' = 'jdbc:mysql://hadoop162:3306/flink_sql?useSSL=false', " +
            "   'table-name' = 'hot_item', " +
            "   'username' = 'root', " +
            "   'password' = 'aaaaaa' " +
            ")");
        // 4.2 Insert the result into the output table
        t3.executeInsert("hot_item");
    }
}
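The `hop(...)` group window in the job assigns every record to several overlapping windows, one per slide step that still covers the record. A rough sketch of that assignment in plain Java follows; it assumes window starts aligned to multiples of the slide (offset 0), and the class name `HopWindowSketch` is hypothetical rather than a Flink API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of sliding (HOP) window assignment; not a Flink class.
final class HopWindowSketch {

    /**
     * Returns every window {start, end} (end exclusive) that contains the timestamp,
     * assuming window starts are aligned to multiples of the slide.
     */
    static List<long[]> assign(long tsMillis, long sizeMillis, long slideMillis) {
        List<long[]> windows = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = tsMillis - Math.floorMod(tsMillis, slideMillis);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > tsMillis - sizeMillis; start -= slideMillis) {
            windows.add(new long[] {start, start + sizeMillis});
        }
        return windows;
    }
}
```

With a one-hour size, a record falls into size / slide windows: two with a 30-minute slide, six with a 10-minute slide. This is why the same click can contribute to the count of an item in several `w_end` groups.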
Execution result: the top 3 items of each window are written to the hot_item table in MySQL.
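4. Summary
Flink expresses Top-N queries by combining an OVER window clause with a filter condition. Using the PARTITION BY clause of the OVER window, Flink also supports per-group Top-N, for example the five best-selling products in each category in real time. SQL-based Top-N queries are supported on both batch and streaming tables.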
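The general shape of such a query can be sketched as a small Java helper that builds the SQL string. The helper `TopNQuerySketch`, the table `ShopSales`, and the columns `category` and `sales` are hypothetical names chosen only to mirror the per-category example; they do not appear in the job above.

```java
// Hypothetical helper that assembles the Top-N query pattern; names are illustrative.
final class TopNQuerySketch {

    /** Builds a per-group Top-N query: rank rows inside each partition, then filter by rank. */
    static String topN(String table, String partitionCol, String orderCol, int n) {
        return "SELECT * FROM ("
             + " SELECT *, ROW_NUMBER() OVER ("
             + "PARTITION BY " + partitionCol + " ORDER BY " + orderCol + " DESC) AS row_num"
             + " FROM " + table
             + ") WHERE row_num <= " + n;
    }

    public static void main(String[] args) {
        // Top 5 products per category from the assumed ShopSales table.
        System.out.println(topN("ShopSales", "category", "sales", 5));
    }
}
```

The job in section 3.4 follows the same pattern, with `w_end` as the partition column, `item_count` as the sort column, and `rk <= 3` as the filter.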
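Note for streaming mode: the result of a Top-N query is an updating result. Flink SQL sorts the input stream by the sort key; when the top N records change, the changed records are sent downstream as retraction and update records. A sink that supports updates is therefore recommended for Top-N queries. In addition, if the top N records are stored in external storage, the result table needs the same unique key as the Top-N query.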
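To see why the unique key matters, the following plain-Java sketch models an upsert sink keyed by (w_end, rk), the same key declared as the primary key of hot_item. It is only an illustration: `UpsertSinkSketch` is a hypothetical class, and the in-memory map stands in for the MySQL table.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for a sink table with primary key (w_end, rk).
final class UpsertSinkSketch {
    // Key "w_end|rk" -> {itemId, itemCount}
    private final Map<String, long[]> rows = new LinkedHashMap<>();

    /** Inserts the row, or overwrites the existing row that has the same (w_end, rk). */
    void upsert(long wEnd, long rk, long itemId, long itemCount) {
        rows.put(wEnd + "|" + rk, new long[] {itemId, itemCount});
    }

    /** Returns {itemId, itemCount} for the key, or null if absent. */
    long[] get(long wEnd, long rk) {
        return rows.get(wEnd + "|" + rk);
    }

    int size() {
        return rows.size();
    }
}
```

When a new item takes over rank 1 in a window, the update for (w_end, 1) replaces the old row instead of appending a second one, so the table always holds at most N rows per window.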