?Kafka的非阻塞重試是通過為主題配置重試主題來實(shí)現(xiàn)的。如果需要,還可以配置額外的死信主題。如果所有重試都耗盡,事件將被轉(zhuǎn)發(fā)到DLT。在公共領(lǐng)域中有很多資源可用于了解技術(shù)細(xì)節(jié)。對(duì)于代碼中的重試機(jī)制編寫集成測(cè)試確實(shí)是一項(xiàng)具有挑戰(zhàn)性的工作。以下是一些測(cè)試方法,可以用來驗(yàn)證重試機(jī)制的正確性:
- 驗(yàn)證事件已經(jīng)按照所需的次數(shù)進(jìn)行了重試:
-
在測(cè)試中,模擬一個(gè)會(huì)觸發(fā)重試的事件,并設(shè)置重試次數(shù)為所需的次數(shù)。
-
使用斷言來驗(yàn)證事件是否被重試了指定的次數(shù)。
- 驗(yàn)證只有在特定的異常發(fā)生時(shí)才進(jìn)行重試,而不是其他異常:
-
在測(cè)試中,模擬不同的異常情況,包括需要重試的異常和不需要重試的異常。
-
使用斷言來驗(yàn)證只有特定的異常觸發(fā)了重試,而其他異常沒有觸發(fā)重試。
- 驗(yàn)證如果前一次重試已經(jīng)解決了異常,不會(huì)進(jìn)行另一次重試:
-
在測(cè)試中,模擬一個(gè)會(huì)觸發(fā)重試的事件,并在每次重試之間解決異常。
-
使用斷言來驗(yàn)證只有在異常沒有被解決的情況下才進(jìn)行重試。
- 驗(yàn)證在前面的 (n-1) 次重試失敗后,第 n 次重試成功:
-
在測(cè)試中,模擬一個(gè)會(huì)觸發(fā)重試的事件,并設(shè)置重試次數(shù)為 n。
-
使用斷言來驗(yàn)證在前面的 (n-1) 次重試失敗后,第 n 次重試成功。
- 驗(yàn)證如果所有的重試嘗試都失敗,事件是否已經(jīng)發(fā)送到了死信隊(duì)列:
- 在測(cè)試中,模擬一個(gè)會(huì)觸發(fā)重試的事件,并設(shè)置重試次數(shù)為一個(gè)較小的值。
- 使用斷言來驗(yàn)證當(dāng)所有的重試嘗試都失敗后,事件是否已經(jīng)發(fā)送到了死信隊(duì)列。
設(shè)置可重試的消費(fèi)者
@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {
?
private final CustomEventHandler handler;
?
@RetryableTopic(attempts = "${retry.attempts}",
backoff = @Backoff(
delayExpression = "${retry.delay}",
multiplierExpression = "${retry.delay.multiplier}"
),
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
dltStrategy = FAIL_ON_ERROR,
autoStartDltHandler = "true",
autoCreateTopics = "false",
include = {CustomRetryableException.class})
@KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
try {
log.info("Received event on topic {}", topic);
handler.handleEvent(event);
} catch (Exception e) {
log.error("Error occurred while processing event", e);
throw e;
}
}
?
@DltHandler
public void listenOnDlt(@Payload CustomEvent event) {
log.error("Received event on dlt.");
handler.handleEventFromDlt(event);
}
?
}
如果您注意上面的代碼片段,參數(shù)@RetryableTopic
中包含includes
。這告訴消費(fèi)者只在方法拋出CustomRetryableException
時(shí)進(jìn)行重試。您可以添加任意數(shù)量的異常類型。還有一個(gè)exclude
參數(shù),但一次只能使用其中一個(gè)。在將事件發(fā)布到死信隊(duì)列之前,事件處理最多應(yīng)重試指定的次數(shù)。
設(shè)置測(cè)試基礎(chǔ)設(shè)施
為了編寫集成測(cè)試,您需要確保擁有一個(gè)正常運(yùn)行的Kafka代理(最好是嵌入式的)和一個(gè)完全運(yùn)行的發(fā)布者。讓我們?cè)O(shè)置我們的基礎(chǔ)設(shè)施:
@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
brokerProperties = {"listeners=" + "${kafka.broker.listeners}",
"port=" + "${kafka.broker.port}"},
controlledShutdown = true,
topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {
?
@Autowired
private KafkaTemplate<String, CustomEvent> testKafkaTemplate;
?
?
// tests
?
}
配置從application-test.yml
文件中導(dǎo)入。當(dāng)使用嵌入式Kafka代理時(shí),重要的是要提及要?jiǎng)?chuàng)建的主題。它們不會(huì)自動(dòng)創(chuàng)建。在這種情況下,我們將創(chuàng)建四個(gè)主題,分別是:
"test", "test-retry-0", "test-retry-1", "test-dlt"
我們將最大重試次數(shù)設(shè)置為三次。每個(gè)主題對(duì)應(yīng)于每次重試嘗試。因此,如果三次重試都耗盡,事件應(yīng)該被轉(zhuǎn)發(fā)到DLT(死信隊(duì)列)。
測(cè)試用例
如果在第一次嘗試中成功消費(fèi),就不應(yīng)該進(jìn)行重試??梢酝ㄟ^方法只被調(diào)用一次來測(cè)試這一點(diǎn)。還可以添加對(duì)日志語句的進(jìn)一步測(cè)試。
@Test
void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
?
// WHEN
testKafkaTemplate.send("test", event).get();
?
// THEN
verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
如果引發(fā)了不可重試的異常,就不應(yīng)該進(jìn)行重試。在這種情況下,方法CustomEventHandler#handleEvent
應(yīng)該只被調(diào)用一次。
@Test ? ?void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException { ? ? ? ?CustomEvent event = new CustomEvent("Hello"); ? ? ? ?// GIVEN ? ? ? ?doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));? ? ? ? ?// WHEN ? ? ? ?testKafkaTemplate.send("test", event).get();? ? ? ? ?// THEN ? ? ? ?verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class)); ? ? ? ?verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class)); ? ?}
如果拋出了RetryableException
,則應(yīng)該按照配置的最大重試次數(shù)進(jìn)行重試,當(dāng)重試次數(shù)耗盡時(shí),事件應(yīng)該被發(fā)布到死信主題。在這種情況下,方法CustomEventHandler#handleEvent
應(yīng)該被調(diào)用三次(maxRetries
次),而方法CustomEventHandler#handleEventFromDlt
應(yīng)該只被調(diào)用一次。
@Test
void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
?
// WHEN
testKafkaTemplate.send("test", event).get();
?
// THEN
verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
在驗(yàn)證階段添加了相當(dāng)長(zhǎng)的超時(shí)時(shí)間,以便在測(cè)試完成之前考慮指數(shù)退避延遲。這是很重要的,如果沒有正確設(shè)置,可能會(huì)導(dǎo)致斷言失敗。應(yīng)該重試直到RetryableException
被解決,并且如果引發(fā)了不可重試的異?;蛘咦罱K成功消費(fèi),就不應(yīng)該繼續(xù)重試。測(cè)試已經(jīng)設(shè)置為首先拋出RetryableException
,然后再拋出NonRetryableException
,以便進(jìn)行一次重試。
@Test
void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
?
// WHEN
testKafkaTemplate.send("test", event).get();
?
// THEN
verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}ndleEventFromDlt(any(CustomEvent.class)); }
@Test
void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
?
// WHEN
testKafkaTemplate.send("test", event).get();
?
// THEN
verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
結(jié)論
因此,您可以看到集成測(cè)試是一種混合和匹配的策略,超時(shí)時(shí)間,延遲和驗(yàn)證,以確保您的Kafka事件驅(qū)動(dòng)架構(gòu)的重試機(jī)制是可靠的。
作者: Mukut Bhattacharjee
更多技術(shù)干貨請(qǐng)關(guān)注公眾號(hào)“云原生數(shù)據(jù)庫”文章來源:http://www.zghlxwxcb.cn/news/detail-683258.html
squids.cn,目前可體驗(yàn)全網(wǎng)zui低價(jià)RDS,免費(fèi)的遷移工具DBMotion、SQL開發(fā)工具等。文章來源地址http://www.zghlxwxcb.cn/news/detail-683258.html
到了這里,關(guān)于使用 Spring Kafka 進(jìn)行非阻塞重試的集成測(cè)試的文章就介紹完了。如果您還想了解更多內(nèi)容,請(qǐng)?jiān)谟疑辖撬阉鱐OY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關(guān)文章,希望大家以后多多支持TOY模板網(wǎng)!