国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink使用 KafkaSource消費(fèi) Kafka中的數(shù)據(jù)

這篇具有很好參考價(jià)值的文章主要介紹了Flink使用 KafkaSource消費(fèi) Kafka中的數(shù)據(jù)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

1.前言

目前,很多 flink相關(guān)的書(shū)籍和網(wǎng)上的文章講解如何對(duì)接 kafka時(shí)都是使用的 FlinkKafkaConsumer,如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties properties = new Properties();
//指定kafka的Broker地址
properties.setProperty("bootstrap.servers", "192.168.xx.xx:9092");
//指定組ID
properties.setProperty("group.id", "flink-demo");
//如果沒(méi)有記錄偏移量,第一次從最開(kāi)始消費(fèi)
properties.setProperty("auto.offset.reset", "earliest");

FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>("topic005", new SimpleStringSchema(), properties);

新版的 flink,比如 1.14.3已經(jīng)將 FlinkKafkaConsumer標(biāo)記為 deprecated(不推薦),如下:

'org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer' is deprecated 

2.如何使用 KafkaSource

新版本的 flink應(yīng)該使用 KafkaSource來(lái)消費(fèi) kafka中的數(shù)據(jù),詳細(xì)代碼如下:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
KafkaSource<String> kfkSource = KafkaSource.<String>builder()
    .setBootstrapServers("192.168.xx.xx:9092")
    .setGroupId("flink-demo")
    .setTopics("topic005")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();
DataStreamSource<String> lines = env.fromSource(kfkSource, WatermarkStrategy.noWatermarks(), "kafka source");

3.總結(jié)

開(kāi)發(fā)者在工作中應(yīng)該盡量避免使用被標(biāo)記為 deprecated的方法或者類,要及時(shí)進(jìn)行更換,保持代碼的活力。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-606351.html

