国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

多個(gè)消費(fèi)者訂閱一個(gè)Kafka的Topic(使用@KafkaListener和KafkaTemplate)

這篇具有很好參考價(jià)值的文章主要介紹了多個(gè)消費(fèi)者訂閱一個(gè)Kafka的Topic(使用@KafkaListener和KafkaTemplate)。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問(wèn)。

記錄:465

場(chǎng)景:一個(gè)Producer在一個(gè)Topic發(fā)布消息,多個(gè)消費(fèi)者Consumer訂閱Kafka的Topic。每個(gè)Consumer指定一個(gè)特定的ConsumerGroup,達(dá)到一條消息被多個(gè)不同的ConsumerGroup消費(fèi)。

版本:JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,spring-kafka-2.8.2。

Kafka集群安裝:https://blog.csdn.net/zhangbeizhen18/article/details/131156084

1.基礎(chǔ)概念

Topic:Kafka根據(jù)Topic對(duì)消息進(jìn)行歸類,發(fā)布到Kafka的每條消息都需要指定一個(gè)Topic。

Producer:消息生產(chǎn)者,向Broker發(fā)送消息的客戶端。

Consumer:消息消費(fèi)者,從Broker讀取消息的客戶端。

ConsumerGroup:每個(gè)Consumer屬于一個(gè)特定的ConsumerGroup,一條消息可以被多個(gè)不同的ConsumerGroup消費(fèi);但是一個(gè)ConsumerGroup中只能有一個(gè)Consumer能夠消費(fèi)該消息。

publish:發(fā)布,使用Producer向Kafka寫入數(shù)據(jù)。

subscribe:訂閱,使用Consumer從Kafka讀取數(shù)據(jù)。

2.微服務(wù)中配置Kafka信息

(1)在pom.xml添加依賴

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.8.2</version>
</dependency>

請(qǐng)知悉:spring-kafka框架底層使用了原生的kafka-clients。本例對(duì)應(yīng)版本:3.0.0。

(2)在application.yml中配置Kafka信息

配置參考官網(wǎng)的configuration:https://kafka.apache.org/documentation/

(1)application.yml配置內(nèi)容

spring:
  kafka:
    #kafka集群的IP和端口,格式:(ip:port)
    bootstrap-servers:
      - 192.168.19.161:29092
      - 192.168.19.162:29092
      - 192.168.19.163:29092
    #生產(chǎn)者
    producer:
      #客戶端發(fā)送服務(wù)端失敗的重試次數(shù)
      retries: 2
      #多個(gè)記錄被發(fā)送到同一個(gè)分區(qū)時(shí),生產(chǎn)者將嘗試將記錄一起批處理成更少的請(qǐng)求.
      #此設(shè)置有助于提高客戶端和服務(wù)器的性能,配置控制默認(rèn)批量大小(以字節(jié)為單位)
      batch-size: 16384
      #生產(chǎn)者可用于緩沖等待發(fā)送到服務(wù)器的記錄的總內(nèi)存字節(jié)數(shù)(以字節(jié)為單位)
      buffer-memory: 33554432
      #指定key使用的序列化類
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      #指定value使用的序列化類
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      #生產(chǎn)者producer要求leader節(jié)點(diǎn)在考慮完成請(qǐng)求之前收到的確認(rèn)數(shù),用于控制發(fā)送記錄在服務(wù)端的持久化
      #acks=0,設(shè)置為0,則生產(chǎn)者producer將不會(huì)等待來(lái)自服務(wù)器的任何確認(rèn).該記錄將立即添加到套接字(socket)緩沖區(qū)并視為已發(fā)送.在這種情況下,無(wú)法保證服務(wù)器已收到記錄,并且重試配置(retries)將不會(huì)生效(因?yàn)榭蛻舳送ǔ2粫?huì)知道任何故障),每條記錄返回的偏移量始終設(shè)置為-1.
      #acks=1,設(shè)置為1,leader節(jié)點(diǎn)會(huì)把記錄寫入本地日志,不需要等待所有follower節(jié)點(diǎn)完全確認(rèn)就會(huì)立即應(yīng)答producer.在這種情況下,在follower節(jié)點(diǎn)復(fù)制前,leader節(jié)點(diǎn)確認(rèn)記錄后立即失敗的話,記錄將會(huì)丟失.
      #acks=all,acks=-1,leader節(jié)點(diǎn)將等待所有同步復(fù)制副本完成再確認(rèn)記錄,這保證了只要至少有一個(gè)同步復(fù)制副本存活,記錄就不會(huì)丟失.
      acks: -1
    consumer:
      #開啟consumer的偏移量(offset)自動(dòng)提交到Kafka
      enable-auto-commit: true
      #consumer的偏移量(offset)自動(dòng)提交的時(shí)間間隔,單位毫秒
      auto-commit-interval: 1000
      #在Kafka中沒有初始化偏移量或者當(dāng)前偏移量不存在情況
      #earliest,在偏移量無(wú)效的情況下,自動(dòng)重置為最早的偏移量
      #latest,在偏移量無(wú)效的情況下,自動(dòng)重置為最新的偏移量
      #none,在偏移量無(wú)效的情況下,拋出異常.
      auto-offset-reset: latest
      #一次調(diào)用poll返回的最大記錄條數(shù)
      max-poll-records: 500
      #請(qǐng)求阻塞的最大時(shí)間(毫秒)
      fetch-max-wait: 500
      #請(qǐng)求應(yīng)答的最小字節(jié)數(shù)
      fetch-min-size: 1
      #心跳間隔時(shí)間(毫秒)
      heartbeat-interval: 3000
      #指定key使用的反序列化類
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #指定value使用的反序列化類
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(2)解析

