国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

Kafka:消費者消費失敗處理-重試隊列

這篇具有很好參考價值的文章主要介紹了Kafka:消費者消費失敗處理-重試隊列。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

kafka沒有重試機制不支持消息重試,也沒有死信隊列,因此使用kafka做消息隊列時,需要自己實 現(xiàn)消息重試的功能。

實現(xiàn)

創(chuàng)建新的kafka主題作為重試隊列:

  1. 創(chuàng)建一個topic作為重試topic,用于接收等待重試的消息。
  2. 普通topic消費者設(shè)置待重試消息的下一個重試topic。
  3. 從重試topic獲取待重試消息儲存到redis的zset中,并以下一次消費時間排序
  4. 定時任務(wù)從redis獲取到達消費事件的消息,并把消息發(fā)送到對應(yīng)的topic
  5. 同一個消息重試次數(shù)過多則不再重試?

代碼實現(xiàn)?

依賴?

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.73</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>io.projectreactor</groupId>
            <artifactId>reactor-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

?添加application.properties

# bootstrap.servers
spring.kafka.bootstrap-servers=node1:9092
# key序列化器
spring.kafka.producer.keyserializer=org.apache.kafka.common.serialization.StringSerializer
# value序列化器
spring.kafka.producer.valueserializer=org.apache.kafka.common.serialization.StringSerializer
# 消費組id:group.id
spring.kafka.consumer.group-id=retryGroup
# key反序列化器
spring.kafka.consumer.keydeserializer=org.apache.kafka.common.serialization.StringDeserializer
# value反序列化器
spring.kafka.consumer.valuedeserializer=org.apache.kafka.common.serialization.StringDeserializer
# redis數(shù)據(jù)庫編號
spring.redis.database=0
# redis主機地址
spring.redis.host=node1
# redis端口
spring.redis.port=6379
# Redis服務(wù)器連接密碼(默認為空)
spring.redis.password=
# 連接池最大連接數(shù)(使用負值表示沒有限制)
spring.redis.jedis.pool.max-active=20
# 連接池最大阻塞等待時間(使用負值表示沒有限制)
spring.redis.jedis.pool.max-wait=-1
# 連接池中的最大空閑連接
spring.redis.jedis.pool.max-idle=10
# 連接池中的最小空閑連接
spring.redis.jedis.pool.min-idle=0
# 連接超時時間(毫秒)
spring.redis.timeout=1000
# Kafka主題名稱
spring.kafka.topics.test=tp_demo_retry_01
# 重試隊列
spring.kafka.topics.retry=tp_demo_retry_02

AppConfig.java

import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;

@Configuration
public class AppConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory) {

        RedisTemplate<String, Object> template = new RedisTemplate<>();
        // 配置連接工廠
        template.setConnectionFactory(factory);

        return template;
    }

}

RetryController?.java

import com.lagou.kafka.demo.service.KafkaService;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.beans.factory.annotation.Value;

import java.util.concurrent.ExecutionException;

@RestController
public class RetryController {

    @Autowired
    private KafkaService kafkaService;

    @Value("${spring.kafka.topics.test}")
    private String topic;

    @RequestMapping("/send/{message}")
    public String sendMessage(@PathVariable String message) throws ExecutionException, InterruptedException {

        ProducerRecord<String, String> record = new ProducerRecord<>(
                topic,
                message
        );

        // 向業(yè)務(wù)主題發(fā)送消息
        String result = kafkaService.sendMessage(record);

        return result;
    }

}

KafkaService.java

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;


import java.util.concurrent.ExecutionException;

@Service
public class KafkaService {

    private Logger log = LoggerFactory.getLogger(KafkaService.class);

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public String sendMessage(ProducerRecord<String, String> record) throws ExecutionException, InterruptedException {

        SendResult<String, String> result = this.kafkaTemplate.send(record).get();
        RecordMetadata metadata = result.getRecordMetadata();
        String returnResult = metadata.topic() + "\t" + metadata.partition() + "\t" + metadata.offset();
        log.info("發(fā)送消息成功:" + returnResult);

        return returnResult;
    }

}

?ConsumerListener.java

import com.lagou.kafka.demo.service.RetryService;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;

@Component
public class ConsumerListener {

    private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);

    @Autowired
    private RetryService kafkaRetryService;

    private static int index = 0;

    @KafkaListener(topics = "${spring.kafka.topics.test}", groupId = "${spring.kafka.consumer.group-id}")
    public void consume(ConsumerRecord<String, String> record) {
        try {
            // 業(yè)務(wù)處理
            log.info("消費的消息:" + record);
            index++;
            if (index % 2 == 0) {
                throw new Exception("該重發(fā)了");
            }
        } catch (Exception e) {
            log.error(e.getMessage());
            // 消息重試,實際上先將消息放到redis
            kafkaRetryService.consumerLater(record);
        }
    }

}

