国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

flink1.16使用消費(fèi)/生產(chǎn)kafka之DataStream

這篇具有很好參考價(jià)值的文章主要介紹了flink1.16使用消費(fèi)/生產(chǎn)kafka之DataStream。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

flink高級(jí)版本后,消費(fèi)kafka數(shù)據(jù)一種是Datastream 一種之tableApi。

上官網(wǎng)?Kafka | Apache Flink

Kafka Source

引入依賴 flink和kafka的連接器,里面內(nèi)置了kafka-client

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.16.2</version>
</dependency>

使用方法

KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

很簡單一目了然。

topic和partition

多個(gè)topic
KafkaSource.builder().setTopics("topic-a", "topic-b");
正則匹配多個(gè)topic
KafkaSource.builder().setTopicPattern("topic.*");
指定分區(qū)
final HashSet<TopicPartition> partitionSet = new HashSet<>(Arrays.asList(
        new TopicPartition("topic-a", 0),    // Partition 0 of topic "topic-a"
        new TopicPartition("topic-b", 5)));  // Partition 5 of topic "topic-b"
KafkaSource.builder().setPartitions(partitionSet);

?反序列化

import org.apache.kafka.common.serialization.StringDeserializer;

KafkaSource.<String>builder()
        .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class));

其實(shí)就是實(shí)現(xiàn)接口?DeserializationSchema 的deserialize()方法 把byte轉(zhuǎn)為你想要的類型。

起始消費(fèi)位點(diǎn)

KafkaSource.builder()
    // 從消費(fèi)組提交的位點(diǎn)開始消費(fèi),不指定位點(diǎn)重置策略
    .setStartingOffsets(OffsetsInitializer.committedOffsets())
    // 從消費(fèi)組提交的位點(diǎn)開始消費(fèi),如果提交位點(diǎn)不存在,使用最早位點(diǎn)
    .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
    // 從時(shí)間戳大于等于指定時(shí)間戳(毫秒)的數(shù)據(jù)開始消費(fèi)
    .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
    // 從最早位點(diǎn)開始消費(fèi)
    .setStartingOffsets(OffsetsInitializer.earliest())
    // 從最末尾位點(diǎn)開始消費(fèi)
    .setStartingOffsets(OffsetsInitializer.latest());

有界 / 無界模式?

Kafka Source 支持流式和批式兩種運(yùn)行模式。默認(rèn)情況下,KafkaSource 設(shè)置為以流模式運(yùn)行,因此作業(yè)永遠(yuǎn)不會(huì)停止,直到 Flink 作業(yè)失敗或被取消。 可以使用?setBounded(OffsetsInitializer)?指定停止偏移量使 Kafka Source 以批處理模式運(yùn)行。當(dāng)所有分區(qū)都達(dá)到其停止偏移量時(shí),Kafka Source 會(huì)退出運(yùn)行。

流模式下運(yùn)行通過使用?setUnbounded(OffsetsInitializer)?也可以指定停止消費(fèi)位點(diǎn),當(dāng)所有分區(qū)達(dá)到其指定的停止偏移量時(shí),Kafka Source 會(huì)退出運(yùn)行。

估計(jì)99的人都用不到這個(gè) 。也就是設(shè)置一個(gè)結(jié)束的offset的點(diǎn)

其他屬性

除了上述屬性之外,您還可以使用 setProperties(Properties) 和 setProperty(String, String) 為 Kafka Source 和 Kafka Consumer 設(shè)置任意屬性。KafkaSource 有以下配置項(xiàng):

  • client.id.prefix,指定用于 Kafka Consumer 的客戶端 ID 前綴
  • partition.discovery.interval.ms,定義 Kafka Source 檢查新分區(qū)的時(shí)間間隔。 請(qǐng)參閱下面的動(dòng)態(tài)分區(qū)檢查一節(jié)
  • register.consumer.metrics?指定是否在 Flink 中注冊 Kafka Consumer 的指標(biāo)
  • commit.offsets.on.checkpoint?指定是否在進(jìn)行 checkpoint 時(shí)將消費(fèi)位點(diǎn)提交至 Kafka broker? 這個(gè)還是有用的

