国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Flink Kafka[輸入/輸出] Connector

這篇具有很好參考價值的文章主要介紹了Flink Kafka[輸入/輸出] Connector。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

本章重點介紹生產(chǎn)環(huán)境中最常用到的Flink kafka connector。使用Flink的同學(xué),一定會很熟悉kafka,它是一個分布式的、分區(qū)的、多副本的、 支持高吞吐的、發(fā)布訂閱消息系統(tǒng)。生產(chǎn)環(huán)境環(huán)境中也經(jīng)常會跟kafka進(jìn)行一些數(shù)據(jù)的交換,比如利用kafka consumer讀取數(shù)據(jù),然后進(jìn)行一系列的處理之后,再將結(jié)果寫出到kafka中。這里會主要分兩個部分進(jìn)行介紹,一是Flink kafka Consumer,一個是Flink kafka Producer

Flink 輸入輸出至 Kafka案例

首先看一個例子來串聯(lián)下Flink kafka connector。代碼邏輯里主要是從 kafka里讀數(shù)據(jù),然后做簡單的處理,再寫回到kafka中。首先需要引入 flink-kafka相關(guān)的pom.xml依賴:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_2.12</artifactId>
    <version>1.10.0</version>
</dependency>

分別從如何構(gòu)造一個Source sinkFunction。Flink提供了現(xiàn)成的構(gòu)造FlinkKafkaConsumerProducer的接口,可以直接使用。這里需要注意,因為kafka有多個版本,多個版本之間的接口協(xié)議會不同。Flink針對不同版本的kafka有相應(yīng)的版本的ConsumerProducer。例如:針對 08、09、1011版本,Flink對應(yīng)的consumer分別是FlinkKafkaConsumer 08、09010、011producer也是。

 package com.zzx.flink;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import scala.Tuple2;
import scala.tools.nsc.transform.patmat.Logic;

import java.util.Properties;

/**
 * @description: Flink 從kafka 中讀取數(shù)據(jù)并寫入kafka
 * @author: zzx
 * @createDate: 2020/7/22
 * @version: 1.0
 */
public class FlinkKafkaExample {
    public static void main(String[] args) throws Exception{
        //ParameterTool 從參數(shù)中讀取數(shù)據(jù)
        final ParameterTool params = ParameterTool.fromArgs(args);

        //設(shè)置執(zhí)行環(huán)境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //使參數(shù)在web界面中可用
        env.getConfig().setGlobalJobParameters(params);
        /**  TimeCharacteristic 中包含三種時間類型
         * @PublicEvolving
         * public enum TimeCharacteristic {
         * ?    //以operator處理的時間為準(zhǔn),它使用的是機(jī)器的系統(tǒng)時間來作為data stream的時間
         *     ProcessingTime,
         * ?    //以數(shù)據(jù)進(jìn)入flink streaming data flow的時間為準(zhǔn)
         *     IngestionTime,
         * ?    //以數(shù)據(jù)自帶的時間戳字段為準(zhǔn),應(yīng)用程序需要指定如何從record中抽取時間戳字段
         *     EventTime
         * }
         */
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        /**
         * CheckpointingMode:    EXACTLY_ONCE(執(zhí)行一次)  AT_LEAST_ONCE(至少一次)
         */
        env.enableCheckpointing(60*1000, CheckpointingMode.EXACTLY_ONCE);

        //------------------------------------------source start -----------------------------------
        String sourceTopic = "sensor";
        String bootstrapServers = "hadoop1:9092";
        // kafkaConsumer 需要的配置參數(shù)
        Properties props = new Properties();
        // 定義kakfa 服務(wù)的地址,不需要將所有broker指定上
        props.put("bootstrap.servers", bootstrapServers);
        // 制定consumer group
        props.put("group.id", "test");
        // 是否自動確認(rèn)offset
        props.put("enable.auto.commit", "true");
        // 自動確認(rèn)offset的時間間隔
        props.put("auto.commit.interval.ms", "1000");
        // key的序列化類
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // value的序列化類
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //從kafka讀取數(shù)據(jù),需要實現(xiàn) SourceFunction 他給我們提供了一個
        FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<String>(sourceTopic, new SimpleStringSchema(), props);
        //------------------------------------------source end -----------------------------------------

        //------------------------------------------sink start -----------------------------------
        String sinkTopic = "topic";
        Properties properties = new Properties();
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");
        FlinkKafkaProducer011<String> producer = new FlinkKafkaProducer011<String>(sinkTopic, new SimpleStringSchema(), properties);
        //------------------------------------------sink end --------------------------------------

        //FlinkKafkaConsumer011 繼承自 RichParallelSourceFunction
        env.addSource(consumer)
            .map(new MapFunction<String, Tuple2<Long,String>>(){
                @Override
                public Tuple2<Long, String> map(String s) throws Exception {
                    return new Tuple2<>(1L,s);
                }
            })
            .filter(k -> k != null)
            .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<Tuple2<Long, String>>(Time.seconds(5)) {
                @Override
                public long extractTimestamp(Tuple2<Long, String> element) {
                    return element._1;
                }
            })
            .map(k ->k.toString())
            .addSink(producer);

