国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Java輕松使用Kafka生產(chǎn)者,消費(fèi)者

這篇具有很好參考價(jià)值的文章主要介紹了Java輕松使用Kafka生產(chǎn)者,消費(fèi)者。希望對大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

Java輕松使用Kafka生產(chǎn)者,消費(fèi)者

一、環(huán)境說明

  1. 項(xiàng)目中需要下面的依賴:(版本自定義)
<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>1.0.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>1.0.2</version>
        </dependency>

2.yml配置文件設(shè)置

 kafka:
    bootstrap-servers: ip:端口
    jaas:
      enabled: false
    listener:
      type: single
      concurrency: 3
    consumer:
#      key-deserializer: org.apache.kafka.common.serialization.IntegerDeserializer
      group-id: group-id數(shù)據(jù)
      auto-offset-reset: latest
      enable-auto-commit: false
      max-poll-records: 100

  # kafka topic
  task:
    execute:
      topic: topic名稱

二、生產(chǎn)者

1.簡單生產(chǎn)者的書寫:

@Component
public class SendKafkaUtil {

    @Autowired
    KafkaTemplate<Object, Object> kafkaTemplate;



    @Value("${spring.task.execute.topic}")
    private String topic;


    public void sendMessageKafka() {
        kafkaTemplate.send(topic, "{json數(shù)據(jù)}");
    }

}

三、消費(fèi)者

Consumer 消費(fèi)數(shù)據(jù)時(shí)的可靠性是很容易保證的,因?yàn)閿?shù)據(jù)在 Kafka 中是持久化的,故不用擔(dān)心數(shù)據(jù)丟失問題。由于 consumer 在消費(fèi)過程中可能會出現(xiàn)斷電宕機(jī)等故障,consumer 恢復(fù)后,需要從故障前的位置的繼續(xù)消費(fèi),所以 consumer 需要實(shí)時(shí)記錄自己消費(fèi)到了哪個(gè) offset,以便故障恢復(fù)后繼續(xù)消費(fèi)。

1.簡單消費(fèi)者的書寫:

 @KafkaListener(topics = {"${spring.kafka.topic}"}, groupId = "${spring.kafka.consumer.group-id}")
    public void processMessage(List<ConsumerRecord<String, String>> records){
        logger.info("kafka消費(fèi)消息數(shù)量:" + records.size());
}

2.消費(fèi)者批量消費(fèi)的書寫(批量控制:max-poll-records: 100)

?

@Bean
    public KafkaListenerContainerFactory<?> batchFactory(KafkaProperties properties) {
      Map<String, Object> consumerProperties = properties.buildConsumerProperties();
      ConcurrentKafkaListenerContainerFactory<String, String> factory = new
        ConcurrentKafkaListenerContainerFactory<>();
      factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProperties));
      factory.setBatchListener(true); // 開啟批量監(jiān)聽
      return factory;
    }
  }
  /**
   * 消費(fèi)者1
   * 批處理統(tǒng)一方法
   *
   * @param records
   */

  @KafkaListener(topics = {"${spring.task.execute.topic}"},containerFactory = "batchFactory",topicPartitions = {
    @TopicPartition(partitions = {"0"}, topic = "${spring.task.execute.topic}") })
  public void consumer1(List<ConsumerRecord<String, Object>> records) throws IOException {
    log.info("Id1 Listener, Thread ID: " + Thread.currentThread().getId());
    log.info("Id1 records size " +  records.size());
    // todo數(shù)據(jù)邏輯處理
  }
  /**
   * 消費(fèi)者2
   * 批處理統(tǒng)一方法
   *
   * @param records
   */

  @KafkaListener(topics = {"${spring.task.execute.topic}"}, containerFactory = "batchFactory",topicPartitions = {
    @TopicPartition(partitions = {"1"}, topic = "${spring.task.execute.topic}") })
  public void consumer2(List<ConsumerRecord<String, Object>> records) throws IOException {
    log.info("Id2 Listener, Thread ID: " + Thread.currentThread().getId());
    log.info("Id2 records size " + records.size());
   // todo數(shù)據(jù)邏輯處理
  }

注:多消費(fèi)者時(shí),需要對應(yīng)kafka中配置的分區(qū);多少的Partition就有多少個(gè)消費(fèi)者,以免資源浪費(fèi)文章來源地址http://www.zghlxwxcb.cn/news/detail-612885.html