RetryService?.java

import com.alibaba.fastjson.JSON;
import com.lzh.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;

import java.nio.ByteBuffer;
import java.util.Calendar;


@Service
public class RetryService {
    private static final Logger log = LoggerFactory.getLogger(RetryService.class);

    /**
     * 消息消費失敗后下一次消費的延遲時間(秒)
     * 第一次重試延遲10秒;第	二次延遲30秒,第三次延遲1分鐘...
     */
    private static final int[] RETRY_INTERVAL_SECONDS = {10, 30, 1*60, 2*60, 5*60, 10*60, 30*60, 1*60*60, 2*60*60};

    /**
     * 重試topic
     */
    @Value("${spring.kafka.topics.retry}")
    private String retryTopic;

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    public void consumerLater(ConsumerRecord<String, String> record){
        // 獲取消息的已重試次數(shù)
        int retryTimes = getRetryTimes(record);
        Date nextConsumerTime = getNextConsumerTime(retryTimes);
        // 如果達到重試次數(shù),則不再重試
        if(nextConsumerTime == null) {
            return;
        }

        // 組織消息
        RetryRecord retryRecord = new RetryRecord();
        retryRecord.setNextTime(nextConsumerTime.getTime());
        retryRecord.setTopic(record.topic());
        retryRecord.setRetryTimes(retryTimes);
        retryRecord.setKey(record.key());
        retryRecord.setValue(record.value());

        // 轉(zhuǎn)換為字符串
        String value = JSON.toJSONString(retryRecord);
        // 發(fā)送到重試隊列
        kafkaTemplate.send(retryTopic, null, value);
    }

    /**
     * 獲取消息的已重試次數(shù)
     */
    private int getRetryTimes(ConsumerRecord record){
        int retryTimes = -1;
        for(Header header : record.headers()){
            if(RetryRecord.KEY_RETRY_TIMES.equals(header.key())){
                ByteBuffer buffer = ByteBuffer.wrap(header.value());
                retryTimes = buffer.getInt();
            }
        }
        retryTimes++;
        return retryTimes;
    }

    /**
     * 獲取待重試消息的下一次消費時間
     */
    private Date getNextConsumerTime(int retryTimes){
        // 重試次數(shù)超過上限,不再重試
        if(RETRY_INTERVAL_SECONDS.length < retryTimes) {
            return null;
        }

        Calendar calendar = Calendar.getInstance();
        calendar.add(Calendar.SECOND, RETRY_INTERVAL_SECONDS[retryTimes]);
        return calendar.getTime();
    }
}

RetryListener.java

import com.alibaba.fastjson.JSON;
import com.lzh.kafka.demo.entity.RetryRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.ZSetOperations;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;

import java.util.UUID;

@Component
@EnableScheduling
public class RetryListener {

    private Logger log = LoggerFactory.getLogger(RetryListener.class);

    private static final String RETRY_KEY_ZSET = "_retry_key";
    private static final String RETRY_VALUE_MAP = "_retry_value";
    @Autowired
    private RedisTemplate<String,Object> redisTemplate;
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topics.test}")
    private String bizTopic;

    @KafkaListener(topics = "${spring.kafka.topics.retry}")
//    public void consume(List<ConsumerRecord<String, String>> list) {
//        for(ConsumerRecord<String, String> record : list){
    public void consume(ConsumerRecord<String, String> record) {

        System.out.println("需要重試的消息:" + record);
        RetryRecord retryRecord = JSON.parseObject(record.value(), RetryRecord.class);

        /**
         * 防止待重試消息太多撐爆redis,可以將待重試消息按下一次重試時間分開存儲放到不同介質(zhì)
         * 例如下一次重試時間在半小時以后的消息儲存到mysql,并定時從mysql讀取即將重試的消息儲儲存到redis
         */

        // 通過redis的zset進行時間排序
        String key = UUID.randomUUID().toString();
        redisTemplate.opsForHash().put(RETRY_VALUE_MAP, key, record.value());
        redisTemplate.opsForZSet().add(RETRY_KEY_ZSET, key, retryRecord.getNextTime());
    }
//    }

    /**
     * 定時任務(wù)從redis讀取到達重試時間的消息,發(fā)送到對應(yīng)的topic
     */
