kafka是由Apache軟件基金會開發(fā)的一個(gè)開源流處理平臺。kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費(fèi)者在網(wǎng)站中的所有動作流數(shù)據(jù)。
kafka中partition類似數(shù)據(jù)庫中的分表數(shù)據(jù),可以起到水平擴(kuò)展數(shù)據(jù)的目的,比如有a,b,c,d,e,f 6個(gè)數(shù)據(jù),某個(gè)topic有兩個(gè)partition,一般情況下partition-0存儲a,c,e3個(gè)數(shù)據(jù),partition-1存儲b,d,f另外3個(gè)數(shù)據(jù)。
1- 單播模式,只有一個(gè)消費(fèi)者組
topic只有1個(gè)partition,該組內(nèi)有多個(gè)消費(fèi)者時(shí),此時(shí)同一個(gè)partition內(nèi)的消息只能被該組中的一個(gè)consumer消費(fèi)。當(dāng)消費(fèi)者數(shù)量多于partition數(shù)量時(shí),多余的消費(fèi)者是處于空閑狀態(tài)的,如圖1所示。topic,test只有一個(gè)partition,并且只有1個(gè)group,G1,該group內(nèi)有多個(gè)consumer,只能被其中一個(gè)消費(fèi)者消費(fèi),其他的處于空閑狀態(tài)。
該topic有多個(gè)partition,該組內(nèi)有多個(gè)消費(fèi)者,比如test 有3個(gè)partition,該組內(nèi)有2個(gè)消費(fèi)者,那么可能就是C0對應(yīng)消費(fèi)p0,p1內(nèi)的數(shù)據(jù),c1對應(yīng)消費(fèi)p2的數(shù)據(jù);如果有3個(gè)消費(fèi)者,就是一個(gè)消費(fèi)者對應(yīng)消費(fèi)一個(gè)partition內(nèi)的數(shù)據(jù)了。圖解分別如圖2,圖3.這種模式在集群模式下使用是非常普遍的,比如我們可以起3個(gè)服務(wù),對應(yīng)的topic設(shè)置3個(gè)partiition,這樣就可以實(shí)現(xiàn)并行消費(fèi),大大提高處理消息的效率。
2- 廣播模式,多個(gè)消費(fèi)者組
如果想實(shí)現(xiàn)廣播的模式就需要設(shè)置多個(gè)消費(fèi)者組,這樣當(dāng)一個(gè)消費(fèi)者組消費(fèi)完這個(gè)消息后,絲毫不影響其他組內(nèi)的消費(fèi)者進(jìn)行消費(fèi),這就是廣播的概念。
多個(gè)消費(fèi)者組,1個(gè)partition;
該topic內(nèi)的數(shù)據(jù)被多個(gè)消費(fèi)者組同時(shí)消費(fèi),當(dāng)某個(gè)消費(fèi)者組有多個(gè)消費(fèi)者時(shí)也只能被一個(gè)消費(fèi)者消費(fèi).
多個(gè)消費(fèi)者組,多個(gè)partition
該topic內(nèi)的數(shù)據(jù)可被多個(gè)消費(fèi)者組多次消費(fèi),在一個(gè)消費(fèi)者組內(nèi),每個(gè)消費(fèi)者又可對應(yīng)該topic內(nèi)的一個(gè)或者多個(gè)partition并行消費(fèi),如圖
3- Java實(shí)踐
這里使用Java服務(wù)進(jìn)行實(shí)踐,模擬2個(gè)parition,然后同一個(gè)組內(nèi)有2個(gè)消費(fèi)者的情況:
首先創(chuàng)建一個(gè)發(fā)送消息的controller方法:
@ApiOperation(value = "向具有kafka-2個(gè)partition的topic發(fā)送信息")
@RequestMapping(value = "/testSendMessage2", method = RequestMethod.POST)
public String testSendMessage(@RequestParam("msg") String msg) {
KafkaTemplate.send(KafkaTopicEnum.TEST_TWO_PARTITION_MSG.code,msg);
System.out.println("發(fā)送的消息是:"+msg);
return "2個(gè)partition的topic數(shù)據(jù)!--ok";
}
然后再創(chuàng)建一個(gè)監(jiān)聽類監(jiān)聽該topic,這里的監(jiān)聽類即為消費(fèi)者。
/**
* @date 2020-09-24
* 兩個(gè)partition的topic,同一個(gè)組的兩個(gè)消費(fèi)者就可以并行的消費(fèi)了,需要kafka也是集群才行,單機(jī)版并不支持
* @param consumerRecord
* @param acknowledgment
*/
@KafkaListener(topics = "two-partition-msg",groupId ="serverGroup1",containerFactory = "ackContainerFactory")
public void receiveKafkaTwoParMsg(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment){
InetAddress address = null;
try {
address = InetAddress.getLocalHost();
} catch (UnknownHostException e) {
e.printStackTrace();
}
System.out.println("當(dāng)前的IP地址是:"+address.getHostAddress());
System.out.println("監(jiān)聽服務(wù)A-收到的消息是::");
System.out.println(consumerRecord.value().toString());
System.out.println("=================== end =================");
// ack 提交掉,避免服務(wù)重啟再次拉取到消息
acknowledgment.acknowledge();
}
然后我們給該服務(wù)起2個(gè)實(shí)例,即模擬該組內(nèi)serverGroup1內(nèi)的2個(gè)消費(fèi)者,然后我們使用測試方法進(jìn)行測試,向該topic內(nèi)發(fā)送多個(gè)消息,觀察2個(gè)實(shí)例的輸出日志:
實(shí)例1:
發(fā)送的消息是:111
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“111”
=================== end =================
發(fā)送的消息是:222
發(fā)送的消息是:333
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“333”
=================== end =================
發(fā)送的消息是:444
發(fā)送的消息是:555
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“555”
=================== end =================
發(fā)送的消息是:666
發(fā)送的消息是:777
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“777”
=================== end =================
發(fā)送的消息是:888
發(fā)送的消息是:999
當(dāng)前的IP地址是:10.244.3.114
監(jiān)聽服務(wù)A-收到的消息是::
“999”
-----------------------------------
實(shí)例2:文章來源:http://www.zghlxwxcb.cn/news/detail-815342.html
當(dāng)前的IP地址是:10.244.0.237
監(jiān)聽服務(wù)A-收到的消息是::
“222”
=================== end =================
當(dāng)前的IP地址是:10.244.0.237
監(jiān)聽服務(wù)A-收到的消息是::
“444”
=================== end =================
當(dāng)前的IP地址是:10.244.0.237
監(jiān)聽服務(wù)A-收到的消息是::
“666”
=================== end =================
當(dāng)前的IP地址是:10.244.0.237
監(jiān)聽服務(wù)A-收到的消息是::
“888”
發(fā)現(xiàn)該組內(nèi)的一個(gè)消費(fèi)者消費(fèi)到了111,333,555,777,999 ,另外一個(gè)消費(fèi)者消費(fèi)到了222,444,666,888,起到了均衡消費(fèi)的效果。
所以在微服務(wù)的集群中,我們可以通過給topic設(shè)置多個(gè)partition,然后讓每一個(gè)實(shí)例對應(yīng)消費(fèi)1個(gè)partition的數(shù)據(jù),從而實(shí)現(xiàn)并行的處理數(shù)據(jù),可以顯著地提高處理消息的速度。文章來源地址http://www.zghlxwxcb.cn/news/detail-815342.html
到了這里,關(guān)于kafka配置多個(gè)消費(fèi)者groupid kafka多個(gè)消費(fèi)者消費(fèi)同一個(gè)partition(java)的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!