国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

kafka消費者詳解,根據(jù)實際生產(chǎn)解決問題

這篇具有很好參考價值的文章主要介紹了kafka消費者詳解,根據(jù)實際生產(chǎn)解決問題。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

1.首先kafka每創(chuàng)建一個消費者就是一個消費者組,必須指定groupip

2.兩個消費者組之間不相互影響,消費同一個主題的同一個分區(qū),兩個消費者組不相互影響,各自記錄自己的offset

3.在開發(fā)中如果沒有指定每個消費者去消費特定的分區(qū),那么kafka默認(rèn)是按照roundRobin輪詢的方式分配消費者消費分區(qū)的,如果指定了消費者消費特定的分區(qū),那么,就會按照指定的分區(qū)消費,那么具體如何指定特定分區(qū)消費呢?看代碼

  // 0 配置
        Properties properties = new Properties();

        // 連接
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // 組id
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");

        // 1 創(chuàng)建一個消費者
        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        // 2 訂閱主題對應(yīng)的分區(qū)
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        topicPartitions.add(new TopicPartition("first",0));
        kafkaConsumer.assign(topicPartitions);

        // 3 消費數(shù)據(jù)
        while (true){
            //Duration.ofSeconds(1) 就是等待一秒鐘的意思,如果等待1秒后仍然沒有消息,則返回空
            ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord);
            }
        }

在topicPartitions 中加入多個對象,每個對象指定該消費者消費的主題和對應(yīng)的分區(qū),再通過assign方法加入消費者配置中

那自定義消費哪個分區(qū)有什么用呢?

那作用大了,假如我們未來有一個消費者組,只有一臺的能力比較強,處理快,那么就可以指定它來消費kafka多的那個分區(qū),而且這樣我們可以自由的指定每個消費者消費哪個分區(qū),有更強的拓展性

消費者可能出現(xiàn)重復(fù)消費和漏消費的情況,如何解決?

這個根據(jù)實際情況來定,這個問題主要出現(xiàn)在kafka的某個消費者節(jié)點宕機后,可能就會出現(xiàn)這樣的問題,那么如何完全解決呢?就是使用事務(wù)消費?但是如果項目中可以接受部分消息丟失,就沒必要使用了,因為會造成挺大的性能損耗的,上代碼

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-consumer-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("enable.auto.commit", "false"); // 禁用自動提交
props.put("isolation.level", "read_committed"); // 設(shè)置隔離級別為讀已提交
props.put("transactional.id", "my-transactional-id"); // 設(shè)置事務(wù)ID

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singleton("my-topic"));

 try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(1000); // 設(shè)置拉取的超時時間

                seataTransactionManager.begin(); // 開啟 Seata 分布式事務(wù)

                try {
                    for (ConsumerRecord<String, String> record : records) {
                        System.out.println("Received message: " + record.value());
                        // 在這里處理消息的邏輯
                        // ...
                    }

                    seataTransactionManager.commit(); // 提交事務(wù)
                    consumer.commitSync(); // 手動提交偏移量
                } catch (Exception e) {
                    seataTransactionManager.rollback(); // 回滾事務(wù)
                    consumer.seekToBeginning(records.partitions()); // 將消費者偏移量重置到消息的起始位置,以重新消費
                    throw new RuntimeException("Failed to consume messages", e);
                }
            }
        } finally {
            consumer.close();
        }

那么在生產(chǎn)中消費著具體如何編寫呢?

在生產(chǎn)環(huán)境中使用 Kafka 的 poll 模式來消費數(shù)據(jù),可以按照以下步驟進行配置和實現(xiàn):

1. 添加 Kafka 依賴:在項目的 `pom.xml` 文件中添加 Kafka 相關(guān)的依賴,例如:

<dependency>
? ? <groupId>org.springframework.kafka</groupId>
? ? <artifactId>spring-kafka</artifactId>
</dependency>

2. 配置 Kafka 連接信息:在 `application.properties` 或 `application.yml` 中配置 Kafka 的連接信息,包括 bootstrap servers(Kafka 服務(wù)器地址)、group id(消費者組ID)等配置項。

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-consumer-group

3. 創(chuàng)建 Kafka 消費者配置類:創(chuàng)建一個 Kafka 消費者配置類,用于配置 KafkaConsumer 的屬性??梢愿鶕?jù)實際需求進行自定義配置。
?

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class KafkaConsumerConfig {

? ? @Bean
? ? public Map<String, Object> consumerConfigs() {
? ? ? ? Map<String, Object> props = new HashMap<>();
? ? ? ? props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
? ? ? ? props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-consumer-group");
? ? ? ? props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
? ? ? ? props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
? ? ? ? // 其他可選配置項
? ? ? ? return props;
? ? }
}