        //執(zhí)行
        env.execute("FlinkKafkaExample");
    }
}

如下創(chuàng)建代碼中涉及的"sensor" Topic

[root@hadoop1 kafka_2.11-2.2.2]# bin/kafka-topics.sh --create --zookeeper hadoop1:2181 --topic sensor --replication-factor 2 --partitions 4

Flink kafka Consumer

反序列化數(shù)據(jù): 因為kafka中數(shù)據(jù)都是以二進(jìn)制byte形式存儲的。讀到Flink系統(tǒng)中之后,需要將二進(jìn)制數(shù)據(jù)轉(zhuǎn)化為具體的java、scala對象。具體需要實現(xiàn)一個schema類定義如何序列化和反序列數(shù)據(jù)。反序列化時需要實現(xiàn)DeserializationSchema
口,并重寫deserialize(byte[] message)函數(shù),如果是反序列化kafkakv的數(shù)據(jù)時,需要實現(xiàn)KeyedDeserializationSchema接口,并重寫 deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)函數(shù)。

另外Flink中也提供了一些常用的序列化反序列化的schema類。例如,SimpleStringSchema,按字符串方式進(jìn)行序列化、反序列化。TypeInformationSerializationSchema,它可根據(jù)FlinkTypeInformation信息來推斷出需要選擇的schema。JsonDeserializationSchema使用 jackson反序列化 json格式消息,并返回ObjectNode,可以使用get(“property”)方法來訪問相應(yīng)字段。
Flink Kafka[輸入/輸出] Connector,Flink,flink,kafka,linq,大數(shù)據(jù),java,面試,后端

消費起始位置設(shè)置

如何設(shè)置作業(yè)消費kafka起始位置的數(shù)據(jù),這一部分Flink也提供了非常好的封裝。在構(gòu)造好的FlinkKafkaConsumer類后面調(diào)用如下相應(yīng)函數(shù),設(shè)置合適的起始位置。
【1】setStartFromGroupOffsets,也是默認(rèn)的策略,從group offset位置讀取數(shù)據(jù),group offset指的是kafka broker端記錄的某個group的最后一次的消費位置。但是kafka broker端沒有該group信息,會根據(jù)kafka的參數(shù)auto.offset.reset的設(shè)置來決定從哪個位置開始消費。
setStartFromEarliest,從kafka最早的位置開始讀取。
setStartFromLatest,從kafka最新的位置開始讀取。
setStartFromTimestamp(long),從時間戳大于或等于指定時間戳的位置開始讀取。Kafka時間戳,是指kafka為每條消息增加另一個時戳。該時戳可以表示消息在proudcer端生成時的時間、或進(jìn)入到kafka broker時的時間。
setStartFromSpecificOffsets,從指定分區(qū)的offset位置開始讀取,如指定的offsets中不存某個分區(qū),該分區(qū)從group offset位置開始讀取。此時需要用戶給定一個具體的分區(qū)、offset的集合。