配置類在spring boot自動(dòng)注解包:spring-boot-autoconfigure-2.6.3.jar。

類:org.springframework.boot.autoconfigure.kafka.KafkaProperties。

使用@ConfigurationProperties注解使其生效,前綴是:spring.kafka。

spring-kafka框架對(duì)操作Kafka單機(jī)版和Kafka集群版的配置差異:

在于bootstrap-servers屬性,單機(jī)版配置一個(gè)IP:端口對(duì)。集群版配置多個(gè)IP:端口對(duì)就行。

(3)加載邏輯

Spring Boot微服務(wù)在啟動(dòng)時(shí),Spring Boot會(huì)讀取application.yml的配置信息,根據(jù)配置內(nèi)容在spring-boot-autoconfigure-2.6.3.jar找到KafkaProperties并注入到對(duì)應(yīng)屬性。Spring Boot微服務(wù)在啟動(dòng)完成后,KafkaProperties的配置信息在Spring環(huán)境中就能無(wú)縫使用。

Spring的spring-kafka框架將KafkaProperties配置信息注入到KafkaTemplate操作生產(chǎn)者Producer。

Spring的spring-kafka框架使用KafkaProperties和@KafkaListener操作Kafka的消費(fèi)者Consumer。

3.生產(chǎn)者(ChangjiangDeltaCityProducerController)

(1)示例代碼

@RestController
@RequestMapping("/hub/example/delta/producer")
@Slf4j
public class ChangjiangDeltaCityProducerController {
  //1.注入KafkaTemplate
  @Autowired
  private KafkaTemplate<String, String> kafkaTemplate;
  //2.定義Kafka的Topic
  private final String topicName = "hub-topic-city-delta";
  @GetMapping("/f01_1")
  public Object f01_1(String msgContent) {
    try {
      //3.獲取業(yè)務(wù)數(shù)據(jù)對(duì)象
      String uuid=UUID.randomUUID().toString().replace("-","");
      long now=System.currentTimeMillis();
      String msgKey = "delta" + ":" + uuid + ":" + now;
      MsgDto msgDto = MsgDto.buildDto(uuid,now,msgContent);
      String msgData = JSONObject.toJSONString(msgDto);
      log.info("KafkaProducer向Kafka集群的Topic: {},寫入Key:", topicName);
      log.info(msgKey);
      log.info("KafkaProducer向Kafka集群的Topic: {},寫入Data:", topicName);
      log.info(msgData);
      //4.使用KafkaTemplate向Kafka集群寫入數(shù)據(jù)(topic,key,data)
      kafkaTemplate.send(topicName, msgKey, msgData);
    } catch (Exception e) {
      log.info("Producer寫入Topic異常.");
      e.printStackTrace();
    }
    return "寫入成功";
  }
}

(2)解析代碼

