国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

非阻塞重試與 Spring Kafka 的集成測試

這篇具有很好參考價值的文章主要介紹了非阻塞重試與 Spring Kafka 的集成測試。希望對大家有所幫助。如果存在錯誤或未考慮完全的地方,請大家不吝賜教,您也可以點擊"舉報違法"按鈕提交疑問。

????????如何為啟用重試和死信發(fā)布的消費者的 Spring Kafka 實現(xiàn)編寫集成測試。

Kafka 非阻塞重試

Kafka 中的非阻塞重試是通過為主主題配置重試主題來完成的。如果需要,還可以配置其他死信主題。如果所有重試均已用盡,事件將轉發(fā)至 DLT。公共領域提供了大量資源來了解技術細節(jié)。?

要測試什么?

在代碼中為重試機制編寫集成測試時,這可能是一項具有挑戰(zhàn)性的工作。?

  • 如何測試該事件是否已重試所需的次數(shù)??
  • 如何測試僅在發(fā)生某些異常時才執(zhí)行重試,而對于其他異常則不執(zhí)行重試?
  • 如果上次重試中異常已解決,如何測試是否未進行另一次重試?
  • 在(n-1)次重試嘗試失敗后,如何測試重試中的第n次嘗試是否成功?
  • 當所有重試嘗試都用完后,如何測試事件是否已發(fā)送到死信隊列?

讓我們看一些代碼。您可以找到很多很好的文章,展示如何使用 Spring Kafka 設置非阻塞重試。下面給出了一種這樣的實現(xiàn)。這是使用Spring-Kafka 的@RetryableTopic@DltHandler? 注釋來完成的。

設置可重試消費者

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {

    private final CustomEventHandler handler;

    @RetryableTopic(attempts = "${retry.attempts}",
            backoff = @Backoff(
                    delayExpression = "${retry.delay}",
                    multiplierExpression = "${retry.delay.multiplier}"
            ),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = FAIL_ON_ERROR,
            autoStartDltHandler = "true",
            autoCreateTopics = "false",
            include = {CustomRetryableException.class})
    @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
    public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            log.info("Received event on topic {}", topic);
            handler.handleEvent(event);
        } catch (Exception e) {
            log.error("Error occurred while processing event", e);
            throw e;
        }
    }

    @DltHandler
    public void listenOnDlt(@Payload CustomEvent event) {
        log.error("Received event on dlt.");
        handler.handleEventFromDlt(event);
    }

}

如果您注意到上面的代碼片段,include參數(shù)包含CustomRetryableException.class.?這告訴使用者僅在該方法拋出 CustomRetryableException 時才重試CustomEventHandler#handleEvent。您可以根據(jù)需要添加任意數(shù)量。還有一個排除參數(shù),但一次可以使用其中任何一個參數(shù)。

${retry.attempts}在發(fā)布到 DLT 之前,事件處理應重試最多次數(shù)。

設置測試基礎設施

要編寫集成測試,您需要確保擁有一個正常運行的 Kafka 代理(首選嵌入式)和一個功能齊全的發(fā)布者。讓我們設置我們的基礎設施:

@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=" + "${kafka.broker.listeners}", 
                            "port=" + "${kafka.broker.port}"},
        controlledShutdown = true,
        topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {
  
  @Autowired
  private KafkaTemplate<String, CustomEvent> testKafkaTemplate;


    // tests

}

** 配置是從 application-test.yml 文件導入的。

使用嵌入式 kafka 代理時,重要的是要提及要創(chuàng)建的主題。它們不會自動創(chuàng)建。在本例中,我們創(chuàng)建四個主題,即?

"test", "test-retry-0", "test-retry-1", "test-dlt"

我們已將最大重試嘗試次數(shù)設置為 3 次。每個主題對應于每次重試嘗試。因此,如果 3 次重試都用盡,則應將事件轉發(fā)到 DLT。

測試用例

如果第一次嘗試消費成功,則不應重試。