一些具體的使用方法可以參考下圖。需要注意的是,因為Flink框架有容錯機(jī)制,如果作業(yè)故障,如果作業(yè)開啟checkpoint,會從上一次 checkpoint狀態(tài)開始恢復(fù)?;蛘咴谕V棺鳂I(yè)的時候主動做savepoint,啟動作業(yè)時從savepoint開始恢復(fù)。這兩種情況下恢復(fù)作業(yè)時,作業(yè)消費起始位置是從之前保存的狀態(tài)中恢復(fù),與上面提到跟kafka這些單獨的配置無關(guān)。
Flink Kafka[輸入/輸出] Connector,Flink,flink,kafka,linq,大數(shù)據(jù),java,面試,后端

topic 和 partition 動態(tài)發(fā)現(xiàn)

實際的生產(chǎn)環(huán)境中可能有這樣一些需求:
場景一,有一個Flink作業(yè)需要將五份數(shù)據(jù)聚合到一起,五份數(shù)據(jù)對應(yīng)五個kafka topic,隨著業(yè)務(wù)增長,新增一類數(shù)據(jù),同時新增了一個 kafka topic,如何在不重啟作業(yè)的情況下作業(yè)自動感知新的topic。
場景二,作業(yè)從一個固定的kafka topic讀數(shù)據(jù),開始該topic10partition,但隨著業(yè)務(wù)的增長數(shù)據(jù)量變大,需要對kafka partition個數(shù)進(jìn)行擴(kuò)容,由10個擴(kuò)容到20。該情況下如何在不重啟作業(yè)情況下動態(tài)感知新擴(kuò)容的partition
針對上面的兩種場景,首先需要在構(gòu)建FlinkKafkaConsumer時的properties中設(shè)置flink.partition-discovery.interval-millis參數(shù)為非負(fù)值,表示開啟動態(tài)發(fā)現(xiàn)的開關(guān),以及設(shè)置的時間間隔。此時FlinkKafkaConsumer內(nèi)部會啟動一個單獨的線程定期去kafka獲取最新的meta信息。針對場景一,還需在構(gòu)建FlinkKafkaConsumer時,topic的描述可以傳一個正則表達(dá)式(如下圖所示)描述的pattern。每次獲取最新kafka meta時獲取正則匹配的最新topic列表。針對場景二,設(shè)置前面的動態(tài)發(fā)現(xiàn)參數(shù),在定期獲取kafka最新meta信息時會匹配新的partition。為了保證數(shù)據(jù)的正確性,新發(fā)現(xiàn)的partition從最早的位置開始讀取。
Flink Kafka[輸入/輸出] Connector,Flink,flink,kafka,linq,大數(shù)據(jù),java,面試,后端

commit offset 方式

Flink kafka consumer commit offset方式需要區(qū)分是否開啟了checkpoint。如果checkpoint關(guān)閉,commit offset要依賴于kafka客戶端的auto commit。 需設(shè)置enable.auto.commitauto.commit.interval.ms參數(shù)到consumer properties,就會按固定的時間間隔定期auto commit offsetkafka。如果開啟checkpoint,這個時候作業(yè)消費的offset,Flink會在state中自己管理和容錯。此時提交offsetkafka,一般都是作為外部進(jìn)度的監(jiān)控,想實時知道作業(yè)消費的位置和lag情況。此時需要setCommitOffsetsOnCheckpointstrue來設(shè)置當(dāng)checkpoint成功時提交offsetkafka。此時commit offset的間隔就取決于checkpoint的間隔,所以此時從kafka一側(cè)看到的lag可能并非完全實時,如果checkpoint間隔比較長lag曲線可能會是一個鋸齒狀。
Flink Kafka[輸入/輸出] Connector,Flink,flink,kafka,linq,大數(shù)據(jù),java,面試,后端