4. 創(chuàng)建消息處理器:創(chuàng)建一個消息處理器,用于處理從 Kafka 主題中消費的消息。

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.stereotype.Component;

@Component
public class MyMessageHandler {

? ? public void handleMessage(ConsumerRecord<String, String> record) {
? ? ? ? // 處理接收到的消息
? ? ? ? String key = record.key();
? ? ? ? String value = record.value();
? ? ? ? // ... 進行業(yè)務(wù)處理
? ? }
}

5. 創(chuàng)建 Kafka 消費者:創(chuàng)建一個 Kafka 消費者,并使用 KafkaConsumerConfig 中定義的配置。

import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.time.Duration;
import java.util.Collections;
import java.util.Map;

@Component
public class MyKafkaConsumer {

? ? private final KafkaConsumer<String, String> kafkaConsumer;
? ? private final MyMessageHandler messageHandler;

? ? @Autowired
? ? public MyKafkaConsumer(Map<String, Object> consumerConfigs, MyMessageHandler messageHandler) {
? ? ? ? this.kafkaConsumer = new KafkaConsumer<>(consumerConfigs);
? ? ? ? this.messageHandler = messageHandler;
? ? }

? ? @PostConstruct
? ? public void startConsuming() {
? ? ? ? kafkaConsumer.subscribe(Collections.singletonList("my-topic"));

? ? ? ? new Thread(() -> {
? ? ? ? ? ? while (true) {
? ? ? ? ? ? ? ? ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));
? ? ? ? ? ? ? ? records.forEach(record -> messageHandler.handleMessage(record));
? ? ? ? ? ? }
? ? ? ? }).start();
? ? }

? ? // 可以根據(jù)需要添加關(guān)閉消費者的方法
}

在上述示例中,使用 `KafkaConsumerConfig` 類定義

了 KafkaConsumer 的配置,并通過構(gòu)造函數(shù)注入到 `MyKafkaConsumer` 類中。在 `startConsuming()` 方法中,創(chuàng)建了一個新線程來進行消費,不斷地使用 `poll()` 方法輪詢獲取消息,并通過 `MyMessageHandler` 處理消息。

請注意,在實際生產(chǎn)環(huán)境中,你可能還需要添加更多的配置和處理,例如異常處理、消費者的關(guān)閉和資源釋放、消息提交偏移量的控制等。以上示例僅提供了一個基本的框架,你可以根據(jù)實際需求進行擴展和調(diào)整。

好,消費者就介紹到這里,后邊我們介紹在生產(chǎn)中如何選擇硬件以及kafka每個組件具體的優(yōu)化方案,以及如何配置文章來源地址http://www.zghlxwxcb.cn/news/detail-498576.html

