国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Spring Boot 整合kafka消費(fèi)模式AckMode以及手動(dòng)消費(fèi) 依賴管理

這篇具有很好參考價(jià)值的文章主要介紹了Spring Boot 整合kafka消費(fèi)模式AckMode以及手動(dòng)消費(fèi) 依賴管理。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

依賴管理

在pom.xml文件中導(dǎo)入依賴

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.5.5.RELEASE</version>
        </dependency>

配置文件修改

需要自己配置AckMode時(shí)候的配置

spring:
  application:
    name: base.kafka
  kafka:
    bootstrap-servers: kafka服務(wù)地址1:端口,kafka服務(wù)地址2:端口,kafka服務(wù)地址3:端口
    producer:
      # 寫入失敗時(shí),重試次數(shù)。當(dāng)leader節(jié)點(diǎn)失效,一個(gè)repli節(jié)點(diǎn)會(huì)替代成為leader節(jié)點(diǎn),此時(shí)可能出現(xiàn)寫入失敗,
      # 當(dāng)retris為0時(shí),produce不會(huì)重復(fù)。retirs重發(fā),此時(shí)repli節(jié)點(diǎn)完全成為leader節(jié)點(diǎn),不會(huì)產(chǎn)生消息丟失。
      retries: 0
      #procedure要求leader在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化,其值可以為如下:
      #acks = 0 如果設(shè)置為零,則生產(chǎn)者將不會(huì)等待來(lái)自服務(wù)器的任何確認(rèn),該記錄將立即添加到套接字緩沖區(qū)并視為已發(fā)送。在這種情況下,無(wú)法保證服務(wù)器已收到記錄,并且重試配置將不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障),為每條記錄返回的偏移量始終設(shè)置為-1。
      #acks = 1 這意味著leader會(huì)將記錄寫入其本地日志,但無(wú)需等待所有副本服務(wù)器的完全確認(rèn)即可做出回應(yīng),在這種情況下,如果leader在確認(rèn)記錄后立即失敗,但在將數(shù)據(jù)復(fù)制到所有的副本服務(wù)器之前,則記錄將會(huì)丟失。
      #acks = all 這意味著leader將等待完整的同步副本集以確認(rèn)記錄,這保證了只要至少一個(gè)同步副本服務(wù)器仍然存活,記錄就不會(huì)丟失,這是最強(qiáng)有力的保證,這相當(dāng)于acks = -1的設(shè)置。
      #可以設(shè)置的值為:all, -1, 0, 1
      acks: 1
    consumer:
      group-id: testGroup
      # smallest和largest才有效,如果smallest重新0開(kāi)始讀取,如果是largest從logfile的offset讀取。一般情況下我們都是設(shè)置smallest
      auto-offset-reset: earliest
      # 設(shè)置自動(dòng)提交offset
      enable-auto-commit: true
      max-poll-records: 2
server:
  port: 8060

消費(fèi)kafka消息

kafka支持的消費(fèi)模式,設(shè)置在AbstractMessageListenerContainer.AckMode的枚舉中,下面就介紹下各個(gè)模式的區(qū)別

	/**
	 * The offset commit behavior enumeration.
	 */
	public enum AckMode {

		/**
		 * Commit after each record is processed by the listener.
		 */
		RECORD,

		/**
		 * Commit whatever has already been processed before the next poll.
		 */
		BATCH,

		/**
		 * Commit pending updates after
		 * {@link ContainerProperties#setAckTime(long) ackTime} has elapsed.
		 */
		TIME,

		/**
		 * Commit pending updates after
		 * {@link ContainerProperties#setAckCount(int) ackCount} has been
		 * exceeded.
		 */
		COUNT,

		/**
		 * Commit pending updates after
		 * {@link ContainerProperties#setAckCount(int) ackCount} has been
		 * exceeded or after {@link ContainerProperties#setAckTime(long)
		 * ackTime} has elapsed.
		 */
		COUNT_TIME,

		/**
		 * User takes responsibility for acks using an
		 * {@link AcknowledgingMessageListener}.
		 */
		MANUAL,

		/**
		 * User takes responsibility for acks using an
		 * {@link AcknowledgingMessageListener}. The consumer
		 * immediately processes the commit.
		 */
		MANUAL_IMMEDIATE,

	}

AckMode模式

AckMode模式 作用
MANUAL 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后, 手動(dòng)調(diào)用Acknowledgment.acknowledge()后提交
MANUAL_IMMEDIATE 手動(dòng)調(diào)用Acknowledgment.acknowledge()后立即提交
RECORD 當(dāng)每一條記錄被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交
BATCH 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交
TIME 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后,距離上次提交時(shí)間大于TIME時(shí)提交
COUNT 當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后,被處理record數(shù)量大于等于COUNT時(shí)提交
COUNT_TIME TIME或COUNT?有一個(gè)條件滿足時(shí)提交