使用KafkaTemplate向Kafka集群的Topic:hub-topic-city-delta寫入JSON字符串?dāng)?shù)據(jù),發(fā)布一條消息,給訂閱的消費(fèi)者消費(fèi)。

4.消費(fèi)者一(HangzhouCityConsumer)

(1)示例代碼

@Component
@Slf4j
public class HangzhouCityConsumer {
  // 1.定義Kafka的Topic
  private final String topicName = "hub-topic-city-delta";
  // 2.使用@KafkaListener監(jiān)聽Kafka集群的Topic
  @KafkaListener(
      topics = {topicName},
      groupId = "hub-topic-city-delta-group-hangzhou")
  public void consumeMsg(ConsumerRecord<?, ?> record) {
    try {
        //3.KafkaConsumer從集群中監(jiān)聽的消息存儲(chǔ)在ConsumerRecord
        String msgKey= (String) record.key();
        String msgData = (String) record.value();
        log.info("HangzhouCityConsumer從Kafka集群中的Topic:{},消費(fèi)的原始數(shù)據(jù)的Key:",topicName);
        log.info(msgKey);
        log.info("HangzhouCityConsumer從Kafka集群中的Topic:{},消費(fèi)的原始數(shù)據(jù)的Data:",topicName);
        log.info(msgData);
    } catch (Exception e) {
        log.info("HangzhouCityConsumer消費(fèi)Topic異常.");
        e.printStackTrace();
    }
  }
}

(2)解析代碼

使用@KafkaListener的屬性topics指定監(jiān)聽的Topic:hub-topic-city-delta。

使用@KafkaListener的屬性groupId 指定消費(fèi)組:hub-topic-city-delta-group-hangzhou。

5.消費(fèi)者二(ShanghaiCityConsumer)

(1)示例代碼

@Component
@Slf4j
public class ShanghaiCityConsumer {
  // 1.定義Kafka的Topic
  private final String topicName = "hub-topic-city-delta";
  // 2.使用@KafkaListener監(jiān)聽Kafka集群的Topic
  @KafkaListener(
          topics = {topicName},
          groupId = "hub-topic-city-delta-group-shanghai")
  public void consumeMsg(ConsumerRecord<?, ?> record) {
    try {
        //3.KafkaConsumer從集群中監(jiān)聽的消息存儲(chǔ)在ConsumerRecord
        String msgKey = (String) record.key();
        String msgData = (String) record.value();
        log.info("ShanghaiCityConsumer從Kafka集群中的Topic:{},消費(fèi)的原始數(shù)據(jù)的Key:", topicName);
        log.info(msgKey);
        log.info("ShanghaiCityConsumer從Kafka集群中的Topic:{},消費(fèi)的原始數(shù)據(jù)的Data:", topicName);
        log.info(msgData);
    } catch (Exception e) {
        log.info("ShanghaiCityConsumer消費(fèi)Topic異常.");
        e.printStackTrace();
    }
  }
}

(2)解析代碼

使用@KafkaListener的屬性topics指定監(jiān)聽的Topic:hub-topic-city-delta。

使用@KafkaListener的屬性groupId 指定消費(fèi)組:hub-topic-city-delta-group-shanghai。

6.測(cè)試

(1)使用Postman測(cè)試,調(diào)用生產(chǎn)者寫入數(shù)據(jù)

請(qǐng)求RUL:http://127.0.0.1:18208/hub-208-kafka/hub/example/delta/producer/f01_1

參數(shù):msgContent="長(zhǎng)三角經(jīng)濟(jì)帶實(shí)力強(qiáng)大"

(2)生產(chǎn)者日志

KafkaProducer向Kafka集群的Topic: hub-topic-city-delta,寫入Key:
delta:b5a669933f4041588d53d53c22888943:1687789723647
KafkaProducer向Kafka集群的Topic: hub-topic-city-delta,寫入Data:
{"msgContent":"長(zhǎng)三角經(jīng)濟(jì)帶實(shí)力強(qiáng)大","publicTime":"2023-06-26 22:28:43","uuid":"b5a669933f4041588d53d53c22888943"}

(3)消費(fèi)者一日志