Kafka consumer 的配置可以參考?Apache Kafka 文檔。

請(qǐng)注意,即使指定了以下配置項(xiàng),構(gòu)建器也會(huì)將其覆蓋:

  • key.deserializer?始終設(shè)置為 ByteArrayDeserializer
  • value.deserializer?始終設(shè)置為 ByteArrayDeserializer
  • auto.offset.reset.strategy?被 OffsetsInitializer#getAutoOffsetResetStrategy() 覆蓋
  • partition.discovery.interval.ms?會(huì)在批模式下被覆蓋為 -1

動(dòng)態(tài)分區(qū)檢查

為了在不重啟 Flink 作業(yè)的情況下處理 Topic 擴(kuò)容或新建 Topic 等場景,可以將 Kafka Source 配置為在提供的 Topic / Partition 訂閱模式下定期檢查新分區(qū)。要啟用動(dòng)態(tài)分區(qū)檢查,請(qǐng)將?partition.discovery.interval.ms?設(shè)置為非負(fù)值:

KafkaSource.builder()
    .setProperty("partition.discovery.interval.ms", "10000"); // 每 10 秒檢查一次新分區(qū)
分區(qū)檢查功能默認(rèn) 不開啟。需要顯式地設(shè)置分區(qū)檢查間隔才能啟用此功能。

這個(gè)是為了擴(kuò)容的,因?yàn)閗afka的消費(fèi)能力和分區(qū)有關(guān),消費(fèi)能力不夠的時(shí)候需要?jiǎng)討B(tài)增加分區(qū)?

事件時(shí)間和水印

默認(rèn)情況下,Kafka Source 使用 Kafka 消息中的時(shí)間戳作為事件時(shí)間。您可以定義自己的水印策略(Watermark Strategy) 以從消息中提取事件時(shí)間,并向下游發(fā)送水?。?/p>

env.fromSource(kafkaSource, new CustomWatermarkStrategy(), "Kafka Source With Custom Watermark Strategy");

消費(fèi)位點(diǎn)提交?

Kafka source 在 checkpoint?完成時(shí)提交當(dāng)前的消費(fèi)位點(diǎn) ,以保證 Flink 的 checkpoint 狀態(tài)和 Kafka broker 上的提交位點(diǎn)一致。如果未開啟 checkpoint,Kafka source 依賴于 Kafka consumer 內(nèi)部的位點(diǎn)定時(shí)自動(dòng)提交邏輯,自動(dòng)提交功能由?enable.auto.commit?和?auto.commit.interval.ms?兩個(gè) Kafka consumer 配置項(xiàng)進(jìn)行配置。

注意:Kafka source?不依賴于 broker 上提交的位點(diǎn)來恢復(fù)失敗的作業(yè)。提交位點(diǎn)只是為了上報(bào) Kafka consumer 和消費(fèi)組的消費(fèi)進(jìn)度,以在 broker 端進(jìn)行監(jiān)控。

這里是說明消費(fèi)的offset是由kafka管理還是flink管理。

舉個(gè)例子

.setProperty("enable.auto.commit","true")
.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"100")

并且env沒有?env.enableCheckpoint()

此時(shí)只要消息進(jìn)入到flink,過了100ms就會(huì)被認(rèn)為已經(jīng)消費(fèi)過了offset會(huì)+1 不管你這個(gè)消息是否處理完,是否處理失敗了。

但是如果你env.enableCheckpoint(),那么此時(shí)就是由checkpoint被提交的之前提交offset了。和checkpoint息息相關(guān)。checkpoint如果失敗了 那么這個(gè)offset就不會(huì)被提交了。

kafka安全認(rèn)證

KafkaSource.builder()
    .setProperty("security.protocol", "SASL_PLAINTEXT")
    .setProperty("sasl.mechanism", "PLAIN")
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"username\" password=\"password\";");

上面這個(gè)比較常用?

另一個(gè)更復(fù)雜的例子,使用 SASL_SSL 作為安全協(xié)議并使用 SCRAM-SHA-256 作為 SASL 機(jī)制:

KafkaSource.builder()
    .setProperty("security.protocol", "SASL_SSL")
    // SSL 配置
    // 配置服務(wù)端提供的 truststore (CA 證書) 的路徑
    .setProperty("ssl.truststore.location", "/path/to/kafka.client.truststore.jks")
    .setProperty("ssl.truststore.password", "test1234")
    // 如果要求客戶端認(rèn)證,則需要配置 keystore (私鑰) 的路徑
    .setProperty("ssl.keystore.location", "/path/to/kafka.client.keystore.jks")
    .setProperty("ssl.keystore.password", "test1234")
    // SASL 配置
    // 將 SASL 機(jī)制配置為 as SCRAM-SHA-256
    .setProperty("sasl.mechanism", "SCRAM-SHA-256")
    // 配置 JAAS
    .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"password\";");

如果在作業(yè) JAR 中 Kafka 客戶端依賴的類路徑被重置了(relocate class),登錄模塊(login module)的類路徑可能會(huì)不同,因此請(qǐng)根據(jù)登錄模塊在 JAR 中實(shí)際的類路徑來改寫以上配置。

Kafka Sink

KafkaSink?可將數(shù)據(jù)流寫入一個(gè)或多個(gè) Kafka topic。

使用方法

DataStream<String> stream = ...;
        
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers(brokers)
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
            .setTopic("topic-name")
            .setValueSerializationSchema(new SimpleStringSchema())
            .build()
        )
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .build();
        
stream.sinkTo(sink);

以下屬性在構(gòu)建 KafkaSink 時(shí)是必須指定的:

  • Bootstrap servers,?setBootstrapServers(String)
  • 消息序列化器(Serializer),?setRecordSerializer(KafkaRecordSerializationSchema)
  • 如果使用DeliveryGuarantee.EXACTLY_ONCE?的語義保證,則需要使用?setTransactionalIdPrefix(String)

?序列化器

KafkaRecordSerializationSchema.builder()
    .setTopicSelector((element) -> {<your-topic-selection-logic>})
    .setValueSerializationSchema(new SimpleStringSchema())
    .setKeySerializationSchema(new SimpleStringSchema())
    .setPartitioner(new FlinkFixedPartitioner())
    .build();

其中消息體(value)序列化方法和 topic 的選擇方法是必須指定的,此外也可以通過?setKafkaKeySerializer(Serializer)?或?setKafkaValueSerializer(Serializer)?來使用 Kafka 提供而非 Flink 提供的序列化器。

容錯(cuò)

KafkaSink?總共支持三種不同的語義保證(DeliveryGuarantee)。對(duì)于?DeliveryGuarantee.AT_LEAST_ONCE?和?DeliveryGuarantee.EXACTLY_ONCE,F(xiàn)link checkpoint 必須啟用。默認(rèn)情況下?KafkaSink?使用?DeliveryGuarantee.NONE。 以下是對(duì)不同語義保證的解釋:

一旦啟用了基于 Kerberos 的 Flink 安全性后,只需在提供的屬性配置中包含以下兩個(gè)設(shè)置(通過傳遞給內(nèi)部 Kafka 客戶端),即可使用 Flink Kafka Consumer 或 Producer 向 Kafk a進(jìn)行身份驗(yàn)證:

  • DeliveryGuarantee.NONE?不提供任何保證:消息有可能會(huì)因 Kafka broker 的原因發(fā)生丟失或因 Flink 的故障發(fā)生重復(fù)。
  • DeliveryGuarantee.AT_LEAST_ONCE: sink 在 checkpoint 時(shí)會(huì)等待 Kafka 緩沖區(qū)中的數(shù)據(jù)全部被 Kafka producer 確認(rèn)。消息不會(huì)因 Kafka broker 端發(fā)生的事件而丟失,但可能會(huì)在 Flink 重啟時(shí)重復(fù),因?yàn)?Flink 會(huì)重新處理舊數(shù)據(jù)。
  • DeliveryGuarantee.EXACTLY_ONCE: 該模式下,Kafka sink 會(huì)將所有數(shù)據(jù)通過在 checkpoint 時(shí)提交的事務(wù)寫入。因此,如果 consumer 只讀取已提交的數(shù)據(jù)(參見 Kafka consumer 配置?isolation.level),在 Flink 發(fā)生重啟時(shí)不會(huì)發(fā)生數(shù)據(jù)重復(fù)。然而這會(huì)使數(shù)據(jù)在 checkpoint 完成時(shí)才會(huì)可見,因此請(qǐng)按需調(diào)整 checkpoint 的間隔。請(qǐng)確認(rèn)事務(wù) ID 的前綴(transactionIdPrefix)對(duì)不同的應(yīng)用是唯一的,以保證不同作業(yè)的事務(wù) 不會(huì)互相影響!此外,強(qiáng)烈建議將 Kafka 的事務(wù)超時(shí)時(shí)間調(diào)整至遠(yuǎn)大于 checkpoint 最大間隔 + 最大重啟時(shí)間,否則 Kafka 對(duì)未提交事務(wù)的過期處理會(huì)導(dǎo)致數(shù)據(jù)丟失。
  • 啟用 Kerberos 身份驗(yàn)證

  • Flink 通過 Kafka 連接器提供了一流的支持,可以對(duì) Kerberos 配置的 Kafka 安裝進(jìn)行身份驗(yàn)證。只需在?flink-conf.yaml?中配置 Flink。像這樣為 Kafka 啟用 Kerberos 身份驗(yàn)證:
    1.通過設(shè)置以下內(nèi)容配置 Kerberos 票據(jù)

  • security.kerberos.login.use-ticket-cache:默認(rèn)情況下,這個(gè)值是?true,F(xiàn)link 將嘗試在?kinit?管理的票據(jù)緩存中使用 Kerberos 票據(jù)。注意!在 YARN 上部署的 Flink jobs 中使用 Kafka 連接器時(shí),使用票據(jù)緩存的 Kerberos 授權(quán)將不起作用。
  • security.kerberos.login.keytab?和?security.kerberos.login.principal:要使用 Kerberos keytabs,需為這兩個(gè)屬性設(shè)置值。
    2。將?KafkaClient?追加到?security.kerberos.login.contexts:這告訴 Flink 將配置的 Kerberos 票據(jù)提供給 Kafka 登錄上下文以用于 Kafka 身份驗(yàn)證。
  • 將?security.protocol?設(shè)置為?SASL_PLAINTEXT(默認(rèn)為?NONE):用于與 Kafka broker 進(jìn)行通信的協(xié)議。使用獨(dú)立 Flink 部署時(shí),也可以使用?SASL_SSL;請(qǐng)?jiān)诖颂幉榭慈绾螢?SSL 配置 Kafka 客戶端。
  • 將?sasl.kerberos.service.name?設(shè)置為?kafka(默認(rèn)為?kafka):此值應(yīng)與用于 Kafka broker 配置的?sasl.kerberos.service.name?相匹配??蛻舳撕头?wù)器配置之間的服務(wù)名稱不匹配將導(dǎo)致身份驗(yàn)證失敗。

問題排查

數(shù)據(jù)丟失?

根據(jù)你的 Kafka 配置,即使在 Kafka 確認(rèn)寫入后,你仍然可能會(huì)遇到數(shù)據(jù)丟失。特別要記住在 Kafka 的配置中設(shè)置以下屬性:

  • acks
  • log.flush.interval.messages
  • log.flush.interval.ms
  • log.flush.*

UnknownTopicOrPartitionException?

導(dǎo)致此錯(cuò)誤的一個(gè)可能原因是正在進(jìn)行新的 leader 選舉,例如在重新啟動(dòng) Kafka broker 之后或期間。這是一個(gè)可重試的異常,因此 Flink job 應(yīng)該能夠重啟并恢復(fù)正常運(yùn)行。也可以通過更改 producer 設(shè)置中的?retries?屬性來規(guī)避。但是,這可能會(huì)導(dǎo)致重新排序消息,反過來可以通過將?max.in.flight.requests.per.connection?設(shè)置為 1 來避免不需要的消息。