到了這里,關(guān)于Flink使用 KafkaSource消費(fèi) Kafka中的數(shù)據(jù)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • flink正常消費(fèi)kafka數(shù)據(jù),flink沒(méi)有做checkpoint,kafka位點(diǎn)沒(méi)有提交

    1、背景 flink消費(fèi)kafka數(shù)據(jù),多并發(fā),實(shí)現(xiàn)雙流join 2、現(xiàn)象 (1)flink任務(wù)消費(fèi)kafka數(shù)據(jù),其中數(shù)據(jù)正常消費(fèi),kafka顯示消息堆積,位點(diǎn)沒(méi)有提交,并且flink任務(wù)沒(méi)有做checkpoint (2)其中一個(gè)流的subtask顯示finished (3)無(wú)背壓 3、問(wèn)題原因 (1)其中一個(gè)topic分區(qū)為1 (2)配置的并行

    2024年02月13日
    瀏覽(22)
  • 使用Flink處理Kafka中的數(shù)據(jù)

    目錄 ????????使用Flink處理Kafka中的數(shù)據(jù) 前提: ?一,?使用Flink消費(fèi)Kafka中ProduceRecord主題的數(shù)據(jù) 具體代碼為(scala) 執(zhí)行結(jié)果 二, 使用Flink消費(fèi)Kafka中ChangeRecord主題的數(shù)據(jù)? ?????????具體代碼(scala) ????????????????具體執(zhí)行代碼① ? ? ? ????????? 重要邏

    2024年01月23日
    瀏覽(20)
  • 輕松通關(guān)Flink第24講:Flink 消費(fèi) Kafka 數(shù)據(jù)業(yè)務(wù)開(kāi)發(fā)

    在上一課時(shí)中我們提過(guò)在實(shí)時(shí)計(jì)算的場(chǎng)景下,絕大多數(shù)的數(shù)據(jù)源都是消息系統(tǒng),而 Kafka 從眾多的消息中間件中脫穎而出,主要是因?yàn)?高吞吐 、 低延遲 的特點(diǎn);同時(shí)也講了 Flink 作為生產(chǎn)者像 Kafka 寫(xiě)入數(shù)據(jù)的方式和代碼實(shí)現(xiàn)。這一課時(shí)我們將從以下幾個(gè)方面介紹 Flink 消費(fèi)

    2024年02月08日
    瀏覽(26)
  • 大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-FLINK-從kafka消費(fèi)數(shù)據(jù)

    大數(shù)據(jù)-玩轉(zhuǎn)數(shù)據(jù)-Kafka安裝 運(yùn)行本段代碼,等待kafka產(chǎn)生數(shù)據(jù)進(jìn)行消費(fèi)。

    2024年02月14日
    瀏覽(23)
  • 流批一體計(jì)算引擎-4-[Flink]消費(fèi)kafka實(shí)時(shí)數(shù)據(jù)

    流批一體計(jì)算引擎-4-[Flink]消費(fèi)kafka實(shí)時(shí)數(shù)據(jù)

    Python3.6.9 Flink 1.15.2消費(fèi)Kafaka Topic PyFlink基礎(chǔ)應(yīng)用之kafka 通過(guò)PyFlink作業(yè)處理Kafka數(shù)據(jù) PyFlink需要特定的Python版本,Python 3.6, 3.7, 3.8 or 3.9。 1.3.1 python3和pip3的配置 一、系統(tǒng)中安裝了多個(gè)版本的python3 。 二、環(huán)境變量path作用順序 三、安裝Pyflink 1.3.2 配置Flink Kafka連接 (1)在https://mvnr

    2024年02月06日
    瀏覽(35)
  • Idea本地跑flink任務(wù)時(shí),總是重復(fù)消費(fèi)kafka的數(shù)據(jù)(kafka->mysql)

    Idea本地跑flink任務(wù)時(shí),總是重復(fù)消費(fèi)kafka的數(shù)據(jù)(kafka->mysql)

    1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 Idea中執(zhí)行任務(wù)時(shí),沒(méi)法看到JobManager的錯(cuò)誤,以至于我以為是什么特殊的原因?qū)е氯蝿?wù)總是反復(fù)消費(fèi)。在close方法中,增加日志,發(fā)現(xiàn)jdbc連接被關(guān)閉了。 重新消費(fèi),jdbc連接又啟動(dòng)了。 注意,在Flink的函數(shù)中,open和close方法

    2024年02月07日
    瀏覽(26)
  • flink1.16使用消費(fèi)/生產(chǎn)kafka之DataStream

    flink高級(jí)版本后,消費(fèi)kafka數(shù)據(jù)一種是Datastream 一種之tableApi。 上官網(wǎng)?Kafka | Apache Flink 引入依賴 flink和kafka的連接器,里面內(nèi)置了kafka-client 使用方法 很簡(jiǎn)單一目了然。 topic和partition ?反序列化 其實(shí)就是實(shí)現(xiàn)接口?DeserializationSchema 的deserialize()方法 把byte轉(zhuǎn)為你想要的類型。 起

    2024年02月16日
    瀏覽(20)
  • flink如何初始化kafka數(shù)據(jù)源的消費(fèi)偏移

    我們知道在日常非flink場(chǎng)景中消費(fèi)kafka主題時(shí),我們只要指定了消費(fèi)者組,下次程序重新消費(fèi)時(shí)是可以從上次消費(fèi)停止時(shí)的消費(fèi)偏移開(kāi)始繼續(xù)消費(fèi)的,這得益于kafka的_offset_主題保存的關(guān)于消費(fèi)者組和topic偏移位置的具體偏移信息,那么flink應(yīng)用中重啟flink應(yīng)用時(shí),flink是從topic的什

    2024年02月16日
    瀏覽(31)
  • 大數(shù)據(jù)之使用Flink消費(fèi)Kafka中topic為ods_mall_log的數(shù)據(jù),根據(jù)不同的表前綴區(qū)分在存入Kafka的topic當(dāng)中

    前言 題目: 一、讀題分析 二、處理過(guò)程 ? 1.數(shù)據(jù)處理部分: 2.HBaseSink(未經(jīng)測(cè)試,不能證明其正確性,僅供參考?。?三、重難點(diǎn)分析 總結(jié)? 什么是HBase? 本題來(lái)源于全國(guó)職業(yè)技能大賽之大數(shù)據(jù)技術(shù)賽項(xiàng)賽題 - 電商數(shù)據(jù)處理 - 實(shí)時(shí)數(shù)據(jù)處理 注:由于設(shè)備問(wèn)題,代碼執(zhí)行結(jié)果

    2024年02月03日
    瀏覽(24)
  • 大數(shù)據(jù)之使用Flink消費(fèi)Kafka中topic為ods_mall_data的數(shù)據(jù),根據(jù)數(shù)據(jù)中不同的表將數(shù)據(jù)分別分發(fā)至kafka的DWD層

    大數(shù)據(jù)之使用Flink消費(fèi)Kafka中topic為ods_mall_data的數(shù)據(jù),根據(jù)數(shù)據(jù)中不同的表將數(shù)據(jù)分別分發(fā)至kafka的DWD層

    前言 題目: 一、讀題分析 二、處理過(guò)程 三、重難點(diǎn)分析 總結(jié)? 本題來(lái)源于全國(guó)職業(yè)技能大賽之大數(shù)據(jù)技術(shù)賽項(xiàng)賽題 - 電商數(shù)據(jù)處理 - 實(shí)時(shí)數(shù)據(jù)處理 注:由于設(shè)備問(wèn)題,代碼執(zhí)行結(jié)果以及數(shù)據(jù)的展示無(wú)法給出,可參照我以往的博客其中有相同數(shù)據(jù)源展示 ? ? 提示:以下是本

    2024年02月04日
    瀏覽(44)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包