CustomEventHandler#handleEvent這可以通過該方法僅被調用一次的事實來測試。還可以添加對 Log 語句的進一步測試。

    @Test
    void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

如果引發(fā)不可重試的異常,則不應重試。

在這種情況下,該CustomEventHandler#handleEvent方法應該只調用一次:

? ? @Test
 ? ?void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
 ? ? ? ?CustomEvent event = new CustomEvent("Hello");
 ? ? ? ?// GIVEN
 ? ? ? ?doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

 ? ? ? ?// WHEN
 ? ? ? ?testKafkaTemplate.send("test", event).get();

 ? ? ? ?// THEN
 ? ? ? ?verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
 ? ? ? ?verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
 ? ?}

如果拋出 a,則重試配置的最大次數(shù)RetryableException,并在重試用完后將其發(fā)布到死信主題。

在這種情況下,該CustomEventHandler#handleEvent方法應被調用三次(maxRetries)次,并且CustomEventHandler#handleEventFromDlt該方法應被調用一次。

    @Test
    void test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class));
    }

**在驗證階段添加了相當大的超時,以便在測試完成之前可以考慮指數(shù)退避延遲。這很重要,如果設置不當可能會導致斷言失敗。

應該重試直到RetryableException解決,并且如果引發(fā)不可重試的異?;蛳M最終成功,則不應繼續(xù)重試。

測試已設置為RetryableException先拋出 a 然后再拋出 a?NonRetryable exception,以便重試一次。

    @Test
    void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

    @Test
    void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));

        // WHEN
        testKafkaTemplate.send("test", event).get();

        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
? ? }

結論

因此,您可以看到集成測試是策略、超時、延遲和驗證的混合和匹配,以確保 Kafka 事件驅動架構的重試機制萬無一失。文章來源地址http://www.zghlxwxcb.cn/news/detail-661910.html

到了這里,關于非阻塞重試與 Spring Kafka 的集成測試的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點僅代表作者本人,不代表本站立場。本站僅提供信息存儲空間服務,不擁有所有權,不承擔相關法律責任。如若轉載,請注明出處: 如若內(nèi)容造成侵權/違法違規(guī)/事實不符,請點擊違法舉報進行投訴反饋,一經(jīng)查實,立即刪除!

領支付寶紅包贊助服務器費用

