1. Add the Redis Connector dependency
Choose the version that matches your Flink and Scala versions.
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.1.5</version>
</dependency>
2. Start Redis
See "Big Data - Playing with Data - Redis Installation and Usage".
3. Write the code
package com.lyh.flink06;

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

public class SinkRedis {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // Small bounded source used only for demonstration.
        DataStreamSource<Integer> dataStreamSource = env.fromElements(1, 2, 3, 4, 5, 6);

        // Key the stream by the element itself.
        KeyedStream<Integer, Integer> keyedStream = dataStreamSource.keyBy(new KeySelector<Integer, Integer>() {
            @Override
            public Integer getKey(Integer value) throws Exception {
                return value;
            }
        });

        // Connection pool settings for the Redis instance.
        FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder()
                .setHost("hadoop100")
                .setPort(6379)
                .setMaxTotal(100)
                .setMaxIdle(10)
                .setMinIdle(2)
                .setTimeout(10 * 1000) // connection timeout in milliseconds
                .setDatabase(0)
                .setPassword("redis")
                .build();

        keyedStream.addSink(new RedisSink<>(conf, new RedisMapper<Integer>() {
            // Redis command used for every record; SET writes a plain string value.
            @Override
            public RedisCommandDescription getCommandDescription() {
                return new RedisCommandDescription(RedisCommand.SET);
            }

            // Redis key for the record.
            @Override
            public String getKeyFromData(Integer value) {
                return value.toString();
            }

            // Redis value for the record.
            @Override
            public String getValueFromData(Integer value) {
                return value.toString();
            }
        }));

        env.execute();
    }
}
Adjust the RedisCommand, and the mapper along with it, to match the Redis data type you want to write.
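As an illustration of switching data types, the sketch below writes each element into a Redis hash instead of a plain string. It assumes the same connector as above; the class name `HashRedisMapper` and the hash name `flink:numbers` are hypothetical and chosen only for this example.

```java
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;

// Hypothetical mapper: stores every integer as a field of a single Redis hash.
public class HashRedisMapper implements RedisMapper<Integer> {
    // Assumed hash name, used only for illustration.
    static final String HASH_NAME = "flink:numbers";

    @Override
    public RedisCommandDescription getCommandDescription() {
        // For HSET, the second argument is the additional key, i.e. the hash name.
        return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME);
    }

    @Override
    public String getKeyFromData(Integer value) {
        // With HSET this becomes the field inside the hash.
        return value.toString();
    }

    @Override
    public String getValueFromData(Integer value) {
        // Value stored under that field.
        return value.toString();
    }
}
```

It would be plugged in with `keyedStream.addSink(new RedisSink<>(conf, new HashRedisMapper()));`, reusing `conf` and `keyedStream` from the job above.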
4. Query the results
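One way to check the output is to read the keys back with Jedis, the client library the connector itself builds on. This is a minimal sketch, not part of the original job: the class name `CheckRedisResult` and the helper `expectedKeys` are hypothetical, and the host, port and password are assumed to match the sink configuration in section 3. Since the SET mapper uses each element as both key and value, each key should map to its own string.

```java
import java.util.Arrays;
import java.util.List;
import redis.clients.jedis.Jedis;

// Hypothetical helper that reads back the keys written by the SinkRedis job.
public class CheckRedisResult {
    // Keys the job is expected to have written, one per input element.
    static List<String> expectedKeys() {
        return Arrays.asList("1", "2", "3", "4", "5", "6");
    }

    public static void main(String[] args) {
        // Connection details are assumed to mirror the sink configuration.
        try (Jedis jedis = new Jedis("hadoop100", 6379)) {
            jedis.auth("redis");
            for (String key : expectedKeys()) {
                // GET returns null if the key does not exist.
                System.out.println(key + " -> " + jedis.get(key));
            }
        }
    }
}
```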