国产 无码 综合区,色欲AV无码国产永久播放,无码天堂亚洲国产AV,国产日韩欧美女同一区二区

使用 Spring Kafka 進(jìn)行非阻塞重試的集成測(cè)試

這篇具有很好參考價(jià)值的文章主要介紹了使用 Spring Kafka 進(jìn)行非阻塞重試的集成測(cè)試。希望對(duì)大家有所幫助。如果存在錯(cuò)誤或未考慮完全的地方,請(qǐng)大家不吝賜教,您也可以點(diǎn)擊"舉報(bào)違法"按鈕提交疑問。

?Kafka的非阻塞重試是通過為主題配置重試主題來實(shí)現(xiàn)的。如果需要,還可以配置額外的死信主題。如果所有重試都耗盡,事件將被轉(zhuǎn)發(fā)到DLT。在公共領(lǐng)域中有很多資源可用于了解技術(shù)細(xì)節(jié)。對(duì)于代碼中的重試機(jī)制編寫集成測(cè)試確實(shí)是一項(xiàng)具有挑戰(zhàn)性的工作。以下是一些測(cè)試方法,可以用來驗(yàn)證重試機(jī)制的正確性:

  1. 驗(yàn)證事件已經(jīng)按照所需的次數(shù)進(jìn)行了重試:
  • 在測(cè)試中,模擬一個(gè)會(huì)觸發(fā)重試的事件,并設(shè)置重試次數(shù)為所需的次數(shù)。

  • 使用斷言來驗(yàn)證事件是否被重試了指定的次數(shù)。

  1. 驗(yàn)證只有在特定的異常發(fā)生時(shí)才進(jìn)行重試,而不是其他異常:
  • 在測(cè)試中,模擬不同的異常情況,包括需要重試的異常和不需要重試的異常。

  • 使用斷言來驗(yàn)證只有特定的異常觸發(fā)了重試,而其他異常沒有觸發(fā)重試。

  1. 驗(yàn)證如果前一次重試已經(jīng)解決了異常,不會(huì)進(jìn)行另一次重試:
  • 在測(cè)試中,模擬一個(gè)會(huì)觸發(fā)重試的事件,并在每次重試之間解決異常。

  • 使用斷言來驗(yàn)證只有在異常沒有被解決的情況下才進(jìn)行重試。

  1. 驗(yàn)證在前面的 (n-1) 次重試失敗后,第 n 次重試成功:
  • 在測(cè)試中,模擬一個(gè)會(huì)觸發(fā)重試的事件,并設(shè)置重試次數(shù)為 n。

  • 使用斷言來驗(yàn)證在前面的 (n-1) 次重試失敗后,第 n 次重試成功。

  1. 驗(yàn)證如果所有的重試嘗試都失敗,事件是否已經(jīng)發(fā)送到了死信隊(duì)列:
  • 在測(cè)試中,模擬一個(gè)會(huì)觸發(fā)重試的事件,并設(shè)置重試次數(shù)為一個(gè)較小的值。
  • 使用斷言來驗(yàn)證當(dāng)所有的重試嘗試都失敗后,事件是否已經(jīng)發(fā)送到了死信隊(duì)列。

設(shè)置可重試的消費(fèi)者

@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {
?
    private final CustomEventHandler handler;
?
    @RetryableTopic(attempts = "${retry.attempts}",
            backoff = @Backoff(
                    delayExpression = "${retry.delay}",
                    multiplierExpression = "${retry.delay.multiplier}"
            ),
            topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
            dltStrategy = FAIL_ON_ERROR,
            autoStartDltHandler = "true",
            autoCreateTopics = "false",
            include = {CustomRetryableException.class})
    @KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
    public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
        try {
            log.info("Received event on topic {}", topic);
            handler.handleEvent(event);
        } catch (Exception e) {
            log.error("Error occurred while processing event", e);
            throw e;
        }
    }
?
    @DltHandler
    public void listenOnDlt(@Payload CustomEvent event) {
        log.error("Received event on dlt.");
        handler.handleEventFromDlt(event);
    }
?
}

如果您注意上面的代碼片段,參數(shù)@RetryableTopic中包含includes。這告訴消費(fèi)者只在方法拋出CustomRetryableException時(shí)進(jìn)行重試。您可以添加任意數(shù)量的異常類型。還有一個(gè)exclude參數(shù),但一次只能使用其中一個(gè)。在將事件發(fā)布到死信隊(duì)列之前,事件處理最多應(yīng)重試指定的次數(shù)。