ProducerFencedException?

這個(gè)錯(cuò)誤是由于?FlinkKafkaProducer?所生成的?transactional.id?與其他應(yīng)用所使用的的產(chǎn)生了沖突。多數(shù)情況下,由于?FlinkKafkaProducer?產(chǎn)生的 ID 都是以?taskName + "-" + operatorUid?為前綴的,這些產(chǎn)生沖突的應(yīng)用也是使用了相同 Job Graph 的 Flink Job。 我們可以使用?setTransactionalIdPrefix()?方法來覆蓋默認(rèn)的行為,為每個(gè)不同的 Job 分配不同的?transactional.id?前綴來解決這個(gè)問題。文章來源地址http://www.zghlxwxcb.cn/news/detail-605321.html

到了這里,關(guān)于flink1.16使用消費(fèi)/生產(chǎn)kafka之DataStream的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Java輕松使用Kafka生產(chǎn)者,消費(fèi)者

    Java輕松使用Kafka生產(chǎn)者,消費(fèi)者 一、環(huán)境說明 項(xiàng)目中需要下面的依賴: ( 版本自定義 ) 2. yml配置文件設(shè)置 1. 簡單生產(chǎn)者的書寫: 1. 簡單消費(fèi)者的書寫: ? 注:多消費(fèi)者時(shí),需要對(duì)應(yīng)kafka中配置的分區(qū);多少的Partition就有多少個(gè)消費(fèi)者,以免資源浪費(fèi)

    2024年02月15日
    瀏覽(29)
  • Kafka官方生產(chǎn)者和消費(fèi)者腳本簡單使用

    怎樣使用Kafka官方生產(chǎn)者和消費(fèi)者腳本進(jìn)行消費(fèi)生產(chǎn)和消費(fèi)?這里假設(shè)已經(jīng)下載了kafka官方文件,并已經(jīng)解壓. 這就可以見到測試kafka對(duì)應(yīng)topic了.

    2024年02月04日
    瀏覽(23)
  • flink1.17.0 集成kafka,并且計(jì)算

    flink1.17.0 集成kafka,并且計(jì)算

    flink是實(shí)時(shí)計(jì)算的重要集成組件,這里演示如何集成,并且使用一個(gè)小例子。例子是kafka輸入消息,用逗號(hào)隔開,統(tǒng)計(jì)每個(gè)相同單詞出現(xiàn)的次數(shù),這么一個(gè)功能。 這里我使用的kafka版本是3.2.0,部署的方法可以參考, kafka部署 啟動(dòng)后查看java進(jìn)程是否存在,存在后執(zhí)行下一步。

    2024年02月09日
    瀏覽(18)
  • Flink使用 KafkaSource消費(fèi) Kafka中的數(shù)據(jù)

    目前,很多 flink相關(guān)的書籍和網(wǎng)上的文章講解如何對(duì)接 kafka時(shí)都是使用的 FlinkKafkaConsumer,如下: 新版的 flink,比如 1.14.3已經(jīng)將 FlinkKafkaConsumer標(biāo)記為 deprecated(不推薦),如下: 新版本的 flink應(yīng)該使用 KafkaSource來消費(fèi) kafka中的數(shù)據(jù),詳細(xì)代碼如下: 開發(fā)者在工作中應(yīng)該盡量避

    2024年02月15日
    瀏覽(22)
  • 【Flink-Kafka-To-RocketMQ】使用 Flink 自定義 Sink 消費(fèi) Kafka 數(shù)據(jù)寫入 RocketMQ

    這里的 maven 依賴比較冗余,推薦大家都加上,后面陸續(xù)優(yōu)化。 注意: 1、此程序中所有的相關(guān)配置都是通過 Mysql 讀取的(生產(chǎn)環(huán)境中沒有直接寫死的,都是通過配置文件動(dòng)態(tài)配置),大家實(shí)際測試過程中可以將相關(guān)配置信息寫死。 2、此程序中 Kafka 涉及到了 Kerberos 認(rèn)證操作

    2024年02月03日
    瀏覽(21)
  • 掌握實(shí)時(shí)數(shù)據(jù)流:使用Apache Flink消費(fèi)Kafka數(shù)據(jù)

    掌握實(shí)時(shí)數(shù)據(jù)流:使用Apache Flink消費(fèi)Kafka數(shù)據(jù)

    ? ? ? ? 導(dǎo)讀:使用Flink實(shí)時(shí)消費(fèi)Kafka數(shù)據(jù)的案例是探索實(shí)時(shí)數(shù)據(jù)處理領(lǐng)域的絕佳方式。不僅非常實(shí)用,而且對(duì)于理解現(xiàn)代數(shù)據(jù)架構(gòu)和流處理技術(shù)具有重要意義。 ????????Apache Flink ?是一個(gè)在 有界 數(shù)據(jù)流和 無界 數(shù)據(jù)流上進(jìn)行有狀態(tài)計(jì)算分布式處理引擎和框架。Flink 設(shè)計(jì)旨

    2024年02月03日
    瀏覽(31)
  • 從Flink的Kafka消費(fèi)者看算子聯(lián)合列表狀態(tài)的使用

    從Flink的Kafka消費(fèi)者看算子聯(lián)合列表狀態(tài)的使用

    算子的聯(lián)合列表狀態(tài)是平時(shí)使用的比較少的一種狀態(tài),本文通過kafka的消費(fèi)者實(shí)現(xiàn)來看一下怎么使用算子列表聯(lián)合狀態(tài) 首先我們看一下算子聯(lián)合列表狀態(tài)的在進(jìn)行故障恢復(fù)或者從某個(gè)保存點(diǎn)進(jìn)行擴(kuò)縮容啟動(dòng)應(yīng)用時(shí)狀態(tài)的恢復(fù)情況 算子聯(lián)合列表狀態(tài)主要由這兩個(gè)方法處理: 1初

    2024年02月08日
    瀏覽(23)
  • Kafka生產(chǎn)與消費(fèi)示例

    Kafka生產(chǎn)與消費(fèi)示例

    Kafka是一款消息中間件,消息中間件本質(zhì)就是收消息與發(fā)消息,所以這節(jié)課我們會(huì)從一條消息開始生產(chǎn)出發(fā),去了解生產(chǎn)端的運(yùn)行流程,然后簡單的了解一下broker的存儲(chǔ)流程,最后這條消息是如何被消費(fèi)者消費(fèi)掉的。其中最核心的有以下內(nèi)容。 1、Kafka客戶端是如何去設(shè)計(jì)一個(gè)

    2024年02月09日
    瀏覽(51)
  • Kafka生產(chǎn)消費(fèi)流程

    Kafka生產(chǎn)消費(fèi)流程

    準(zhǔn)備工作 創(chuàng)建maven工程,引入依賴 消費(fèi)者 1.1 發(fā)送并忘記 忽略send方法的返回值,不做任何處理。大多數(shù)情況下,消息會(huì)正常到達(dá),而且生產(chǎn)者會(huì)自動(dòng)重試,但有時(shí)會(huì)丟失消息。 消費(fèi)者 測試結(jié)果 1.2同步發(fā)送 測試結(jié)果 1.3 異步發(fā)送 測試結(jié)果 Kafka里消費(fèi)者從屬于消費(fèi)者群組,一

    2024年01月16日
    瀏覽(18)
  • 二、Kafka生產(chǎn)與消費(fèi)全流程

    二、Kafka生產(chǎn)與消費(fèi)全流程

    Kafka是一款消息中間件,消息中間件本質(zhì)就是收消息與發(fā)消息,所以這節(jié)課我們會(huì)從一條消息開始生產(chǎn)出發(fā),去了解生產(chǎn)端的運(yùn)行流程,然后簡單的了解一下broker的存儲(chǔ)流程,最后這條消息是如何被消費(fèi)者消費(fèi)掉的。其中最核心的有以下內(nèi)容。 1、Kafka客戶端是如何去設(shè)計(jì)一個(gè)

    2024年02月09日
    瀏覽(13)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包