相關文章

  • Spring集成Kafka

    Spring集成Kafka

    我負責的其中一個項目,接口的交互量在千萬級/d,所以要存儲大量的日志,為了防止日志的存儲影響到系統(tǒng)的性能,所以在技術選型就決定了使用Kafka中間件和一個日志存儲系統(tǒng)來負責日志的存儲。 使用Kafka 的優(yōu)點: 1.Kafka 是一種高吞吐量的分布式消息系統(tǒng),可以支持水平

    2024年02月14日
    瀏覽(16)
  • Spring Boot集成Kafka詳解

    Spring Boot集成Kafka詳解

    Spring Boot是一個用于構建獨立的、生產(chǎn)級的Java應用程序的框架,而Kafka是一種高吞吐量的分布式發(fā)布訂閱消息系統(tǒng)。在本文中,我們將詳細解釋如何在Spring Boot項目中集成Kafka。 1. 添加依賴 首先,我們需要在項目的pom.xml文件中添加Spring Boot和Kafka的依賴。 2. 配置Kafka 接下來,

    2024年02月09日
    瀏覽(20)
  • Spring Cloud Stream集成Kafka

    Spring Cloud Stream集成Kafka

    Spring Cloud Stream是一個構建消息驅動微服務的框架,抽象了MQ的使用方式, 提供統(tǒng)一的API操作。Spring Cloud Stream通過Binder(綁定器)、inputs/outputs Channel完成應用程序和MQ的解耦。 Binder 負責綁定應用程序和MQ中間件,即指定應用程序是和KafKa交互還是和RabbitMQ交互或者和其他的MQ中間件交

    2024年02月13日
    瀏覽(32)
  • spring集成kafka并對消息進行監(jiān)聽

    需要依賴zookeeper,需提前啟動 在server.properties文件中配置kafka連接zookeeper相關信息 在zookeeper.properties中配置zookeeper所需配置 kafka本地安裝啟動 pom文件 生產(chǎn)配置 消費者配置 創(chuàng)建topic工具類 生產(chǎn)業(yè)務 消費業(yè)務 消息接收類 監(jiān)聽類 業(yè)務處理 異步 同步 ONEWAY kafka消息發(fā)送方式有同步

    2024年02月03日
    瀏覽(18)
  • Spring Boot集成kafka的相關配置

    額外依賴只需要這一個,kafka-client 不是springboot 的東西,那是原生的 kafka 客戶端, kafka-test也不需要,是用代碼控制broker的東西。 也可以用java類Config 方式配置,如果沒有特殊要求,可以只用spring配置的方式 注意加上@Component,被spring管理監(jiān)聽才有效 注意這里不能用@Value注解

    2024年02月07日
    瀏覽(26)
  • 在Spring Boot微服務集成kafka-clients操作Kafka集群

    記錄 :463 場景 :在Spring Boot微服務集成kafka-clients-3.0.0操作Kafka集群。使用kafka-clients的原生KafkaProducer操作Kafka集群生產(chǎn)者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka集群的消費者Consumer。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka集群安裝 :https://bl

    2024年02月09日
    瀏覽(95)
  • 在Spring Boot微服務集成Kafka客戶端(kafka-clients)操作Kafka

    記錄 :459 場景 :在Spring Boot微服務集成Kafka客戶端kafka-clients-3.0.0操作Kafka。使用kafka-clients的原生KafkaProducer操作Kafka生產(chǎn)者Producer。使用kafka-clients的原生KafkaConsumer操作Kafka的消費者Consumer。 版本 :JDK 1.8,Spring?Boot 2.6.3,kafka_2.12-2.8.0,kafka-clients-3.0.0。 Kafka安裝 :https://blog.csdn.ne

    2024年02月12日
    瀏覽(91)
  • 【Spring Boot】集成Kafka實現(xiàn)消息發(fā)送和訂閱

    【Spring Boot】集成Kafka實現(xiàn)消息發(fā)送和訂閱

    最近忙著搞低代碼開發(fā),好久沒新建spring項目了,結果今天心血來潮準備建個springboot項目 注意Type選Maven,java選8,其他默認 點下一步后完成就新建了一個spring boot項目,配置下Maven環(huán)境,主要是settings.xml文件,里面要包含阿里云倉庫,不然可能依賴下載不下來 在maven配置沒問

    2024年02月09日
    瀏覽(32)
  • 從零開始學Spring Boot系列-集成Kafka

    Apache Kafka是一個開源的分布式流處理平臺,由LinkedIn公司開發(fā)和維護,后來捐贈給了Apache軟件基金會。Kafka主要用于構建實時數(shù)據(jù)管道和流應用。它類似于一個分布式、高吞吐量的發(fā)布-訂閱消息系統(tǒng),可以處理消費者網(wǎng)站的所有動作流數(shù)據(jù)。這種動作流數(shù)據(jù)包括頁面瀏覽、搜

    2024年03月21日
    瀏覽(23)
  • kafka--技術文檔--spring-boot集成基礎簡單使用

    kafka--技術文檔--spring-boot集成基礎簡單使用

    ? ? ? ? 查閱了很多資料了解到,使用了spring-boot中整合的kafka的使用是被封裝好的。也就是說這些使用其實和在linux中的使用kafka代碼的使用其實沒有太大關系。但是邏輯是一樣的。這點要注意! 核心配置為: 如果在下面規(guī)定了spring-boot的版本那么就不需要再使用版本號,如

    2024年02月11日
    瀏覽(23)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請作者喝杯咖啡吧~博客贊助

支付寶掃一掃領取紅包,優(yōu)惠每天領

二維碼1

領取紅包

二維碼2

領紅包