//    @Scheduled(cron="2 * * * * *")
    @Scheduled(fixedDelay = 2000)
    public void retryFromRedis() {
        log.warn("retryFromRedis----begin");
        long currentTime = System.currentTimeMillis();
        // 根據(jù)時間倒序獲取
        Set<ZSetOperations.TypedTuple<Object>> typedTuples =
                redisTemplate.opsForZSet().reverseRangeByScoreWithScores(RETRY_KEY_ZSET, 0, currentTime);
        // 移除取出的消息
        redisTemplate.opsForZSet().removeRangeByScore(RETRY_KEY_ZSET, 0, currentTime);
        for(ZSetOperations.TypedTuple<Object> tuple : typedTuples){
            String key = tuple.getValue().toString();
            String value = redisTemplate.opsForHash().get(RETRY_VALUE_MAP, key).toString();
            redisTemplate.opsForHash().delete(RETRY_VALUE_MAP, key);
            RetryRecord retryRecord = JSON.parseObject(value, RetryRecord.class);
            ProducerRecord record = retryRecord.parse();

            ProducerRecord recordReal = new ProducerRecord(
                    bizTopic,
                    record.partition(),
                    record.timestamp(),
                    record.key(),
                    record.value(),
                    record.headers()
            );

            kafkaTemplate.send(recordReal);
        }
        // todo 發(fā)生異常將發(fā)送失敗的消息重新發(fā)送到redis
    }
}

?RetryRecord.java

package com.lzh.kafka.demo.entity;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;

import java.nio.ByteBuffer;

public class RetryRecord {

    public static final String KEY_RETRY_TIMES = "retryTimes";

    private String key;
    private String value;

    private Integer retryTimes;
    private String topic;
    private Long nextTime;