監(jiān)聽(tīng)器工廠的配置類:

/**
 * kafka消費(fèi)者配置
 */
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;
    //會(huì)話過(guò)期時(shí)長(zhǎng),consumer通過(guò)ConsumerCoordinator間歇性發(fā)送心跳
    //超期后,會(huì)被認(rèn)為consumer失效,服務(wù)遷移到其他consumer節(jié)點(diǎn).(group)
    //需要注意,Coordinator與kafkaConsumer共享底層通道,也是基于poll獲取協(xié)調(diào)事件,但是會(huì)在單獨(dú)的線程中
    @Value("${spring.kafka.consumer.session.timeout}")
    private String sessionTimeout;

    @Value("${spring.kafka.consumer.concurrency}")
    private int concurrency;
    //單次最多允許poll的消息條數(shù).
    //此值不建議過(guò)大,應(yīng)該考慮你的業(yè)務(wù)處理效率.
    @Value("${spring.kafka.consumer.maxpoll.records}")
    private int maxPollRecords;
    //兩次poll之間的時(shí)間隔間最大值,如果超過(guò)此值將會(huì)被認(rèn)為此consumer失效,觸發(fā)consumer重新平衡.
    //此值必須大于,一個(gè)batch所有消息處理時(shí)間總和.
    //最大500000
    //2分鐘
    @Value("${spring.kafka.consumer.maxpoll.interval}")
    private int maxPollIntervalMS;

    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;


    @Bean
    public StringJsonMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    public KafkaListenerContainerFactory<?> batchDataFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        Map<String, Object> consumerConfig =  consumerConfigs();
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        ConsumerFactory<String, String>  consumerFactory = new DefaultKafkaConsumerFactory<>(consumerConfig);
        factory.setConsumerFactory(consumerFactory);
        factory.setConcurrency(concurrency);
        //設(shè)置為批量消費(fèi),每個(gè)批次數(shù)量在Kafka配置參數(shù)中設(shè)置ConsumerConfig.MAX_POLL_RECORDS_CONFIG
        factory.setBatchListener(true);
        factory.setMessageConverter(new BatchMessagingMessageConverter());
        //設(shè)置提交偏移量的方式
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }


    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        //每一批數(shù)量
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, this.maxPollRecords);
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,this.maxPollIntervalMS);
        return propsMap;
    }

    @Bean
    public TestMessages listener() {
        return new TestMessages();
    }

}

監(jiān)聽(tīng)器使用的配置

@Component
public class TestMessages {

    /**
     * MANUAL   當(dāng)每一批poll()的數(shù)據(jù)被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后, 手動(dòng)調(diào)用Acknowledgment.acknowledge()后提交
     * @param message
     * @param ack
     */
    @KafkaListener(containerFactory = "batchDataFactory" , topics = "kafka(topic名稱)")
    public void onMessageManual(List<Object> message, Acknowledgment ack){
        log.info("batchDataFactory處理數(shù)據(jù)量:{}",message.size());
        message.forEach(item -> log.info("batchDataFactory處理數(shù)據(jù)內(nèi)容:{}",item));
        ack.acknowledge();//直接提交offset
    }

}

MANUAL_IMMEDIATE

當(dāng)每一條記錄被消費(fèi)者監(jiān)聽(tīng)器(ListenerConsumer)處理之后提交

MANUAL和MANUAL_IMMEDIATE兩者的相同和區(qū)別


相同之處
這兩種模式都是需要進(jìn)行手動(dòng)確認(rèn)ack.acknowledge();才能完成消息的消費(fèi),否則在重啟消費(fèi)端實(shí)例的時(shí)候數(shù)據(jù)會(huì)再次被消費(fèi)端接收到。

兩者的區(qū)別
MANUAL: 在處理完最后一次輪詢的所有結(jié)果后,將隊(duì)列排隊(duì),并在一次操作中提交偏移量??梢哉J(rèn)為是在批處理結(jié)束時(shí)提交偏移量
MANUAL_IMMEDIATE:只要在偵聽(tīng)器線程上執(zhí)行確認(rèn),就立即提交偏移。會(huì)在批量執(zhí)行的時(shí)候逐一提交它們。

其他模式大家都可以在批量處理工廠類中進(jìn)行修改設(shè)置:

    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

