国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink

這篇具有很好參考價值的文章主要介紹了大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

一、添加Redis Connector依賴

具體版本根據(jù)實際情況確定

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>

二、啟動redis

參見大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Redis 安裝與使用

三、編寫代碼

package com.lyh.flink06;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);
        KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer key) throws Exception {
                return key.intValue();
            }
        });
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop100")
                .setPort(6379)
                .setMaxTotal(100)
                .setMaxIdle(10)
                .setMinIdle(2)
                .setTimeout(10*1000)
                .setDatabase(0)
                .setPassword("redis")
                .build();


        keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }

            @Override
            public String getKeyFromData(Integer integer) {
                return integer.toString();
            }

            @Override
            public String getValueFromData(Integer integer) {
                return integer.toString();
            }
        }));
        env.execute();
    }
}

可以根據(jù)要寫入的redis的不同數(shù)據(jù)類型進(jìn)行調(diào)整

四、查詢結(jié)果

大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink,大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK,大數(shù)據(jù),flink
大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink,大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK,大數(shù)據(jù),flink

大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink,大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK,大數(shù)據(jù),flink文章來源地址http://www.zghlxwxcb.cn/news/detail-644878.html

到了這里,關(guān)于大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink RedisSink的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink定時器

    基于處理時間或者事件時間處理過一個元素之后, 注冊一個定時器, 然后指定的時間執(zhí)行. Context和OnTimerContext所持有的TimerService對象擁有以下方法: currentProcessingTime(): Long 返回當(dāng)前處理時間 currentWatermark(): Long 返回當(dāng)前watermark的時間戳 registerProcessingTimeTimer(timestamp: Long): Unit 會注

    2024年02月10日
    瀏覽(20)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink惡意登錄監(jiān)控

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink惡意登錄監(jiān)控

    對于網(wǎng)站而言,用戶登錄并不是頻繁的業(yè)務(wù)操作。如果一個用戶短時間內(nèi)頻繁登錄失敗,就有可能是出現(xiàn)了程序的惡意攻擊,比如密碼暴力破解。 因此我們考慮,應(yīng)該對用戶的登錄失敗動作進(jìn)行統(tǒng)計,具體來說,如果同一用戶(可以是不同IP)在2秒之內(nèi)連續(xù)兩次登錄失敗,就

    2024年02月07日
    瀏覽(14)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上)

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)編程(上)

    有狀態(tài)的計算是流處理框架要實現(xiàn)的重要功能,因為稍復(fù)雜的流處理場景都需要記錄狀態(tài),然后在新流入數(shù)據(jù)的基礎(chǔ)上不斷更新狀態(tài)。 SparkStreaming在狀態(tài)管理這塊做的不好, 很多時候需要借助于外部存儲(例如Redis)來手動管理狀態(tài), 增加了編程的難度。 Flink的狀態(tài)管理是它的優(yōu)

    2024年02月09日
    瀏覽(92)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時間滾動動窗口

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink時間滾動動窗口

    在流處理應(yīng)用中,數(shù)據(jù)是連續(xù)不斷的,因此我們不可能等到所有數(shù)據(jù)都到了才開始處理。當(dāng)然我們可以每來一個消息就處理一次,但是有時我們需要做一些聚合類的處理,例如:在過去的1分鐘內(nèi)有多少用戶點擊了我們的網(wǎng)頁。在這種情況下,我們必須定義一個窗口,用來收集

    2024年02月11日
    瀏覽(22)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)后端(下)

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink狀態(tài)后端(下)

    每傳入一條數(shù)據(jù),有狀態(tài)的算子任務(wù)都會讀取和更新狀態(tài)。由于有效的狀態(tài)訪問對于處理數(shù)據(jù)的低延遲至關(guān)重要,因此每個并行任務(wù)(子任務(wù))都會在本地維護(hù)其狀態(tài),以確保快速的狀態(tài)訪問。 狀態(tài)的存儲、訪問以及維護(hù),由一個可插入的組件決定,這個組件就叫做狀態(tài)后端(

    2024年02月09日
    瀏覽(21)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink-Transform

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink-Transform

    轉(zhuǎn)換算子可以把一個或多個DataStream轉(zhuǎn)成一個新的DataStream.程序可以把多個復(fù)雜的轉(zhuǎn)換組合成復(fù)雜的數(shù)據(jù)流拓?fù)? 2.1、map(映射) 將數(shù)據(jù)流中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換, 形成新的數(shù)據(jù)流,消費(fèi)一個元素并產(chǎn)出一個元素 2.2、filter(過濾) 根據(jù)指定的規(guī)則將滿足條件(true)的數(shù)據(jù)保留,不

    2024年02月13日
    瀏覽(18)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink 網(wǎng)站UV統(tǒng)計

    在實際應(yīng)用中,我們往往會關(guān)注,到底有多少不同的用戶訪問了網(wǎng)站,所以另外一個統(tǒng)計流量的重要指標(biāo)是網(wǎng)站的獨(dú)立訪客數(shù)(Unique Visitor,UV)。 對于UserBehavior數(shù)據(jù)源來說,我們直接可以根據(jù)userId來區(qū)分不同的用戶。 將userid放到SET集合里面,統(tǒng)計集合長度,便可以統(tǒng)計到網(wǎng)

    2024年02月11日
    瀏覽(26)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK-從kafka消費(fèi)數(shù)據(jù)

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Kafka安裝 運(yùn)行本段代碼,等待kafka產(chǎn)生數(shù)據(jù)進(jìn)行消費(fèi)。

    2024年02月14日
    瀏覽(23)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink 海量數(shù)據(jù)實時去重

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink 海量數(shù)據(jù)實時去重

    大數(shù)據(jù)|阿里實時計算|Flink 借助redis的Set,需要頻繁連接Redis,如果數(shù)據(jù)量過大, 對redis的內(nèi)存也是一種壓力;使用Flink的MapState,如果數(shù)據(jù)量過大, 狀態(tài)后端最好選擇 RocksDBStateBackend; 使用布隆過濾器,布隆過濾器可以大大減少存儲的數(shù)據(jù)的數(shù)據(jù)量。 如果想判斷一個元素是不

    2024年02月07日
    瀏覽(20)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink-Transform(上)

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Flink-Transform(上)

    轉(zhuǎn)換算子可以把一個或多個DataStream轉(zhuǎn)成一個新的DataStream.程序可以把多個復(fù)雜的轉(zhuǎn)換組合成復(fù)雜的數(shù)據(jù)流拓?fù)? 2.1、map(映射) 將數(shù)據(jù)流中的數(shù)據(jù)進(jìn)行轉(zhuǎn)換, 形成新的數(shù)據(jù)流,消費(fèi)一個元素并產(chǎn)出一個元素 2.2、filter(過濾) 根據(jù)指定的規(guī)則將滿足條件(true)的數(shù)據(jù)保留,不

    2024年02月14日
    瀏覽(17)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包