国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka配置多個(gè)消費(fèi)者groupid kafka多個(gè)消費(fèi)者消費(fèi)同一個(gè)partition(java)

這篇具有很好參考價(jià)值的文章主要介紹了kafka配置多個(gè)消費(fèi)者groupid kafka多個(gè)消費(fèi)者消費(fèi)同一個(gè)partition(java)。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

kafka是由Apache軟件基金會開發(fā)的一個(gè)開源流處理平臺。kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動作流數(shù)據(jù)。
kafka中partition類似數(shù)據(jù)庫中的分表數(shù)據(jù),可以起到水平擴(kuò)展數(shù)據(jù)的目的,比如有a,b,c,d,e,f 6個(gè)數(shù)據(jù),某個(gè)topic有兩個(gè)partition,一般情況下partition-0存儲a,c,e3個(gè)數(shù)據(jù),partition-1存儲b,d,f另外3個(gè)數(shù)據(jù)。

1- 單播模式,只有一個(gè)消費(fèi)者組

topic只有1個(gè)partition,該組內(nèi)有多個(gè)消費(fèi)者時(shí),此時(shí)同一個(gè)partition內(nèi)的消息只能被該組中的一個(gè)consumer消費(fèi)。當(dāng)消費(fèi)者數(shù)量多于partition數(shù)量時(shí),多余的消費(fèi)者是處于空閑狀態(tài)的,如圖1所示。topic,test只有一個(gè)partition,并且只有1個(gè)group,G1,該group內(nèi)有多個(gè)consumer,只能被其中一個(gè)消費(fèi)者消費(fèi),其他的處于空閑狀態(tài)。

kafka 多個(gè)消費(fèi)者組,kafka,kafka,java,分布式
該topic有多個(gè)partition,該組內(nèi)有多個(gè)消費(fèi)者,比如test 有3個(gè)partition,該組內(nèi)有2個(gè)消費(fèi)者,那么可能就是C0對應(yīng)消費(fèi)p0,p1內(nèi)的數(shù)據(jù),c1對應(yīng)消費(fèi)p2的數(shù)據(jù);如果有3個(gè)消費(fèi)者,就是一個(gè)消費(fèi)者對應(yīng)消費(fèi)一個(gè)partition內(nèi)的數(shù)據(jù)了。圖解分別如圖2,圖3.這種模式在集群模式下使用是非常普遍的,比如我們可以起3個(gè)服務(wù),對應(yīng)的topic設(shè)置3個(gè)partiition,這樣就可以實(shí)現(xiàn)并行消費(fèi),大大提高處理消息的效率。

kafka 多個(gè)消費(fèi)者組,kafka,kafka,java,分布式
kafka 多個(gè)消費(fèi)者組,kafka,kafka,java,分布式

2- 廣播模式,多個(gè)消費(fèi)者組

如果想實(shí)現(xiàn)廣播的模式就需要設(shè)置多個(gè)消費(fèi)者組,這樣當(dāng)一個(gè)消費(fèi)者組消費(fèi)完這個(gè)消息后,絲毫不影響其他組內(nèi)的消費(fèi)者進(jìn)行消費(fèi),這就是廣播的概念。

多個(gè)消費(fèi)者組,1個(gè)partition;
該topic內(nèi)的數(shù)據(jù)被多個(gè)消費(fèi)者組同時(shí)消費(fèi),當(dāng)某個(gè)消費(fèi)者組有多個(gè)消費(fèi)者時(shí)也只能被一個(gè)消費(fèi)者消費(fèi).

kafka 多個(gè)消費(fèi)者組,kafka,kafka,java,分布式

多個(gè)消費(fèi)者組,多個(gè)partition

該topic內(nèi)的數(shù)據(jù)可被多個(gè)消費(fèi)者組多次消費(fèi),在一個(gè)消費(fèi)者組內(nèi),每個(gè)消費(fèi)者又可對應(yīng)該topic內(nèi)的一個(gè)或者多個(gè)partition并行消費(fèi),如圖

kafka 多個(gè)消費(fèi)者組,kafka,kafka,java,分布式

3- Java實(shí)踐

這里使用Java服務(wù)進(jìn)行實(shí)踐,模擬2個(gè)parition,然后同一個(gè)組內(nèi)有2個(gè)消費(fèi)者的情況:

首先創(chuàng)建一個(gè)發(fā)送消息的controller方法:

@ApiOperation(value = "向具有kafka-2個(gè)partition的topic發(fā)送信息")
    @RequestMapping(value = "/testSendMessage2", method = RequestMethod.POST)
    public String testSendMessage(@RequestParam("msg") String msg) {
        KafkaTemplate.send(KafkaTopicEnum.TEST_TWO_PARTITION_MSG.code,msg);
        System.out.println("發(fā)送的消息是:"+msg);
        return "2個(gè)partition的topic數(shù)據(jù)!--ok";
    }

然后再創(chuàng)建一個(gè)監(jiān)聽類監(jiān)聽該topic,這里的監(jiān)聽類即為消費(fèi)者。

/**
     * @date 2020-09-24
     * 兩個(gè)partition的topic,同一個(gè)組的兩個(gè)消費(fèi)者就可以并行的消費(fèi)了,需要kafka也是集群才行,單機(jī)版并不支持
     * @param consumerRecord
     * @param acknowledgment
     */
    @KafkaListener(topics = "two-partition-msg",groupId ="serverGroup1",containerFactory = "ackContainerFactory")
    public void receiveKafkaTwoParMsg(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment){
        InetAddress address = null;
        try {
            address = InetAddress.getLocalHost();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        System.out.println("當(dāng)前的IP地址是:"+address.getHostAddress());
        System.out.println("監(jiān)聽服務(wù)A-收到的消息是::");
        System.out.println(consumerRecord.value().toString());
        System.out.println("=================== end =================");
//        ack 提交掉,避免服務(wù)重啟再次拉取到消息
        acknowledgment.acknowledge();
    }

然后我們給該服務(wù)起2個(gè)實(shí)例,即模擬該組內(nèi)serverGroup1內(nèi)的2個(gè)消費(fèi)者,然后我們使用測試方法進(jìn)行測試,向該topic內(nèi)發(fā)送多個(gè)消息,觀察2個(gè)實(shí)例的輸出日志:

實(shí)例1:

發(fā)送的消息是:111
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“111=================== end =================
發(fā)送的消息是:222
發(fā)送的消息是:333
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“333=================== end =================
發(fā)送的消息是:444
發(fā)送的消息是:555
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“555=================== end =================
發(fā)送的消息是:666
發(fā)送的消息是:777
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“777=================== end =================
發(fā)送的消息是:888
發(fā)送的消息是:999
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“999-----------------------------------

實(shí)例2:

當(dāng)前的IP地址是:10.244.0.237
監(jiān)聽服務(wù)A-收到的消息是::
“222=================== end =================
當(dāng)前的IP地址是:10.244.0.237
監(jiān)聽服務(wù)A-收到的消息是::
“444=================== end =================
當(dāng)前的IP地址是:10.244.0.237
監(jiān)聽服務(wù)A-收到的消息是::
“666=================== end =================
當(dāng)前的IP地址是:10.244.0.237
監(jiān)聽服務(wù)A-收到的消息是::
“888”
發(fā)現(xiàn)該組內(nèi)的一個(gè)消費(fèi)者消費(fèi)到了111,333,555,777,999 ,另外一個(gè)消費(fèi)者消費(fèi)到了222,444,666,888,起到了均衡消費(fèi)的效果。

所以在微服務(wù)的集群中,我們可以通過給topic設(shè)置多個(gè)partition,然后讓每一個(gè)實(shí)例對應(yīng)消費(fèi)1個(gè)partition的數(shù)據(jù),從而實(shí)現(xiàn)并行的處理數(shù)據(jù),可以顯著地提高處理消息的速度。文章來源地址http://www.zghlxwxcb.cn/news/detail-815342.html

到了這里,關(guān)于kafka配置多個(gè)消費(fèi)者groupid kafka多個(gè)消費(fèi)者消費(fèi)同一個(gè)partition(java)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 【Kafka】【十七】消費(fèi)者poll消息的細(xì)節(jié)與消費(fèi)者心跳配置

    默認(rèn)情況下,消費(fèi)者?次會poll500條消息。 代碼中設(shè)置了?輪詢的時(shí)間是1000毫秒 意味著: 如果?次poll到500條,就直接執(zhí)?for循環(huán) 如果這?次沒有poll到500條。且時(shí)間在1秒內(nèi),那么?輪詢繼續(xù)poll,要么到500條,要么到1s 如果多次poll都沒達(dá)到500條,且1秒時(shí)間到了,那么直接執(zhí)

    2024年02月09日
    瀏覽(30)
  • Kafka:消費(fèi)者參數(shù)配置

    maven配置 springboot配置類 配置文件 參數(shù)配置列表 屬性 說明 bootstrap.servers 向Kafka集群建立初始連接用到的host/port列表。 客戶端會使用這里列出的所有服務(wù)器進(jìn)行集群其他服務(wù)器的發(fā)現(xiàn),而不管 是否指定了哪個(gè)服務(wù)器用作引導(dǎo)。 這個(gè)列表僅影響用來發(fā)現(xiàn)集群所有服務(wù)器的初始

    2024年02月09日
    瀏覽(28)
  • 關(guān)于kafka消費(fèi)者超時(shí)配置

    在Kafka中,消費(fèi)者超時(shí)配置是指消費(fèi)者在等待服務(wù)器響應(yīng)時(shí)的超時(shí)時(shí)間。如果消費(fèi)者在超時(shí)時(shí)間內(nèi)未收到服務(wù)器的響應(yīng),它將重新發(fā)起請求或執(zhí)行其他邏輯。 以下是關(guān)于Kafka消費(fèi)者超時(shí)配置的一些常見選項(xiàng): session.timeout.ms :該配置定義了消費(fèi)者與Kafka集群之間的會話超時(shí)時(shí)間

    2024年02月16日
    瀏覽(22)
  • 分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消息消費(fèi)與參數(shù)配置

    分布式 - 消息隊(duì)列Kafka:Kafka 消費(fèi)者消息消費(fèi)與參數(shù)配置

    01. 創(chuàng)建消費(fèi)者 在讀取消息之前,需要先創(chuàng)建一個(gè)KafkaConsumer對象。創(chuàng)建KafkaConsumer對象與創(chuàng)建KafkaProducer對象非常相似——把想要傳給消費(fèi)者的屬性放在Properties對象里。 為簡單起見,這里只提供4個(gè)必要的屬性:bootstrap.servers、key.deserializer 和 value.deserializer。 ① bootstrap.servers 指

    2024年02月12日
    瀏覽(27)
  • Kafka消費(fèi)者常用超時(shí)時(shí)間配置

    https://blog.csdn.net/BHSZZY/article/details/126757295 //心跳超時(shí)時(shí)間(session超時(shí)時(shí)間)增加成25秒(之前項(xiàng)目設(shè)置了15秒) spring.kafka.properties.session.timeout.ms = 25000 //每次拉取的消息減少為20(之前是默認(rèn)值500) spring.kafka.consumer.max-poll-records=20 //消息消費(fèi)超時(shí)時(shí)間增加為10分鐘 spring.kafka.p

    2024年02月03日
    瀏覽(25)
  • Kafka系列——詳解創(chuàng)建Kafka消費(fèi)者及相關(guān)配置

    參考自kafka系列文章——消費(fèi)者創(chuàng)建與配置 在讀取消息之前,需要先創(chuàng)建一個(gè) KafkaConsumer 對象。 創(chuàng)建 KafkaConsumer 對象與創(chuàng)建 KafkaProducer 對象非常相似——把想要傳給消費(fèi)者的屬性放在 Properties 對象里,后面深入討論所有屬性。這里我們只需要使用 3 個(gè)必要的屬性: bootstrap.

    2024年02月09日
    瀏覽(21)
  • kafka生產(chǎn)者和消費(fèi)者配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會從給定的broker里尋找其它的broker。 **key

    2024年02月12日
    瀏覽(23)
  • kafka配置大全broker、topic、生產(chǎn)者和消費(fèi)者等配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會從給定的broker里尋找其它的broker。 **key

    2024年02月05日
    瀏覽(40)
  • Kafka-Java四:Spring配置Kafka消費(fèi)者提交Offset的策略

    Kafka消費(fèi)者提交Offset的策略有 自動提交Offset: 消費(fèi)者將消息拉取下來以后未被消費(fèi)者消費(fèi)前,直接自動提交offset。 自動提交可能丟失數(shù)據(jù),比如消息在被消費(fèi)者消費(fèi)前已經(jīng)提交了offset,有可能消息拉取下來以后,消費(fèi)者掛了 手動提交Offset 消費(fèi)者在消費(fèi)消息時(shí)/后,再提交o

    2024年02月08日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包