    public RetryRecord() {
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    public Integer getRetryTimes() {
        return retryTimes;
    }

    public void setRetryTimes(Integer retryTimes) {
        this.retryTimes = retryTimes;
    }

    public String getTopic() {
        return topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public Long getNextTime() {
        return nextTime;
    }

    public void setNextTime(Long nextTime) {
        this.nextTime = nextTime;
    }

    public ProducerRecord parse() {
        Integer partition = null;
        Long timestamp = System.currentTimeMillis();
        List<Header> headers = new ArrayList<>();
        ByteBuffer retryTimesBuffer = ByteBuffer.allocate(4);
        retryTimesBuffer.putInt(retryTimes);
        retryTimesBuffer.flip();
        headers.add(new RecordHeader(RetryRecord.KEY_RETRY_TIMES, retryTimesBuffer));

        ProducerRecord sendRecord = new ProducerRecord(
                topic, partition, timestamp, key, value, headers);
        return sendRecord;
    }
}

?文章來源地址http://www.zghlxwxcb.cn/news/detail-441734.html

到了這里,關(guān)于Kafka:消費者消費失敗處理-重試隊列的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務(wù),不擁有所有權(quán),不承擔相關(guān)法律責任。如若轉(zhuǎn)載,請注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費用

相關(guān)文章

  • 分布式 - 消息隊列Kafka:Kafka 消費者消費位移的提交方式

    分布式 - 消息隊列Kafka:Kafka 消費者消費位移的提交方式

    最簡單的提交方式是讓消費者自動提交偏移量,自動提交 offset 的相關(guān)參數(shù): enable.auto.commit:是否開啟自動提交 offset 功能,默認為 true; auto.commit.interval.ms:自動提交 offset 的時間間隔,默認為5秒; 如果 enable.auto.commit 被設(shè)置為true,那么每過5秒,消費者就會自動提交 poll() 返

    2024年02月12日
    瀏覽(31)
  • 分布式消息隊列Kafka(四)- 消費者

    分布式消息隊列Kafka(四)- 消費者

    1.Kafka消費方式 2.Kafka消費者工作流程 (1)總體工作流程 (2)消費者組工作流程 3.消費者API (1)單個消費者消費 實現(xiàn)代碼 (2)單個消費者指定分區(qū)消費 代碼實現(xiàn): (3)消費者組消費 復(fù)制上面CustomConsumer三個,同時去訂閱統(tǒng)一個主題,消費數(shù)據(jù),發(fā)現(xiàn)一個分區(qū)只能被一個

    2023年04月26日
    瀏覽(33)
  • 分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    分布式 - 消息隊列Kafka:Kafka消費者的分區(qū)分配策略

    Kafka 消費者負載均衡策略? Kafka 消費者分區(qū)分配策略? 1. 環(huán)境準備 創(chuàng)建主題 test 有5個分區(qū),準備 3 個消費者并進行消費,觀察消費分配情況。然后再停止其中一個消費者,再次觀察消費分配情況。 ① 創(chuàng)建主題 test,該主題有5個分區(qū),2個副本: ② 創(chuàng)建3個消費者CustomConsu

    2024年02月13日
    瀏覽(31)
  • 分布式 - 消息隊列Kafka:Kafka消費者分區(qū)再均衡(Rebalance)

    分布式 - 消息隊列Kafka:Kafka消費者分區(qū)再均衡(Rebalance)

    01. Kafka 消費者分區(qū)再均衡是什么? 消費者群組里的消費者共享主題分區(qū)的所有權(quán)。當一個新消費者加入群組時,它將開始讀取一部分原本由其他消費者讀取的消息。當一個消費者被關(guān)閉或發(fā)生崩潰時,它將離開群組,原本由它讀取的分區(qū)將由群組里的其他消費者讀取。 分區(qū)

    2024年02月12日
    瀏覽(31)
  • Spring Boot中使用Kafka時遇到“構(gòu)建Kafka消費者失敗“的問題

    在使用Spring Boot開發(fā)應(yīng)用程序時,集成Apache Kafka作為消息隊列是一種常見的做法。然而,有時候在配置和使用Kafka時可能會遇到一些問題。本文將探討在Spring Boot應(yīng)用程序中使用Kafka時可能遇到的\\\"構(gòu)建Kafka消費者失敗\\\"錯誤,并提供解決方案。 錯誤描述: 當嘗試構(gòu)建Kafka消費者時

    2024年01月17日
    瀏覽(23)
  • 【消息隊列】細說Kafka消費者的分區(qū)分配和重平衡

    【消息隊列】細說Kafka消費者的分區(qū)分配和重平衡

    我們直到在性能設(shè)計中異步模式,一般要么是采用pull,要么采用push。而兩種方式各有優(yōu)缺點。 pull :說白了就是通過消費端進行主動拉去數(shù)據(jù),會根據(jù)自身系統(tǒng)處理能力去獲取消息,上有Broker系統(tǒng)無需關(guān)注消費端的消費能力。kafka采用pull模式 push : Broker主動推送消息到消費端

    2024年02月12日
    瀏覽(30)
  • kakfa 3.5 kafka服務(wù)端處理消費者客戶端拉取數(shù)據(jù)請求源碼

    kafka服務(wù)端接收生產(chǎn)者數(shù)據(jù)的API在KafkaApis.scala類中,handleFetchRequest方法 replicaManager.fetchMessages 最后通過這個方法獲得日志 通過readFromLocalLog查詢數(shù)據(jù)日志 val readResult = read(tp, fetchInfo, limitBytes, minOneMessage) 遍歷主題分區(qū)分別執(zhí)行read內(nèi)部函數(shù)執(zhí)行查詢操作 方法內(nèi)部通過 partition.fe

    2024年02月10日
    瀏覽(26)
  • 13、Kafka ------ kafka 消費者API用法(消費者消費消息代碼演示)

    13、Kafka ------ kafka 消費者API用法(消費者消費消息代碼演示)

    消費者API的核心類是 KafkaConsumer,它提供了如下常用方法: 下面這些方法都體現(xiàn)了Kafka是一個數(shù)據(jù)流平臺,消費者通過這些方法可以從分區(qū)的任意位置、重新開始讀取數(shù)據(jù)。 根據(jù)KafkaConsumer不難看出,使用消費者API拉取消息很簡單,基本只要幾步: 1、創(chuàng)建KafkaConsumer對象,創(chuàng)建

    2024年04月11日
    瀏覽(30)
  • 【Kafka】Kafka消費者

    【Kafka】Kafka消費者

    pull(拉)模式:consumer采用從broker中主動拉取數(shù)據(jù)。 Kafka采用這種方式。 push(推)模式:Kafka沒有采用這種方式,因為由broker決定消息發(fā)送速率,很難適應(yīng)所有的消費者的消費速率。例如推送的速度是50m/s,consumer1和consumer2舊來不及處理消息。 pull模式不足之處是,如果Kafka沒有數(shù)

    2024年02月13日
    瀏覽(24)
  • kafka配置多個消費者groupid kafka多個消費者消費同一個partition(java)

    kafka配置多個消費者groupid kafka多個消費者消費同一個partition(java)

    kafka是由Apache軟件基金會開發(fā)的一個開源流處理平臺。kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng),它可以處理消費者在網(wǎng)站中的所有動作流數(shù)據(jù)。 kafka中partition類似數(shù)據(jù)庫中的分表數(shù)據(jù),可以起到水平擴展數(shù)據(jù)的目的,比如有a,b,c,d,e,f 6個數(shù)據(jù),某個topic有兩個partition,一

    2024年01月22日
    瀏覽(161)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包