HangzhouCityConsumer從Kafka集群中的Topic:hub-topic-city-delta,消費(fèi)的原始數(shù)據(jù)的Key:
delta:b5a669933f4041588d53d53c22888943:1687789723647
HangzhouCityConsumer從Kafka集群中的Topic:hub-topic-city-delta,消費(fèi)的原始數(shù)據(jù)的Data:
{"msgContent":"長(zhǎng)三角經(jīng)濟(jì)帶實(shí)力強(qiáng)大","publicTime":"2023-06-26 22:28:43","uuid":"b5a669933f4041588d53d53c22888943"}

(4)消費(fèi)者二日志

ShanghaiCityConsumer從Kafka集群中的Topic:hub-topic-city-delta,消費(fèi)的原始數(shù)據(jù)的Key:
delta:b5a669933f4041588d53d53c22888943:1687789723647
ShanghaiCityConsumer從Kafka集群中的Topic:hub-topic-city-delta,消費(fèi)的原始數(shù)據(jù)的Data:
{"msgContent":"長(zhǎng)三角經(jīng)濟(jì)帶實(shí)力強(qiáng)大","publicTime":"2023-06-26 22:28:43","uuid":"b5a669933f4041588d53d53c22888943"}

(5)結(jié)論

每個(gè)Consumer指定一個(gè)特定的ConsumerGroup,一條消息可以被多個(gè)不同的ConsumerGroup消費(fèi)。

7.輔助類

@Data
@Builder
public class MsgDto implements Serializable {
  private String uuid;
  private String publicTime;
  private String msgContent;
  public static MsgDto buildDto(String uuid,
                      long publicTime,
                      String msgContent) {
      return builder().uuid(uuid)
          .publicTime(DateUtil.formatDateTime(new Date(publicTime)))
          .msgContent(msgContent).build();
  }
}

以上,感謝。

2023年6月26日文章來(lái)源地址http://www.zghlxwxcb.cn/news/detail-552039.html