Timestamp Extraction/Watermark 生成

我們知道當(dāng)Flink作業(yè)內(nèi)使用EventTime屬性時,需要指定從消息中提取時間戳和生成水位的函數(shù)。FlinkKakfaConsumer構(gòu)造的source后直接調(diào)用assignTimestampsAndWatermarks函數(shù)設(shè)置水位生成器的好處是此時是每個partition一個watermark assigner,如下圖。source生成的時戳為多個partition時戳對齊后的最小時戳。此時在一個source讀取多個partition,并且partition之間數(shù)據(jù)時戳有一定差距的情況下,因為在 sourcewatermarkpartition級別有對齊,不會導(dǎo)致數(shù)據(jù)讀取較慢partition數(shù)據(jù)丟失。
Flink Kafka[輸入/輸出] Connector,Flink,flink,kafka,linq,大數(shù)據(jù),java,面試,后端

Flink kafka Producer

【1】Producer分區(qū): 使用FlinkKafkaProducerkafka中寫數(shù)據(jù)時,如果不單獨設(shè)置partition策略,會默認(rèn)使用FlinkFixedPartitioner,該 partitioner分區(qū)的方式是task所在的并發(fā)idtopicpartition數(shù)取余:parallelInstanceId % partitions.length。
○ 此時如果sink4,paritition1,則4task往同一個partition中寫數(shù)據(jù)。但當(dāng)sink task < partition個數(shù)時會有部分partition沒有數(shù)據(jù)寫入,例如sink task2,partition總數(shù)為4,則后面兩個partition將沒有數(shù)據(jù)寫入。
○ 如果構(gòu)建FlinkKafkaProducer時,partition設(shè)置為null,此時會使用kafka producer默認(rèn)分區(qū)方式,非key寫入的情況下,使用round-robin的方式進(jìn)行分區(qū),每個task都會輪循的寫下游的所有partition。該方式下游的partition數(shù)據(jù)會比較均衡,但是缺點是partition個數(shù)過多的情況下需要維持過多的網(wǎng)絡(luò)連接,即每個task都會維持跟所有partition所在broker的連接。
Flink Kafka[輸入/輸出] Connector,Flink,flink,kafka,linq,大數(shù)據(jù),java,面試,后端

容錯

Flink kafka 09、010版本下,通過setLogFailuresOnlyfalse,setFlushOnCheckpointtrue, 能達(dá)到at-least-once語義。setLogFailuresOnly默認(rèn)為false,是控制寫kafka失敗時,是否只打印失敗的log不拋異常讓作業(yè)停止。setFlushOnCheckpoint,默認(rèn)為true,是控制是否在 checkpointfluse數(shù)據(jù)到kafka,保證數(shù)據(jù)已經(jīng)寫到kafka。否則數(shù)據(jù)有可能還緩存在kafka客戶端的buffer中,并沒有真正寫出到kafka,此時作業(yè)掛掉數(shù)據(jù)即丟失,不能做到至少一次的語義。
Flink kafka 011版本下,通過兩階段提交的sink結(jié)合kafka事務(wù)的功能,可以保證端到端精準(zhǔn)一次。
Flink Kafka[輸入/輸出] Connector,Flink,flink,kafka,linq,大數(shù)據(jù),java,面試,后端

疑問與解答