到了這里,關(guān)于Java輕松使用Kafka生產(chǎn)者,消費(fèi)者的文章就介紹完了。如果您還想了解更多內(nèi)容,請?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • kafka生產(chǎn)者和消費(fèi)者配置介紹

    每個(gè)kafka broker中配置文件 server.properties 默認(rèn)必須配置的屬性如下: **bootstrap.servers** - 指定生產(chǎn)者客戶端連接kafka集群所需的broker地址列表,格式為host1:port1,host2:port2,可以設(shè)置一個(gè)或多個(gè)。這里并非需要所有的broker地址,因?yàn)樯a(chǎn)者會從給定的broker里尋找其它的broker。 **key

    2024年02月12日
    瀏覽(23)
  • Kafka生產(chǎn)者與消費(fèi)者api示例

    Kafka生產(chǎn)者與消費(fèi)者api示例

    ? 一個(gè)正常的生產(chǎn)邏輯需要具備以下幾個(gè)步驟 配置生產(chǎn)者參數(shù)及創(chuàng)建相應(yīng)的生產(chǎn)者實(shí)例 構(gòu)建待發(fā)送的消息 發(fā)送消息 關(guān)閉生產(chǎn)者實(shí)例 采用默認(rèn)分區(qū)方式將消息散列的發(fā)送到各個(gè)分區(qū)當(dāng)中 ? ?對于properties配置的第二種寫法,相對來說不會出錯(cuò),簡單舉例: ? 1.kafka的生產(chǎn)者可

    2024年02月07日
    瀏覽(24)
  • 筆記:配置多個(gè)kafka生產(chǎn)者和消費(fèi)者

    如果只有一個(gè)kafka,那么使用自帶的KafkaAutoConfiguration配置類即可,對應(yīng)已有屬性類KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個(gè)kafka的情況,即在KafkaAutoConfiguration的基礎(chǔ)上,自定義額外的kafka生產(chǎn)者和消費(fèi)者。 適用場景:需要消費(fèi)來源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)
  • kafka生產(chǎn)者和消費(fèi)者(python版)

    生產(chǎn)者 消費(fèi)者 消費(fèi)者中的組名主要用戶針對主題的偏移量進(jìn)行更改,也涉及到主題中分區(qū)的問題, kafka工具類 此工具類基本上拿過去就可以用 疑問 當(dāng)消費(fèi)者鏈接kafka時(shí)發(fā)現(xiàn)topic沒有未讀的消息怎樣退出呢,默認(rèn)是在一直等待,但是我期望沒有要讀的消息的時(shí)候直接退出即可

    2024年02月16日
    瀏覽(21)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號從0開始計(jì)算 Partition:分區(qū)數(shù),該主題有3個(gè)分區(qū) Replica:副本數(shù),該主題有3個(gè)副本 Leader:副本數(shù)中的主的序號,生產(chǎn)消費(fèi)的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會報(bào)錯(cuò) 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    Kafka系列:查看Topic列表、消息消費(fèi)情況、模擬生產(chǎn)者消費(fèi)者

    執(zhí)行topic刪除命令時(shí),出現(xiàn)提示 這條命令其實(shí)并不執(zhí)行刪除動作,僅僅是在zookeeper上標(biāo)記該topic要被刪除而已,同時(shí)也提醒用戶一定要提前打開delete.topic.enable開關(guān),否則刪除動作是不會執(zhí)行的。 解決辦法: a)在server.properties中設(shè)置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(29)
  • 探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

    探究:kafka生產(chǎn)者/消費(fèi)者與多線程安全

    目錄 1. 多線程安全 1.1. 生產(chǎn)者是多線程安全的么? 1.1. 消費(fèi)者是多線程安全的么? 2. 消費(fèi)者規(guī)避多線程安全方案 2.1. 每個(gè)線程維護(hù)一個(gè)kafkaConsumer 2.2. [單/多]kafkaConsumer實(shí)例 + 多worker線程 2.3.方案優(yōu)缺點(diǎn)對比 ????????Kafka生產(chǎn)者是 線程安全 的,可以在多個(gè)線程中共享一個(gè)

    2023年04月26日
    瀏覽(24)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費(fèi)者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進(jìn)入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進(jìn)入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(29)
  • Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費(fèi)者(Consumer)

    Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費(fèi)者(Consumer)

    Apache Kafka 作為分布式流處理平臺,其架構(gòu)中的生產(chǎn)者和消費(fèi)者是核心組件,負(fù)責(zé)實(shí)現(xiàn)高效的消息生產(chǎn)和消費(fèi)。本文將深入剖析 Kafka 架構(gòu)中生產(chǎn)者和消費(fèi)者的工作原理、核心概念以及高級功能。 1 發(fā)送消息到 Kafka Kafka 生產(chǎn)者負(fù)責(zé)將消息發(fā)布到指定的主題。以下是一個(gè)簡單的生

    2024年02月03日
    瀏覽(50)
  • 在Windows上搭建Kafka環(huán)境的步驟,包括安裝Java、下載Kafka、配置Zookeeper和Kafka、啟動Zookeeper和Kafka、創(chuàng)建主題和生產(chǎn)者/消費(fèi)者等

    1. 安裝Java Kafka需要Java環(huán)境支持??梢詮腛racle官網(wǎng)下載JDK,或者使用OpenJDK。 2. 下載Kafka 可以從Kafka官網(wǎng)下載Kafka二進(jìn)制壓縮包。解壓后可以看到bin、config、libs等目錄。 3. 配置Zookeeper Kafka依賴Zookeeper實(shí)現(xiàn)分布式協(xié)作??梢允褂肒afka自帶的Zookeeper,也可以獨(dú)立安裝Zookeeper。 如果使

    2024年02月11日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包