1.前言
目前,很多 flink相關(guān)的書(shū)籍和網(wǎng)上的文章講解如何對(duì)接 kafka時(shí)都是使用的 FlinkKafkaConsumer,如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
//指定kafka的Broker地址
properties.setProperty("bootstrap.servers", "192.168.xx.xx:9092");
//指定組ID
properties.setProperty("group.id", "flink-demo");
//如果沒(méi)有記錄偏移量,第一次從最開(kāi)始消費(fèi)
properties.setProperty("auto.offset.reset", "earliest");
FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic005", new SimpleStringSchema(), properties);
新版的 flink,比如 1.14.3已經(jīng)將 FlinkKafkaConsumer標(biāo)記為 deprecated(不推薦),如下:
'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer' is deprecated
2.如何使用 KafkaSource
新版本的 flink應(yīng)該使用 KafkaSource來(lái)消費(fèi) kafka中的數(shù)據(jù),詳細(xì)代碼如下:文章來(lái)源:http://www.zghlxwxcb.cn/news/detail-606351.html
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kfkSource = KafkaSource.<String>builder()
.setBootstrapServers("192.168.xx.xx:9092")
.setGroupId("flink-demo")
.setTopics("topic005")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
DataStreamSource<String> lines = env.fromSource(kfkSource, WatermarkStrategy.noWatermarks(), "kafka source");
3.總結(jié)
開(kāi)發(fā)者在工作中應(yīng)該盡量避免使用被標(biāo)記為 deprecated的方法或者類,要及時(shí)進(jìn)行更換,保持代碼的活力。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-606351.html
到了這里,關(guān)于Flink使用 KafkaSource消費(fèi) Kafka中的數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!