設(shè)置測(cè)試基礎(chǔ)設(shè)施

為了編寫集成測(cè)試,您需要確保擁有一個(gè)正常運(yùn)行的Kafka代理(最好是嵌入式的)和一個(gè)完全運(yùn)行的發(fā)布者。讓我們?cè)O(shè)置我們的基礎(chǔ)設(shè)施:

@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
        brokerProperties = {"listeners=" + "${kafka.broker.listeners}", 
                            "port=" + "${kafka.broker.port}"},
        controlledShutdown = true,
        topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {
?
  @Autowired
  private KafkaTemplate<String, CustomEvent> testKafkaTemplate;
?
?
    // tests
?
}

配置從application-test.yml文件中導(dǎo)入。當(dāng)使用嵌入式Kafka代理時(shí),重要的是要提及要?jiǎng)?chuàng)建的主題。它們不會(huì)自動(dòng)創(chuàng)建。在這種情況下,我們將創(chuàng)建四個(gè)主題,分別是:

"test", "test-retry-0", "test-retry-1", "test-dlt"

我們將最大重試次數(shù)設(shè)置為三次。每個(gè)主題對(duì)應(yīng)于每次重試嘗試。因此,如果三次重試都耗盡,事件應(yīng)該被轉(zhuǎn)發(fā)到DLT(死信隊(duì)列)。

測(cè)試用例

如果在第一次嘗試中成功消費(fèi),就不應(yīng)該進(jìn)行重試??梢酝ㄟ^方法只被調(diào)用一次來測(cè)試這一點(diǎn)。還可以添加對(duì)日志語句的進(jìn)一步測(cè)試。

 @Test
    void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
?
        // WHEN
        testKafkaTemplate.send("test", event).get();
?
        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

如果引發(fā)了不可重試的異常,就不應(yīng)該進(jìn)行重試。在這種情況下,方法CustomEventHandler#handleEvent應(yīng)該只被調(diào)用一次。

 @Test ? ?void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException { ? ? ? ?CustomEvent event = new CustomEvent("Hello"); ? ? ? ?// GIVEN ? ? ? ?doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));? ? ? ? ?// WHEN ? ? ? ?testKafkaTemplate.send("test", event).get();? ? ? ? ?// THEN ? ? ? ?verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class)); ? ? ? ?verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class)); ? ?}

如果拋出了RetryableException,則應(yīng)該按照配置的最大重試次數(shù)進(jìn)行重試,當(dāng)重試次數(shù)耗盡時(shí),事件應(yīng)該被發(fā)布到死信主題。在這種情況下,方法CustomEventHandler#handleEvent應(yīng)該被調(diào)用三次(maxRetries次),而方法CustomEventHandler#handleEventFromDlt應(yīng)該只被調(diào)用一次。

 @Test
    void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
?
        // WHEN
        testKafkaTemplate.send("test", event).get();
?
        // THEN
        verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }

在驗(yàn)證階段添加了相當(dāng)長(zhǎng)的超時(shí)時(shí)間,以便在測(cè)試完成之前考慮指數(shù)退避延遲。這是很重要的,如果沒有正確設(shè)置,可能會(huì)導(dǎo)致斷言失敗。應(yīng)該重試直到RetryableException被解決,并且如果引發(fā)了不可重試的異?;蛘咦罱K成功消費(fèi),就不應(yīng)該繼續(xù)重試。測(cè)試已經(jīng)設(shè)置為首先拋出RetryableException,然后再拋出NonRetryableException,以便進(jìn)行一次重試。

@Test
    void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
?
        // WHEN
        testKafkaTemplate.send("test", event).get();
?
        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
    }ndleEventFromDlt(any(CustomEvent.class));    }
 @Test
    void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
            InterruptedException {
        CustomEvent event = new CustomEvent("Hello");
        // GIVEN
        doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
?
        // WHEN
        testKafkaTemplate.send("test", event).get();
?
        // THEN
        verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
        verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
        }

結(jié)論

因此,您可以看到集成測(cè)試是一種混合和匹配的策略,超時(shí)時(shí)間,延遲和驗(yàn)證,以確保您的Kafka事件驅(qū)動(dòng)架構(gòu)的重試機(jī)制是可靠的。

作者: Mukut Bhattacharjee

