?? 作者 :“大數(shù)據(jù)小禪”
?? 文章簡(jiǎn)介 :Flink 商品銷量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X
?? 歡迎小伙伴們 點(diǎn)贊??、收藏?、留言??
Flink怎么操作Redis
-
Flink怎么操作redis?
- 方式一:自定義sink
- 方式二:使用connector
-
Redis Sink 核心是RedisMapper 是一個(gè)接口,使用時(shí)要編寫自己的redis操作類實(shí)現(xiàn)這個(gè)接口中的三個(gè)方法
- getCommandDescription 選擇對(duì)應(yīng)的數(shù)據(jù)結(jié)構(gòu)和key名稱配置
- getKeyFromData 獲取key
- getValueFromData 獲取value
-
使用
- 添加依賴
<dependency> <groupId>org.apache.bahir</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.0</version> </dependency>
-
編碼
public class MyRedisSink implements RedisMapper<Tuple2<String, Integer>> { @Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER"); } @Override public String getKeyFromData(Tuple2<String, Integer> value) { return value.f0; } @Override public String getValueFromData(Tuple2<String, Integer> value) { return value.f1.toString(); } }
Flink 商品銷量統(tǒng)計(jì)-轉(zhuǎn)換-分組-聚合-存儲(chǔ)自定義的Redis Sink實(shí)戰(zhàn)
-
Redis環(huán)境說(shuō)明 redis6
-
使用docker部署redis6.x 看個(gè)人主頁(yè)docker相關(guān)文章
docker run -d -p 6379:6379 redis
-
-
編碼實(shí)戰(zhàn)
數(shù)據(jù)源
public class VideoOrderSource extends RichParallelSourceFunction<VideoOrder> {
private volatile Boolean flag = true;
private Random random = new Random();
private static List<String> list = new ArrayList<>();
static {
list.add("spring boot2.x課程");
list.add("微服務(wù)SpringCloud課程");
list.add("RabbitMQ消息隊(duì)列");
list.add("Kafka課程");
list.add("小滴課堂面試專題第一季");
list.add("Flink流式技術(shù)課程");
list.add("工業(yè)級(jí)微服務(wù)項(xiàng)目大課訓(xùn)練營(yíng)");
list.add("Linux課程");
}
/**
* run 方法調(diào)用前 用于初始化連接
* @param parameters
* @throws Exception
*/
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("-----open-----");
}
/**
* 用于清理之前
* @throws Exception
*/
@Override
public void close() throws Exception {
System.out.println("-----close-----");
}
/**
* 產(chǎn)生數(shù)據(jù)的邏輯
* @param ctx
* @throws Exception
*/
@Override
public void run(SourceContext<VideoOrder> ctx) throws Exception {
while (flag){
Thread.sleep(1000);
String id = UUID.randomUUID().toString();
int userId = random.nextInt(10);
int money = random.nextInt(100);
int videoNum = random.nextInt(list.size());
String title = list.get(videoNum);
VideoOrder videoOrder = new VideoOrder(id,title,money,userId,new Date());
ctx.collect(videoOrder);
}
}
/**
* 控制任務(wù)取消
*/
@Override
public void cancel() {
flag = false;
}
}
保存的格式與存取的方法
public class VideoOrderCounterSink implements RedisMapper<Tuple2<String, Integer>> {
/***
* 選擇需要用到的命令,和key名稱
* @return
*/
@Override
public RedisCommandDescription getCommandDescription() {
return new RedisCommandDescription(RedisCommand.HSET, "VIDEO_ORDER_COUNTER");
}
/**
* 獲取對(duì)應(yīng)的key或者filed
*
* @param data
* @return
*/
@Override
public String getKeyFromData(Tuple2<String, Integer> data) {
System.out.println("getKeyFromData=" + data.f0);
return data.f0;
}
/**
* 獲取對(duì)應(yīng)的值
*
* @param data
* @return
*/
@Override
public String getValueFromData(Tuple2<String, Integer> data) {
System.out.println("getValueFromData=" + data.f1.toString());
return data.f1.toString();
}
}
落地文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-742374.html
public class Flink07RedisSinkApp {
/**
* source
* transformation
* sink
*
* @param args
*/
public static void main(String[] args) throws Exception {
//構(gòu)建執(zhí)行任務(wù)環(huán)境以及任務(wù)的啟動(dòng)的入口, 存儲(chǔ)全局相關(guān)的參數(shù)
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
env.setParallelism(1);
//數(shù)據(jù)源 source
// DataStream<VideoOrder> ds = env.fromElements(
// new VideoOrder("21312","java",32,5,new Date()),
// new VideoOrder("314","java",32,5,new Date()),
// new VideoOrder("542","springboot",32,5,new Date()),
// new VideoOrder("42","redis",32,5,new Date()),
// new VideoOrder("4252","java",32,5,new Date()),
// new VideoOrder("42","springboot",32,5,new Date()),
// new VideoOrder("554232","flink",32,5,new Date()),
// new VideoOrder("23323","java",32,5,new Date())
// );
DataStream<VideoOrder> ds = env.addSource(new VideoOrderSource());
//transformation
DataStream<Tuple2<String,Integer>> mapDS = ds.map(new MapFunction<VideoOrder, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map(VideoOrder value) throws Exception {
return new Tuple2<>(value.getTitle(),1);
}
});
// DataStream<Tuple2<String,Integer>> mapDS = ds.flatMap(new FlatMapFunction<VideoOrder, Tuple2<String,Integer>>() {
// @Override
// public void flatMap(VideoOrder value, Collector<Tuple2<String, Integer>> out) throws Exception {
// out.collect(new Tuple2<>(value.getTitle(),1));
// }
// });
//分組
KeyedStream<Tuple2<String,Integer>,String> keyByDS = mapDS.keyBy(new KeySelector<Tuple2<String,Integer>, String>() {
@Override
public String getKey(Tuple2<String, Integer> value) throws Exception {
return value.f0;
}
});
//統(tǒng)計(jì)每組有多少個(gè)
DataStream<Tuple2<String,Integer>> sumDS = keyByDS.sum(1);
//控制臺(tái)打印
sumDS.print();
//單機(jī)redis
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("127.0.0.1").setPort(6379).build();
sumDS.addSink(new RedisSink<>(conf,new VideoOrderCounterSink()));
//DataStream需要調(diào)用execute,可以取個(gè)名稱
env.execute("custom redis sink job");
}
}
文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-742374.html
到了這里,關(guān)于【Flink實(shí)戰(zhàn)】Flink 商品銷量統(tǒng)計(jì)-實(shí)戰(zhàn)Bahir Connetor實(shí)戰(zhàn)存儲(chǔ) 數(shù)據(jù)到Redis6.X的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!