到了這里,關(guān)于kafka消費者詳解,根據(jù)實際生產(chǎn)解決問題的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • Kafka生產(chǎn)者與消費者api示例

    Kafka生產(chǎn)者與消費者api示例

    ? 一個正常的生產(chǎn)邏輯需要具備以下幾個步驟 配置生產(chǎn)者參數(shù)及創(chuàng)建相應(yīng)的生產(chǎn)者實例 構(gòu)建待發(fā)送的消息 發(fā)送消息 關(guān)閉生產(chǎn)者實例 采用默認(rèn)分區(qū)方式將消息散列的發(fā)送到各個分區(qū)當(dāng)中 ? ?對于properties配置的第二種寫法,相對來說不會出錯,簡單舉例: ? 1.kafka的生產(chǎn)者可

    2024年02月07日
    瀏覽(24)
  • 筆記:配置多個kafka生產(chǎn)者和消費者

    如果只有一個kafka,那么使用自帶的KafkaAutoConfiguration配置類即可,對應(yīng)已有屬性類KafkaProperties,屬性前綴為spring.kafka.xxx; 本文記錄配置多個kafka的情況,即在KafkaAutoConfiguration的基礎(chǔ)上,自定義額外的kafka生產(chǎn)者和消費者。 適用場景:需要消費來源于不同kafka的消息、需要在不

    2024年02月15日
    瀏覽(32)
  • kafka生產(chǎn)者和消費者(python版)

    生產(chǎn)者 消費者 消費者中的組名主要用戶針對主題的偏移量進行更改,也涉及到主題中分區(qū)的問題, kafka工具類 此工具類基本上拿過去就可以用 疑問 當(dāng)消費者鏈接kafka時發(fā)現(xiàn)topic沒有未讀的消息怎樣退出呢,默認(rèn)是在一直等待,但是我期望沒有要讀的消息的時候直接退出即可

    2024年02月16日
    瀏覽(21)
  • Kafka系列:查看Topic列表、消息消費情況、模擬生產(chǎn)者消費者

    Kafka系列:查看Topic列表、消息消費情況、模擬生產(chǎn)者消費者

    執(zhí)行topic刪除命令時,出現(xiàn)提示 這條命令其實并不執(zhí)行刪除動作,僅僅是在zookeeper上標(biāo)記該topic要被刪除而已,同時也提醒用戶一定要提前打開delete.topic.enable開關(guān),否則刪除動作是不會執(zhí)行的。 解決辦法: a)在server.properties中設(shè)置delete.topic.enable參數(shù)為ture b)如下操作: 1.登

    2023年04月26日
    瀏覽(29)
  • Kafka官方生產(chǎn)者和消費者腳本簡單使用

    怎樣使用Kafka官方生產(chǎn)者和消費者腳本進行消費生產(chǎn)和消費?這里假設(shè)已經(jīng)下載了kafka官方文件,并已經(jīng)解壓. 這就可以見到測試kafka對應(yīng)topic了.

    2024年02月04日
    瀏覽(23)
  • Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費者

    Kafka:主題創(chuàng)建、分區(qū)修改查看、生產(chǎn)者、消費者

    1.創(chuàng)建主題 2.查看所有主題 3.查看詳細(xì)主題 序號從0開始計算 Partition:分區(qū)數(shù),該主題有3個分區(qū) Replica:副本數(shù),該主題有3個副本 Leader:副本數(shù)中的主的序號,生產(chǎn)消費的對象 1.修改分區(qū)數(shù) 修改的分區(qū)數(shù)量不可以小于或者等于當(dāng)前主題分區(qū)的數(shù)量,否則會報錯 在根目錄kaf

    2024年02月11日
    瀏覽(32)
  • 探究:kafka生產(chǎn)者/消費者與多線程安全

    探究:kafka生產(chǎn)者/消費者與多線程安全

    目錄 1. 多線程安全 1.1. 生產(chǎn)者是多線程安全的么? 1.1. 消費者是多線程安全的么? 2. 消費者規(guī)避多線程安全方案 2.1. 每個線程維護一個kafkaConsumer 2.2. [單/多]kafkaConsumer實例 + 多worker線程 2.3.方案優(yōu)缺點對比 ????????Kafka生產(chǎn)者是 線程安全 的,可以在多個線程中共享一個

    2023年04月26日
    瀏覽(24)
  • Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費者

    Linux安裝Kafka,創(chuàng)建topic、生產(chǎn)者、消費者

    1.創(chuàng)建安裝目錄/usr/local/kafka mkdir /usr/local/kafka 2.進入安裝包目錄 cd?/usr/local/kafka? 3.下載安裝包 wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz 4.解壓安裝包 tar -zxvf kafka_2.12-3.3.1.tgz 5.進入cd kafka_2.12-3.3.1目錄 cd kafka_2.12-3.3.1/ 6.修改zookeeper配置 cat ./config/zookeeper.properties | grep

    2023年04月17日
    瀏覽(29)
  • Kafka - 3.x 消費者 生產(chǎn)經(jīng)驗不完全指北

    Kafka - 3.x 消費者 生產(chǎn)經(jīng)驗不完全指北

    Kafka引入了消費者事務(wù)(Consumer Transactions)來確保在消息處理期間維護端到端的數(shù)據(jù)一致性。這使得消費者能夠以事務(wù)的方式處理消息,包括從Kafka中讀取消息、處理消息和提交消息的offset。以下是有關(guān)Kafka消費者事務(wù)的詳細(xì)信息: 事務(wù)的引入 :Kafka 0.11.0版本引入了消費者事

    2024年02月06日
    瀏覽(20)
  • Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費者(Consumer)

    Kafka 架構(gòu)深度解析:生產(chǎn)者(Producer)和消費者(Consumer)

    Apache Kafka 作為分布式流處理平臺,其架構(gòu)中的生產(chǎn)者和消費者是核心組件,負(fù)責(zé)實現(xiàn)高效的消息生產(chǎn)和消費。本文將深入剖析 Kafka 架構(gòu)中生產(chǎn)者和消費者的工作原理、核心概念以及高級功能。 1 發(fā)送消息到 Kafka Kafka 生產(chǎn)者負(fù)責(zé)將消息發(fā)布到指定的主題。以下是一個簡單的生

    2024年02月03日
    瀏覽(50)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包