【問題一】:Flink consumer的并行度的設(shè)置:是對應(yīng)topicpartitions個數(shù)嗎?要是有多個主題數(shù)據(jù)源,并行度是設(shè)置成總體的 partitions數(shù)嗎?
【解答】: 這個并不是絕對的,跟topic的數(shù)據(jù)量也有關(guān),如果數(shù)據(jù)量不大,也可以設(shè)置小于partitions個數(shù)的并發(fā)數(shù)。但不要設(shè)置并發(fā)數(shù)大于partitions總數(shù),因為這種情況下某些并發(fā)因為分配不到partition導(dǎo)致沒有數(shù)據(jù)處理。
【問題二】: 如果partitionernull的時候是round-robin發(fā)到每一個partition ?如果有key的時候行為是kafka那種按照key分布到具體分區(qū)的行為嗎?
【解答】: 如果在構(gòu)造FlinkKafkaProducer時,如果沒有設(shè)置單獨的partitioner,則默認(rèn)使用FlinkFixedPartitioner,此時無論是帶key的數(shù)據(jù),還是不帶key。如果主動設(shè)置partitionernull時,不帶key的數(shù)據(jù)會round-robin輪詢的方式寫出到partition,帶key的數(shù)據(jù)會根據(jù)key,相同key數(shù)據(jù)分區(qū)的相同的partition
【問題三】: 如果checkpoint時間過長,offset未提交到kafka,此時節(jié)點宕機(jī)了,重啟之后的重復(fù)消費如何保證呢?
【解答】: 首先開啟checkpointoffsetFlink通過狀態(tài)state管理和恢復(fù)的,并不是從kafkaoffset位置恢復(fù)。在checkpoint機(jī)制下,作業(yè)從最近一次checkpoint恢復(fù),本身是會回放部分歷史數(shù)據(jù),導(dǎo)致部分?jǐn)?shù)據(jù)重復(fù)消費,Flink引擎僅保證計算狀態(tài)的精準(zhǔn)一次,要想做到端到端精準(zhǔn)一次需要依賴一些冪等的存儲系統(tǒng)或者事務(wù)操作。文章來源地址http://www.zghlxwxcb.cn/news/detail-763909.html