更多技術(shù)干貨請(qǐng)關(guān)注公眾號(hào)“云原生數(shù)據(jù)庫

squids.cn,目前可體驗(yàn)全網(wǎng)zui低價(jià)RDS,免費(fèi)的遷移工具DBMotion、SQL開發(fā)工具等。文章來源地址http://www.zghlxwxcb.cn/news/detail-683258.html

到了這里,關(guān)于使用 Spring Kafka 進(jìn)行非阻塞重試的集成測(cè)試的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!

本文來自互聯(lián)網(wǎng)用戶投稿,該文觀點(diǎn)僅代表作者本人,不代表本站立場(chǎng)。本站僅提供信息存儲(chǔ)空間服務(wù),不擁有所有權(quán),不承擔(dān)相關(guān)法律責(zé)任。如若轉(zhuǎn)載,請(qǐng)注明出處: 如若內(nèi)容造成侵權(quán)/違法違規(guī)/事實(shí)不符,請(qǐng)點(diǎn)擊違法舉報(bào)進(jìn)行投訴反饋,一經(jīng)查實(shí),立即刪除!

領(lǐng)支付寶紅包贊助服務(wù)器費(fèi)用

相關(guān)文章

  • 接口請(qǐng)求重試的8種方法

    接口請(qǐng)求重試的8種方法

    日常業(yè)務(wù)開發(fā)過程中,可能第三方的服務(wù)器分布在世界的各個(gè)角落,所以請(qǐng)求三方接口的時(shí)候,難免會(huì)遇到一些網(wǎng)絡(luò)問題,這時(shí)候需要加入重試機(jī)制了,這期就給大家分享幾個(gè)接口重試的寫法。 ? 這是最簡(jiǎn)單也最直接的一種方式。在請(qǐng)求接口的代碼塊中加入循環(huán),如果請(qǐng)求失

    2024年02月04日
    瀏覽(19)
  • 大廠都是怎么做Redis重試的?

    潛心打造國內(nèi)一流,國際領(lǐng)先的技術(shù)干貨。 文章收錄在我的 GitHub 倉庫,歡迎Star/fork: JavaEdge-Interview 受網(wǎng)絡(luò)和運(yùn)行環(huán)境影響,應(yīng)用程序可能遇到暫時(shí)性故障,如瞬時(shí)網(wǎng)絡(luò)抖動(dòng)、服務(wù)暫時(shí)不可用、服務(wù)繁忙導(dǎo)致超時(shí)等。 自動(dòng)重試機(jī)制可大幅避免此類故障,保障操作成功執(zhí)行。

    2024年02月05日
    瀏覽(14)
  • docker安裝kafka,并集成springboot進(jìn)行測(cè)試

    docker安裝kafka,并集成springboot進(jìn)行測(cè)試

    大家好,今天我們開始學(xué)習(xí)kafka中間件,今天我們改變一下策略,不刷視頻學(xué)習(xí),改為實(shí)踐學(xué)習(xí),在網(wǎng)上找一些案例功能去做,來達(dá)到學(xué)習(xí)實(shí)踐的目的。 首先,是安裝相關(guān)組件。 1. docker安裝 安裝 1.1 yum-utils軟件包 1.2?設(shè)置阿里云鏡像 1.3?安裝docker 1.4?啟動(dòng)docker 1.5?測(cè)試 至此

    2023年04月25日
    瀏覽(17)
  • java阻塞隊(duì)列/kafka/spring整合kafka

    java阻塞隊(duì)列/kafka/spring整合kafka

    queue增加刪除元素 增加元素 add方法在添加元素的時(shí)候,若超出了度列的長(zhǎng)度會(huì)直接拋出異常: put方法,若向隊(duì)尾添加元素的時(shí)候發(fā)現(xiàn)隊(duì)列已經(jīng)滿了會(huì)發(fā)生阻塞一直等待空間,以加入元素 offer方法在添加元素時(shí),如果發(fā)現(xiàn)隊(duì)列已滿無法添加的話,會(huì)直接返回false 刪除元素 pol

    2024年02月12日
    瀏覽(22)
  • 使用Spring Boot集成中間件:Kafka的高級(jí)使用案例講解

    在實(shí)際應(yīng)用中,Kafka作為一種強(qiáng)大的分布式消息系統(tǒng),廣泛應(yīng)用于實(shí)時(shí)數(shù)據(jù)處理和消息傳遞。本文將通過一個(gè)全面的使用案例,詳細(xì)介紹如何使用Spring Boot集成Kafka,并展示其在實(shí)際場(chǎng)景中的應(yīng)用。 在開始之前,我們需要確保已經(jīng)完成以下準(zhǔn)備工作: 安裝并啟動(dòng)Kafka集群 創(chuàng)建

    2024年02月01日
    瀏覽(19)
  • kafka--技術(shù)文檔--spring-boot集成基礎(chǔ)簡(jiǎn)單使用

    kafka--技術(shù)文檔--spring-boot集成基礎(chǔ)簡(jiǎn)單使用

    ? ? ? ? 查閱了很多資料了解到,使用了spring-boot中整合的kafka的使用是被封裝好的。也就是說這些使用其實(shí)和在linux中的使用kafka代碼的使用其實(shí)沒有太大關(guān)系。但是邏輯是一樣的。這點(diǎn)要注意! 核心配置為: 如果在下面規(guī)定了spring-boot的版本那么就不需要再使用版本號(hào),如

    2024年02月11日
    瀏覽(23)
  • 如何使用Java進(jìn)行集成測(cè)試?

    在Java中進(jìn)行集成測(cè)試有很多種方法,以下介紹一種比較常見的基于JUnit框架的集成測(cè)試方法: 確定需要測(cè)試的代碼 首先需要確定需要進(jìn)行集成測(cè)試的代碼,可以是整個(gè)應(yīng)用程序,也可以是特定的模塊或者方法。 配置測(cè)試環(huán)境 在測(cè)試環(huán)境中創(chuàng)建測(cè)試數(shù)據(jù)庫、配置文件、mock對(duì)

    2024年02月11日
    瀏覽(19)
  • 使用TestContainers在Docker中進(jìn)行集成測(cè)試

    現(xiàn)代軟件應(yīng)用很少獨(dú)立工作。典型的應(yīng)用程序會(huì)與幾個(gè)外部系統(tǒng)進(jìn)行通信,如: 數(shù)據(jù)庫、 消息系統(tǒng)、 緩存提供商 其他第三方服務(wù)。 你應(yīng)該編寫測(cè)試確保一切正常運(yùn)行。 單元測(cè)試 有助于隔離地測(cè)試業(yè)務(wù)邏輯,不涉及任何外部服務(wù)。它們易于編寫并提供幾乎即時(shí)的反饋。 有了

    2024年02月08日
    瀏覽(13)
  • SpringBoot 如何使用 TestEntityManager 進(jìn)行 JPA 集成測(cè)試, 如何使用

    SpringBoot 如何使用 TestEntityManager 進(jìn)行 JPA 集成測(cè)試, 如何使用

    Spring Boot 是一個(gè)非常流行的 Java Web 開發(fā)框架,它簡(jiǎn)化了開發(fā)過程,提高了開發(fā)效率。在開發(fā)過程中,我們通常需要使用 JPA 操作數(shù)據(jù)庫,為了保證代碼的質(zhì)量和正確性,我們需要進(jìn)行集成測(cè)試。TestEntityManager 是 Spring Boot 提供的用于 JPA 集成測(cè)試的工具,它可以模擬 EntityManag

    2024年02月13日
    瀏覽(41)
  • SpringBoot 如何使用 MockMvc 進(jìn)行 Web 集成測(cè)試

    SpringBoot 如何使用 MockMvc 進(jìn)行 Web 集成測(cè)試

    SpringBoot 是一個(gè)流行的 Java Web 開發(fā)框架,它提供了一些強(qiáng)大的工具和庫,使得開發(fā) Web 應(yīng)用程序變得更加容易。其中之一是 MockMvc,它提供了一種測(cè)試 SpringBoot Web 應(yīng)用程序的方式,可以模擬 HTTP 請(qǐng)求和響應(yīng)的行為。 在本文中,我們將介紹 SpringBoot 中的 MockMvc,演示如何使用它

    2024年02月16日
    瀏覽(22)

覺得文章有用就打賞一下文章作者

支付寶掃一掃打賞

博客贊助

微信掃一掃打賞

請(qǐng)作者喝杯咖啡吧~博客贊助

支付寶掃一掃領(lǐng)取紅包,優(yōu)惠每天領(lǐng)

二維碼1

領(lǐng)取紅包

二維碼2

領(lǐng)紅包