上面的內(nèi)容可能存在沒(méi)有描述清楚或者錯(cuò)誤的地方,假如開(kāi)發(fā)同學(xué)發(fā)現(xiàn)了,請(qǐng)及時(shí)告知,我會(huì)第一時(shí)間修改相關(guān)內(nèi)容。假如我的這篇內(nèi)容對(duì)你有任何幫助的話,麻煩給我點(diǎn)一個(gè)贊。你的點(diǎn)贊就是我前進(jìn)的動(dòng)力。文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-564495.html

到了這里,關(guān)于Spring Boot 整合kafka消費(fèi)模式AckMode以及手動(dòng)消費(fèi) 依賴管理的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動(dòng)返回ack,設(shè)置備用隊(duì)列,以及面試題:rabbitmq確保消息不丟失

    springboot整合rabbitmq的發(fā)布確認(rèn),消費(fèi)者手動(dòng)返回ack,設(shè)置備用隊(duì)列,以及面試題:rabbitmq確保消息不丟失

    目錄 1.生產(chǎn)者發(fā)消息到交換機(jī)時(shí)候的消息確認(rèn) 2.交換機(jī)給隊(duì)列發(fā)消息時(shí)候的消息確認(rèn) 3.備用隊(duì)列 3.消費(fèi)者手動(dòng)ack ? rabbitmq的發(fā)布確認(rèn)方式,可以有效的保證我們的數(shù)據(jù)不丟失。 ? 消息正常發(fā)送的流程是:生產(chǎn)者發(fā)送消息到交換機(jī),然后交換機(jī)通過(guò)路由鍵把消息發(fā)送給對(duì)應(yīng)的隊(duì)

    2024年02月09日
    瀏覽(28)
  • spring boot學(xué)習(xí)第八篇:kafka監(jiān)聽(tīng)消費(fèi)

    spring boot學(xué)習(xí)第八篇:kafka監(jiān)聽(tīng)消費(fèi)

    為了實(shí)現(xiàn)監(jiān)聽(tīng)器功能 pom.xml文件內(nèi)容如下: ?application.yml文件內(nèi)容如下: logback.xml文件內(nèi)容如下: BackendApplication.java文件內(nèi)容如下: 然后添加了kafkaConsumerListenerExample.java文件 發(fā)到服務(wù)器上,啟動(dòng)hmblogs報(bào)錯(cuò),截圖如下: Caused by: java.lang.TypeNotPresentException: Type org.springframework.k

    2024年01月19日
    瀏覽(40)
  • Spring Boot 整合 Kafka

    環(huán)境:自行創(chuàng)建 Spring Boot 項(xiàng)目,添加測(cè)試依賴,并啟動(dòng) Zookeeper 和 kafka 服務(wù)。 注意:Zookeeper 默認(rèn)好像占用 8080 端口,自己注意端口占用問(wèn)題。 1. 添加依賴 2. 添加配置 3. 創(chuàng)建消息生產(chǎn)者 4. 創(chuàng)建消息消費(fèi)者 5. 消息發(fā)送測(cè)試

    2023年04月11日
    瀏覽(22)
  • Kafka篇——Kafka消費(fèi)者端常見(jiàn)配置,涵蓋自動(dòng)手動(dòng)提交offset、poll消息細(xì)節(jié)、健康狀態(tài)檢查、新消費(fèi)組消費(fèi)offset規(guī)則以及指定分區(qū)等技術(shù)點(diǎn)配置,全面無(wú)死角,一篇文章拿下!

    Kafka篇——Kafka消費(fèi)者端常見(jiàn)配置,涵蓋自動(dòng)手動(dòng)提交offset、poll消息細(xì)節(jié)、健康狀態(tài)檢查、新消費(fèi)組消費(fèi)offset規(guī)則以及指定分區(qū)等技術(shù)點(diǎn)配置,全面無(wú)死角,一篇文章拿下!

    一、自動(dòng)提交offset 1、概念 Kafka中默認(rèn)是自動(dòng)提交offset。消費(fèi)者在poll到消息后默認(rèn)情況下,會(huì)自動(dòng)向Broker的_consumer_offsets主題提交當(dāng)前 主題-分區(qū)消費(fèi)的偏移量 2、自動(dòng)提交offset和手動(dòng)提交offset流程圖 3、在Java中實(shí)現(xiàn)配置 4、自動(dòng)提交offset問(wèn)題 自動(dòng)提交會(huì)丟消息。因?yàn)槿绻M(fèi)

    2024年01月22日
    瀏覽(22)
  • spring-kafka中ContainerProperties.AckMode詳解

    ??近期,我們線上遇到了一個(gè)性能問(wèn)題,幾乎快引起線上故障,后來(lái)僅僅是修改了一行代碼,性能就提升了幾十倍。一行代碼幾十倍,數(shù)據(jù)聽(tīng)起來(lái)很夸張,不過(guò)這是真實(shí)的數(shù)據(jù),線上錯(cuò)誤的配置的確有可能導(dǎo)致性能有數(shù)量級(jí)上的差異,等我說(shuō)完我們這個(gè)性能問(wèn)題你就清楚了

    2024年02月08日
    瀏覽(21)
  • Spring Boot中使用Kafka時(shí)遇到“構(gòu)建Kafka消費(fèi)者失敗“的問(wèn)題

    在使用Spring Boot開(kāi)發(fā)應(yīng)用程序時(shí),集成Apache Kafka作為消息隊(duì)列是一種常見(jiàn)的做法。然而,有時(shí)候在配置和使用Kafka時(shí)可能會(huì)遇到一些問(wèn)題。本文將探討在Spring Boot應(yīng)用程序中使用Kafka時(shí)可能遇到的\\\"構(gòu)建Kafka消費(fèi)者失敗\\\"錯(cuò)誤,并提供解決方案。 錯(cuò)誤描述: 當(dāng)嘗試構(gòu)建Kafka消費(fèi)者時(shí)

    2024年01月17日
    瀏覽(23)
  • MQTT協(xié)議-EMQX技術(shù)文檔-spring-boot整合使用--發(fā)送接收-消費(fèi)

    MQTT協(xié)議-EMQX技術(shù)文檔-spring-boot整合使用--發(fā)送接收-消費(fèi)

    MQTT(Message Queuing Telemetry Transport)是一種基于發(fā)布/訂閱模式的通信協(xié)議,它與MQ(Message Queue,消息隊(duì)列)有一定的關(guān)聯(lián),但二者并不完全相同。 MQTT是一種輕量級(jí)的通信協(xié)議,專門為在物聯(lián)網(wǎng)(IoT)設(shè)備之間的消息傳遞而設(shè)計(jì)。它運(yùn)行在TCP協(xié)議之上,以“發(fā)布-訂閱”模式進(jìn)行

    2024年02月12日
    瀏覽(21)
  • Spring整合RabbitMQ-配制文件方式-2-推模式消費(fèi)者

    推模式的消費(fèi)者 在推模式中使用可以兩種實(shí)現(xiàn): 使用ChannelAwareMessageListener. 除消息外,還提供了Channel這個(gè)對(duì)象,通過(guò)channel可以有更大的靈活性。 使用MessageListener 基本的消息的臨時(shí)。普通的場(chǎng)景基本夠用。 此處以ChannelAwareMessageListener為樣例: spring-rabbit.xml 容器啟動(dòng)類 首先

    2024年02月09日
    瀏覽(20)
  • Spring Boot進(jìn)階(27):Spring Boot 整合 kafka(環(huán)境搭建+演示) | 超級(jí)詳細(xì),建議收藏

    Spring Boot進(jìn)階(27):Spring Boot 整合 kafka(環(huán)境搭建+演示) | 超級(jí)詳細(xì),建議收藏

    ? ? ? ?在現(xiàn)代互聯(lián)網(wǎng)應(yīng)用中,消息驅(qū)動(dòng)已經(jīng)成為一種不可或缺的開(kāi)發(fā)模式。而Kafka作為一款高性能的分布式消息系統(tǒng),已經(jīng)成為很多公司在消息驅(qū)動(dòng)架構(gòu)中的首選工具。本篇文章將介紹如何使用Spring Boot和Kafka快速構(gòu)建消息驅(qū)動(dòng)應(yīng)用,讓你在開(kāi)發(fā)過(guò)程中輕松應(yīng)對(duì)高并發(fā)的消息

    2024年02月05日
    瀏覽(34)
  • 使用 Spring Boot 整合 Kafka:實(shí)現(xiàn)高效的消息傳遞

    Kafka 是一種流處理平臺(tái),用于在分布式系統(tǒng)中處理高吞吐量的數(shù)據(jù)流。它是一種基于發(fā)布訂閱模式的消息系統(tǒng),能夠處理來(lái)自多個(gè)應(yīng)用程序的數(shù)據(jù)流。Kafka 具有高度的可擴(kuò)展性、可靠性和性能,使得它成為處理大數(shù)據(jù)的流行選擇。 Spring Boot 是一種開(kāi)源框架,用于簡(jiǎn)化 Java 應(yīng)用

    2024年02月14日
    瀏覽(23)

覺(jué)得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包