到了這里,關(guān)于Flink Kafka[輸入/輸出] Connector的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進(jìn)行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • flink正常消費kafka數(shù)據(jù),flink沒有做checkpoint,kafka位點沒有提交

    1、背景 flink消費kafka數(shù)據(jù),多并發(fā),實現(xiàn)雙流join 2、現(xiàn)象 (1)flink任務(wù)消費kafka數(shù)據(jù),其中數(shù)據(jù)正常消費,kafka顯示消息堆積,位點沒有提交,并且flink任務(wù)沒有做checkpoint (2)其中一個流的subtask顯示finished (3)無背壓 3、問題原因 (1)其中一個topic分區(qū)為1 (2)配置的并行

    2024年02月13日
    瀏覽(22)
  • 【Flink-Kafka-To-ClickHouse】使用 Flink 實現(xiàn) Kafka 數(shù)據(jù)寫入 ClickHouse

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 ClickHouse。 2、相關(guān)配置存放于 Mysql 中,通過 Mysql 進(jìn)行動態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、先在 ClickHouse 中創(chuàng)建表然后動態(tài)獲取 ClickHouse 的表結(jié)構(gòu)。 5、Kafka 數(shù)據(jù)為 Json 格式,通過 FlatMap 扁平

    2024年02月03日
    瀏覽(23)
  • 【Flink-Kafka-To-Hive】使用 Flink 實現(xiàn) Kafka 數(shù)據(jù)寫入 Hive

    需求描述: 1、數(shù)據(jù)從 Kafka 寫入 Hive。 2、相關(guān)配置存放于 Mysql 中,通過 Mysql 進(jìn)行動態(tài)讀取。 3、此案例中的 Kafka 是進(jìn)行了 Kerberos 安全認(rèn)證的,如果不需要自行修改。 4、Flink 集成 Kafka 寫入 Hive 需要進(jìn)行 checkpoint 才能落盤至 HDFS。 5、先在 Hive 中創(chuàng)建表然后動態(tài)獲取 Hive 的表

    2024年02月03日
    瀏覽(23)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定義 Sink 消費 Kafka 數(shù)據(jù)寫入 RocketMQ

    這里的 maven 依賴比較冗余,推薦大家都加上,后面陸續(xù)優(yōu)化。 注意: 1、此程序中所有的相關(guān)配置都是通過 Mysql 讀取的(生產(chǎn)環(huán)境中沒有直接寫死的,都是通過配置文件動態(tài)配置),大家實際測試過程中可以將相關(guān)配置信息寫死。 2、此程序中 Kafka 涉及到了 Kerberos 認(rèn)證操作

    2024年02月03日
    瀏覽(21)
  • 【flink番外篇】3、flink的source(內(nèi)置、mysql、kafka、redis、clickhouse)介紹及示例(3)- kafka

    一、Flink 專欄 Flink 專欄系統(tǒng)介紹某一知識點,并輔以具體的示例進(jìn)行說明。 1、Flink 部署系列 本部分介紹Flink的部署、配置相關(guān)基礎(chǔ)內(nèi)容。 2、Flink基礎(chǔ)系列 本部分介紹Flink 的基礎(chǔ)部分,比如術(shù)語、架構(gòu)、編程模型、編程指南、基本的datastream api用法、四大基石等內(nèi)容。 3、

    2024年02月03日
    瀏覽(25)
  • flink連接kafka

    示例 以下屬性在構(gòu)建 KafkaSource 時是必須指定的: Bootstrap server,通過 setBootstrapServers(String) 方法配置 消費者組 ID,通過setGroupId(String) 配置 要訂閱的 Topic / Partition, 用于解析 Kafka消息的反序列化器(Deserializer) 起始消費位點 Kafka source 能夠通過位點初始化器(OffsetsInitialize

    2024年02月22日
    瀏覽(16)
  • Flink生產(chǎn)數(shù)據(jù)到kafka

    文章目錄 前言 一、版本 二、使用步驟 1.maven引入庫 2.上代碼 近期開始學(xué)習(xí)Flink程序開發(fā),使用java語言,此文以生產(chǎn)數(shù)據(jù)至kafka為例記錄下遇到的問題以及代碼實現(xiàn),若有錯誤請?zhí)岢觥?Flink版本:1.15.4 kafka版本:3.0.0 以下代碼將Flink環(huán)境初始化、配置、生產(chǎn)數(shù)據(jù)至kafka代碼放在

    2023年04月26日
    瀏覽(16)
  • 基于Flink+kafka實時告警

    基于Flink+kafka實時告警

    項目使用告警系統(tǒng)的邏輯是將實時數(shù)據(jù)保存到本地數(shù)據(jù)庫再使用定時任務(wù)做判斷,然后產(chǎn)生告警數(shù)據(jù)。這種方式存在告警的延時實在是太高了。數(shù)據(jù)從產(chǎn)生到保存,從保存到判斷都會存在時間差,按照保存數(shù)據(jù)定時5分鐘一次,定時任務(wù)5分鐘一次。最高會產(chǎn)生10分鐘的誤差,這

    2024年02月16日
    瀏覽(24)
  • Flink之Kafka Sink

    代碼內(nèi)容 結(jié)果數(shù)據(jù)

    2024年02月15日
    瀏覽(23)
  • 輕松通關(guān)Flink第24講:Flink 消費 Kafka 數(shù)據(jù)業(yè)務(wù)開發(fā)

    在上一課時中我們提過在實時計算的場景下,絕大多數(shù)的數(shù)據(jù)源都是消息系統(tǒng),而 Kafka 從眾多的消息中間件中脫穎而出,主要是因為 高吞吐 、 低延遲 的特點;同時也講了 Flink 作為生產(chǎn)者像 Kafka 寫入數(shù)據(jù)的方式和代碼實現(xiàn)。這一課時我們將從以下幾個方面介紹 Flink 消費

    2024年02月08日
    瀏覽(25)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包