国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

【Flink實(shí)戰(zhàn)】Flink 商品銷量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X

這篇具有很好參考價(jià)值的文章主要介紹了【Flink實(shí)戰(zhàn)】Flink 商品銷量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

?? 作者 :“大數(shù)據(jù)小禪”

?? 文章簡(jiǎn)介 :Flink 商品銷量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X

?? 歡迎小伙伴們 點(diǎn)贊??、收藏?、留言??


Flink怎么操作Redis

  • Flink怎么操作redis?

    • 方式一:自定義sink
    • 方式二:使用connector
  • Redis Sink 核心是RedisMapper 是一個(gè)接口,使用時(shí)要編寫自己的redis操作類實(shí)現(xiàn)這個(gè)接口中的三個(gè)方法

    • getCommandDescription 選擇對(duì)應(yīng)的數(shù)據(jù)結(jié)構(gòu)和key名稱配置
    • getKeyFromData 獲取key
    • getValueFromData 獲取value
  • 使用

    • 添加依賴
    <dependency>
        <groupId>org.apache.bahir</groupId>
        <artifactId>flink-connector-redis_2.11</artifactId>
        <version>1.0</version>
    </dependency>
    
  • 編碼

    public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> {
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
        }
    
        @Override
        public String getKeyFromData(Tuple2<String, Integer> value) {
            return value.f0;
        }
    
        @Override
        public String getValueFromData(Tuple2<String, Integer> value) {
            return value.f1.toString();
        }
    }
    

Flink 商品銷量統(tǒng)計(jì)-轉(zhuǎn)換-分組-聚合-存儲(chǔ)自定義的Redis Sink實(shí)戰(zhàn)

  • Redis環(huán)境說(shuō)明 redis6

    • 使用docker部署redis6.x 看個(gè)人主頁(yè)docker相關(guān)文章

      docker run -d  -p 6379:6379 redis
      
  • 編碼實(shí)戰(zhàn)

數(shù)據(jù)源

public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {


    private volatile Boolean flag = true;

    private Random random = new Random();

    private static List<String> list = new ArrayList<>();
    static {
        list.add("spring boot2.x課程");
        list.add("微服務(wù)SpringCloud課程");
        list.add("RabbitMQ消息隊(duì)列");
        list.add("Kafka課程");
        list.add("小滴課堂面試專題第一季");
        list.add("Flink流式技術(shù)課程");
        list.add("工業(yè)級(jí)微服務(wù)項(xiàng)目大課訓(xùn)練營(yíng)");
        list.add("Linux課程");
    }


    /**
     * run 方法調(diào)用前 用于初始化連接
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * 用于清理之前
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }


    /**
     * 產(chǎn)生數(shù)據(jù)的邏輯
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {

        while (flag){
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString();
            int userId = random.nextInt(10);
            int money = random.nextInt(100);
            int videoNum = random.nextInt(list.size());
            String title = list.get(videoNum);
            VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());

            ctx.collect(videoOrder);
        }


    }

    /**
     * 控制任務(wù)取消
     */
    @Override
    public void cancel() {

        flag = false;
    }
}

保存的格式與存取的方法

public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {


    /***
     * 選擇需要用到的命令,和key名稱
     * @return
     */
    @Override
    public RedisCommandDescription getCommandDescription() {
        return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
    }

    /**
     * 獲取對(duì)應(yīng)的key或者filed
     *
     * @param data
     * @return
     */
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {

        System.out.println("getKeyFromData=" + data.f0);
        return data.f0;
    }

    /**
     * 獲取對(duì)應(yīng)的值
     *
     * @param data
     * @return
     */
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        System.out.println("getValueFromData=" + data.f1.toString());
        return data.f1.toString();
    }
}

落地

public class Flink07RedisSinkApp {

    /**
     * source
     * transformation
     * sink
     *
     * @param args
     */
    public static void main(String[] args) throws Exception {

        //構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動(dòng)的入口, 存儲(chǔ)全局相關(guān)的參數(shù)
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
        env.setParallelism(1);

        //數(shù)據(jù)源 source
//        DataStream<VideoOrder> ds = env.fromElements(
//                new VideoOrder("21312","java",32,5,new Date()),
//                new VideoOrder("314","java",32,5,new Date()),
//                new VideoOrder("542","springboot",32,5,new Date()),
//                new VideoOrder("42","redis",32,5,new Date()),
//                new VideoOrder("4252","java",32,5,new Date()),
//                new VideoOrder("42","springboot",32,5,new Date()),
//                new VideoOrder("554232","flink",32,5,new Date()),
//                new VideoOrder("23323","java",32,5,new Date())
//        );
        DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());