到了這里,關(guān)于多個(gè)消費(fèi)者訂閱一個(gè)Kafka的Topic(使用@KafkaListener和KafkaTemplate)的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來(lái)自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

    Kafka3.0.0版本——消費(fèi)者(獨(dú)立消費(fèi)者消費(fèi)某一個(gè)主題中某個(gè)分區(qū)數(shù)據(jù)案例__訂閱分區(qū))

    1.1、案例需求 創(chuàng)建一個(gè)獨(dú)立消費(fèi)者,消費(fèi)firstTopic主題 0 號(hào)分區(qū)的數(shù)據(jù),所下圖所示: 1.2、案例代碼 生產(chǎn)者往firstTopic主題 0 號(hào)分區(qū)發(fā)送數(shù)據(jù)代碼 消費(fèi)者消費(fèi)firstTopic主題 0 分區(qū)數(shù)據(jù)代碼 1.3、測(cè)試 在 IDEA 中執(zhí)行消費(fèi)者程序,如下圖: 在 IDEA 中執(zhí)行生產(chǎn)者程序 ,在控制臺(tái)觀察

    2024年02月09日
    瀏覽(33)
  • kafka 基礎(chǔ)概念、命令行操作(查看所有topic、創(chuàng)建topic、刪除topic、查看某個(gè)Topic的詳情、修改分區(qū)數(shù)、發(fā)送消息、消費(fèi)消息、 查看消費(fèi)者組 、更新消費(fèi)者的偏移位置)

    kafka 基礎(chǔ)概念、命令行操作(查看所有topic、創(chuàng)建topic、刪除topic、查看某個(gè)Topic的詳情、修改分區(qū)數(shù)、發(fā)送消息、消費(fèi)消息、 查看消費(fèi)者組 、更新消費(fèi)者的偏移位置)

    kafka官網(wǎng) Broker ??一臺(tái)kafka服務(wù)器就是一個(gè)broker,可容納多個(gè)topic。一個(gè)集群由多個(gè)broker組成; Producer ??生產(chǎn)者,即向kafka的broker-list發(fā)送消息的客戶端; Consumer ??消費(fèi)者,即向kafka的broker-list訂閱消息的客戶端; Consumer Group ??消費(fèi)者組是 邏輯上的一個(gè)訂閱者 ,由多個(gè)

    2024年02月01日
    瀏覽(121)
  • Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    執(zhí)行topic刪除命令時(shí),出現(xiàn)提示 這條命令其實(shí)并不執(zhí)行刪除動(dòng)作,僅僅是在zookeeper上標(biāo)記該topic要被刪除而已,同時(shí)也提醒用戶一定要提前打開delete.topic.enable開關(guān),否則刪除動(dòng)作是不會(huì)執(zhí)行的。 解決辦法: a)在server.properties中設(shè)置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(29)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進(jìn)入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進(jìn)入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(28)
  • Kafka - 主題Topic與消費(fèi)者消息Offset日志記錄機(jī)制

    Kafka - 主題Topic與消費(fèi)者消息Offset日志記錄機(jī)制

    可以根據(jù)業(yè)務(wù)類型,分發(fā)到不同的Topic中,對(duì)于每一個(gè)Topic,下面可以有多個(gè)分區(qū)(Partition)日志文件: kafka 下的Topic的多個(gè)分區(qū),每一個(gè)分區(qū)實(shí)質(zhì)上就是一個(gè)隊(duì)列,將接收到的消息暫時(shí)存儲(chǔ)到隊(duì)列中,根據(jù)配置以及消息消費(fèi)情況來(lái)對(duì)隊(duì)列消息刪除。 可以這么來(lái)理解Topic,Partitio

    2024年02月03日
    瀏覽(22)
  • 如何查看kafka的topic的消費(fèi)者組有沒有積壓

    Kafka 自帶的命令行工具 kafka-consumer-groups.sh 來(lái)查看消費(fèi)者組的消費(fèi)情況,包括是否有積壓。 具體步驟如下: 打開命令行終端,進(jìn)入 Kafka 安裝目錄下的 bin 文件夾。 輸入以下命令,查看消費(fèi)者組的消費(fèi)情況: ./kafka-consumer-groups.sh --bootstrap-server --describe --group kafka-consumer-groups.

    2023年04月18日
    瀏覽(73)
  • kafka配置大全broker、topic、生產(chǎn)者和消費(fèi)者等配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會(huì)從給定的broker里尋找其它的broker。 **key

    2024年02月05日
    瀏覽(40)
  • Kafka系列之:記錄一次Kafka Topic分區(qū)擴(kuò)容,但是下游flink消費(fèi)者沒有自動(dòng)消費(fèi)新的分區(qū)的解決方法

    Kafka系列之:記錄一次Kafka Topic分區(qū)擴(kuò)容,但是下游flink消費(fèi)者沒有自動(dòng)消費(fèi)新的分區(qū)的解決方法

    生產(chǎn)環(huán)境Kafka集群壓力大,Topic讀寫壓力大,消費(fèi)的lag比較大,因此通過(guò)擴(kuò)容Topic的分區(qū),增大Topic的讀寫性能 理論上下游消費(fèi)者應(yīng)該能夠自動(dòng)消費(fèi)到新的分區(qū),例如flume消費(fèi)到了新的分區(qū),但是實(shí)際情況是存在flink消費(fèi)者沒有消費(fèi)到新的分區(qū) 出現(xiàn)無(wú)法消費(fèi)topic新的分區(qū)這種情況

    2024年02月14日
    瀏覽(63)
  • Kafka消費(fèi)者訂閱指定主題(subscribe)或分區(qū)(assign)詳解

    Kafka消費(fèi)者訂閱指定主題(subscribe)或分區(qū)(assign)詳解

    在連接Kafka服務(wù)器消費(fèi)數(shù)據(jù)前,需要?jiǎng)?chuàng)建Kafka消費(fèi)者進(jìn)行拉取數(shù)據(jù),需要配置相應(yīng)的參數(shù),比如設(shè)置消費(fèi)者所屬的消費(fèi)者組名稱、連接的broker服務(wù)器地址、序列號(hào)和反序列化的方式等配置。 更多消費(fèi)者配置可參考官網(wǎng): https://kafka.apache.org/documentation/#consumerconfigs 訂閱主題(s

    2023年04月24日
    瀏覽(26)
  • 筆記:配置多個(gè)kafka生產(chǎn)者和消費(fèi)者

    如果只有一個(gè)kafka,那么使用自帶的KafkaAutoConfiguration配置類即可,對(duì)應(yīng)已有屬性類KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個(gè)kafka的情況,即在KafkaAutoConfiguration的基礎(chǔ)上,自定義額外的kafka生產(chǎn)者和消費(fèi)者。 適用場(chǎng)景:需要消費(fèi)來(lái)源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包