        //transformation
       DataStream<Tuple2<String,Integer>> mapDS =  ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
            @Override
            public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
                return new Tuple2<>(value.getTitle(),1);
            }
        });



//        DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
//            @Override
//            public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
//                out.collect(new Tuple2<>(value.getTitle(),1));
//            }
//        });


       //分組
        KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) throws Exception {
                return value.f0;
            }
        });

        //統(tǒng)計(jì)每組有多少個(gè)
        DataStream<Tuple2<String,Integer>> sumDS =  keyByDS.sum(1);

        //控制臺(tái)打印
        sumDS.print();

        //單機(jī)redis
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();

        sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));


        //DataStream需要調(diào)用execute,可以取個(gè)名稱
        env.execute("custom redis sink job");
    }

}

【Flink實(shí)戰(zhàn)】Flink 商品銷量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X,Flink,flink,大數(shù)據(jù),原力計(jì)劃文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-742374.html

到了這里,關(guān)于【Flink實(shí)戰(zhàn)】Flink 商品銷量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • python數(shù)據(jù)分析之產(chǎn)品銷量時(shí)序分析與商品關(guān)聯(lián)分析

    python數(shù)據(jù)分析之產(chǎn)品銷量時(shí)序分析與商品關(guān)聯(lián)分析

    這是我們之前的課后作業(yè),根據(jù)自己的想法對(duì)這個(gè)數(shù)據(jù)進(jìn)行分析,只要求寫出五個(gè)點(diǎn)出來(lái)就可以了,因此我就對(duì)這些數(shù)據(jù)進(jìn)行了分析一番。涉及的python知識(shí)點(diǎn)還是挺多的,包括了python連接數(shù)據(jù)庫(kù),SQL提取數(shù)據(jù)并保存為csv格式,pandas處理數(shù)據(jù),matplotlib畫圖以及購(gòu)物籃分析與關(guān)聯(lián)

    2024年02月07日
    瀏覽(23)
  • 淘寶商品API使用示例:如何通過(guò)調(diào)用外部API來(lái)獲取淘寶商品價(jià)格銷量主圖詳情數(shù)據(jù)

    淘寶商品API使用示例:如何通過(guò)調(diào)用外部API來(lái)獲取淘寶商品價(jià)格銷量主圖詳情數(shù)據(jù)

    淘寶上的商品信息量非常之大,商品的詳情信息也很齊全。如何通過(guò)調(diào)用外部API來(lái)實(shí)現(xiàn)批量獲取商品價(jià)格銷量主圖詳情等信息呢?上周剛好完成了一個(gè)完整的淘寶商品采集項(xiàng)目,今天特來(lái)分享一下。 接口名稱:item_get 請(qǐng)求地址:https://api-test.cn/taobao/item_get result_type:[json,jso

    2024年02月10日
    瀏覽(30)
  • 電商數(shù)據(jù)平臺(tái)西域根據(jù)ID取商品詳情API接口采集產(chǎn)品詳情數(shù)據(jù)、價(jià)格 、銷量數(shù)據(jù)操作指南

    電商數(shù)據(jù)平臺(tái)西域根據(jù)ID取商品詳情API接口采集產(chǎn)品詳情數(shù)據(jù)、價(jià)格 、銷量數(shù)據(jù)操作指南

    公共參數(shù) 請(qǐng)求地址: 注冊(cè)調(diào)用key請(qǐng)求接入 名稱 類型 必須 描述 key String 是 調(diào)用key(必須以GET方式拼接在URL中) secret String 是 調(diào)用密鑰 api_name String 是 API接口名稱(包括在請(qǐng)求地址中)[item_search,item_get,item_search_shop等] cache String 否 [yes,no]默認(rèn)yes,將調(diào)用緩存的數(shù)據(jù),速度比較

    2024年02月07日
    瀏覽(32)
  • 快手商品詳情商品價(jià)格、銷量、庫(kù)存、sku信息

    ? ? ?商品詳情數(shù)據(jù)API是用來(lái)獲取快手商品詳情頁(yè)數(shù)據(jù)的接口,請(qǐng)求參數(shù)為商品ID,這是每個(gè)商品唯一性的標(biāo)識(shí)。返回參數(shù)有商品標(biāo)題、商品標(biāo)題、商品簡(jiǎn)介、價(jià)格、掌柜昵稱、庫(kù)存、寶貝鏈接、寶貝圖片、商品SKU等 公共參數(shù) 請(qǐng)求地址:?獲取key和密鑰 名稱 類型 必須 描述 k

    2024年01月23日
    瀏覽(20)
  • 淘寶APP商品詳情接口(商品信息,價(jià)格銷量,優(yōu)惠券信息,詳情圖等)

    淘寶APP商品詳情接口(商品信息,價(jià)格銷量,優(yōu)惠券信息,詳情圖等)

    淘寶APP商品詳情接口(商品信息接口,價(jià)格銷量接口,優(yōu)惠券信息接口,詳情圖接口等)代碼對(duì)接如下: 公共參數(shù) 名稱 類型 必須 描述 key String 是 調(diào)用key(必須以GET方式拼接在URL中),點(diǎn)擊獲取請(qǐng)key和secret secret String 是 調(diào)用密鑰 api_name String 是 API接口名稱(包括在請(qǐng)求地址

    2024年02月12日
    瀏覽(28)
  • 淘寶/天貓獲取商品銷量詳情 API 返回值說(shuō)明

    淘寶/天貓獲取商品銷量詳情 API 返回值說(shuō)明

    taobao.item_get_sales 公共參數(shù) 名稱 類型 必須 描述 key String 是 調(diào)用key(必須以GET方式拼接在URL中) secret String 是 調(diào)用密鑰 api_name String 是 API接口名稱(包括在請(qǐng)求地址中)[item_search,item_get,item_search_shop等] cache String 否 [yes,no]默認(rèn)yes,將調(diào)用緩存的數(shù)據(jù),速度比較快 result_type St

    2024年02月09日
    瀏覽(34)
  • item_get_sales-獲取商品銷量詳情

    item_get_sales-獲取商品銷量詳情

    一、接口參數(shù)說(shuō)明: item_get_sales-獲取商品銷量詳情,點(diǎn)擊更多API調(diào)試,請(qǐng)移步注冊(cè)API賬號(hào)點(diǎn)擊獲取測(cè)試key和secret 公共參數(shù) 請(qǐng)求地址: https://api-gw.onebound.cn/taobao/item_get_sales 名稱 類型 必須 描述 key String 是 調(diào)用key(點(diǎn)擊獲取測(cè)試key和secret) secret String 是 調(diào)用密鑰 api_name String

    2024年02月13日
    瀏覽(20)
  • python 爬蟲(chóng)某東網(wǎng)商品信息 | 沒(méi)想到銷量最高的是

    python 爬蟲(chóng)某東網(wǎng)商品信息 | 沒(méi)想到銷量最高的是

    哈嘍大家好,我是咸魚 好久沒(méi)更新 python 爬蟲(chóng)相關(guān)的文章了,今天我們使用 selenium 模塊來(lái)簡(jiǎn)單寫個(gè)爬蟲(chóng)程序——爬取某東網(wǎng)商品信息 網(wǎng)址鏈接:https://www.jd.com/ 完整源碼在文章最后 我們需要找到網(wǎng)頁(yè)上元素的位置信息(xpth 路徑) 我們首先需要知道搜索框和搜索按鈕的位置

    2024年02月08日
    瀏覽(32)
  • 爬蟲(chóng)之牛刀小試(十):爬取某寶手機(jī)商品的銷量,價(jià)格和店鋪

    爬蟲(chóng)之牛刀小試(十):爬取某寶手機(jī)商品的銷量,價(jià)格和店鋪

    首先淘寶需要登錄,這一點(diǎn)如果用selenium如何解決,只能手動(dòng)登錄?如果不用selenium,用cookies登錄也可。但是驗(yàn)證碼又是一個(gè)問(wèn)題,現(xiàn)在的驗(yàn)證碼五花八門,難以處理。 我們回到正題,假設(shè)你已經(jīng)登錄上淘寶了,接著我們需要找到輸入框和搜索按鈕,輸入“手機(jī)”,點(diǎn)擊搜索

    2024年04月10日
    瀏覽(21)
  • 電商API接口的應(yīng)用||大數(shù)據(jù)電商數(shù)倉(cāng)分析項(xiàng)目||電商熱門商品統(tǒng)計(jì)

    如何定義熱門商品? 簡(jiǎn)單模型:直接通過(guò)用戶對(duì)商品的點(diǎn)擊量來(lái)衡量商品熱度。 復(fù)雜模型:依據(jù)各類別權(quán)重(后續(xù)補(bǔ)充) 如何獲取區(qū)域? 通過(guò)用戶點(diǎn)擊日志,獲取訪問(wèn)IP,進(jìn)而獲取區(qū)域信息。 通過(guò)數(shù)據(jù)庫(kù)中的訂單關(guān)聯(lián)用戶表,獲取用戶的地域信息 如何去除爬蟲(chóng)水軍(商家

    2024年04月28日
    瀏覽(27)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包