????????如何為啟用重試和死信發(fā)布的消費者的 Spring Kafka 實現(xiàn)編寫集成測試。
Kafka 非阻塞重試
Kafka 中的非阻塞重試是通過為主主題配置重試主題來完成的。如果需要,還可以配置其他死信主題。如果所有重試均已用盡,事件將轉發(fā)至 DLT。公共領域提供了大量資源來了解技術細節(jié)。?
要測試什么?
在代碼中為重試機制編寫集成測試時,這可能是一項具有挑戰(zhàn)性的工作。?
- 如何測試該事件是否已重試所需的次數(shù)??
- 如何測試僅在發(fā)生某些異常時才執(zhí)行重試,而對于其他異常則不執(zhí)行重試?
- 如果上次重試中異常已解決,如何測試是否未進行另一次重試?
- 在(n-1)次重試嘗試失敗后,如何測試重試中的第n次嘗試是否成功?
- 當所有重試嘗試都用完后,如何測試事件是否已發(fā)送到死信隊列?
讓我們看一些代碼。您可以找到很多很好的文章,展示如何使用 Spring Kafka 設置非阻塞重試。下面給出了一種這樣的實現(xiàn)。這是使用Spring-Kafka 的@RetryableTopic
和@DltHandler
? 注釋來完成的。
設置可重試消費者
@Slf4j
@Component
@RequiredArgsConstructor
public class CustomEventConsumer {
private final CustomEventHandler handler;
@RetryableTopic(attempts = "${retry.attempts}",
backoff = @Backoff(
delayExpression = "${retry.delay}",
multiplierExpression = "${retry.delay.multiplier}"
),
topicSuffixingStrategy = TopicSuffixingStrategy.SUFFIX_WITH_INDEX_VALUE,
dltStrategy = FAIL_ON_ERROR,
autoStartDltHandler = "true",
autoCreateTopics = "false",
include = {CustomRetryableException.class})
@KafkaListener(topics = "${topic}", id = "${default-consumer-group:default}")
public void consume(CustomEvent event, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
try {
log.info("Received event on topic {}", topic);
handler.handleEvent(event);
} catch (Exception e) {
log.error("Error occurred while processing event", e);
throw e;
}
}
@DltHandler
public void listenOnDlt(@Payload CustomEvent event) {
log.error("Received event on dlt.");
handler.handleEventFromDlt(event);
}
}
如果您注意到上面的代碼片段,include
參數(shù)包含CustomRetryableException.class
.?這告訴使用者僅在該方法拋出 CustomRetryableException 時才重試CustomEventHandler#handleEvent
。您可以根據(jù)需要添加任意數(shù)量。還有一個排除參數(shù),但一次可以使用其中任何一個參數(shù)。
${retry.attempts}
在發(fā)布到 DLT 之前,事件處理應重試最多次數(shù)。
設置測試基礎設施
要編寫集成測試,您需要確保擁有一個正常運行的 Kafka 代理(首選嵌入式)和一個功能齊全的發(fā)布者。讓我們設置我們的基礎設施:
@EnableKafka
@SpringBootTest
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_CLASS)
@EmbeddedKafka(partitions = 1,
brokerProperties = {"listeners=" + "${kafka.broker.listeners}",
"port=" + "${kafka.broker.port}"},
controlledShutdown = true,
topics = {"test", "test-retry-0", "test-retry-1", "test-dlt"}
)
@ActiveProfiles("test")
class DocumentEventConsumerIntegrationTest {
@Autowired
private KafkaTemplate<String, CustomEvent> testKafkaTemplate;
// tests
}
** 配置是從 application-test.yml 文件導入的。
使用嵌入式 kafka 代理時,重要的是要提及要創(chuàng)建的主題。它們不會自動創(chuàng)建。在本例中,我們創(chuàng)建四個主題,即?
"test", "test-retry-0", "test-retry-1", "test-dlt"
我們已將最大重試嘗試次數(shù)設置為 3 次。每個主題對應于每次重試嘗試。因此,如果 3 次重試都用盡,則應將事件轉發(fā)到 DLT。
測試用例
如果第一次嘗試消費成功,則不應重試。
CustomEventHandler#handleEvent
這可以通過該方法僅被調用一次的事實來測試。還可以添加對 Log 語句的進一步測試。
@Test
void test_should_not_retry_if_consumption_is_successful() throws ExecutionException, InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
// WHEN
testKafkaTemplate.send("test", event).get();
// THEN
verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
如果引發(fā)不可重試的異常,則不應重試。
在這種情況下,該CustomEventHandler#handleEvent
方法應該只調用一次:
? ? @Test
? ?void test_should_not_retry_if_non_retryable_exception_raised() throws ExecutionException, InterruptedException {
? ? ? ?CustomEvent event = new CustomEvent("Hello");
? ? ? ?// GIVEN
? ? ? ?doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
? ? ? ?// WHEN
? ? ? ?testKafkaTemplate.send("test", event).get();
? ? ? ?// THEN
? ? ? ?verify(customEventHandler, timeout(2000).times(1)).handleEvent(any(CustomEvent.class));
? ? ? ?verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
? ?}
如果拋出 a,則重試配置的最大次數(shù)RetryableException
,并在重試用完后將其發(fā)布到死信主題。
在這種情況下,該CustomEventHandler#handleEvent
方法應被調用三次(maxRetries)次,并且CustomEventHandler#handleEventFromDlt
該方法應被調用一次。
@Test
void test_should_retry_maximum_times_and_publish_to_dlt_if_retryable_exception_raised() throws ExecutionException, InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
// WHEN
testKafkaTemplate.send("test", event).get();
// THEN
verify(customEventHandler, timeout(10000).times(maxRetries)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(1)).handleEventFromDlt(any(CustomEvent.class));
}
**在驗證階段添加了相當大的超時,以便在測試完成之前可以考慮指數(shù)退避延遲。這很重要,如果設置不當可能會導致斷言失敗。
應該重試直到RetryableException
解決,并且如果引發(fā)不可重試的異?;蛳M最終成功,則不應繼續(xù)重試。
測試已設置為RetryableException
先拋出 a 然后再拋出 a?NonRetryable exception
,以便重試一次。文章來源:http://www.zghlxwxcb.cn/news/detail-661910.html
@Test
void test_should_retry_until_retryable_exception_is_resolved_by_non_retryable_exception() throws ExecutionException,
InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomRetryableException.class).doThrow(CustomNonRetryableException.class).when(customEventHandler).handleEvent(any(CustomEvent.class));
// WHEN
testKafkaTemplate.send("test", event).get();
// THEN
verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
}
@Test
void test_should_retry_until_retryable_exception_is_resolved_by_successful_consumption() throws ExecutionException,
InterruptedException {
CustomEvent event = new CustomEvent("Hello");
// GIVEN
doThrow(CustomRetryableException.class).doNothing().when(customEventHandler).handleEvent(any(CustomEvent.class));
// WHEN
testKafkaTemplate.send("test", event).get();
// THEN
verify(customEventHandler, timeout(10000).times(2)).handleEvent(any(CustomEvent.class));
verify(customEventHandler, timeout(2000).times(0)).handleEventFromDlt(any(CustomEvent.class));
? ? }
結論
因此,您可以看到集成測試是策略、超時、延遲和驗證的混合和匹配,以確保 Kafka 事件驅動架構的重試機制萬無一失。文章來源地址http://www.zghlxwxcb.cn/news/detail-661910.html
到了這里,關于非阻塞重試與 Spring Kafka 的集成測試的文章就介紹完了。如果您還想了解更多內(nèi)容,請在右上角搜索TOY模板網(wǎng)以前的文章或繼續(xù)瀏覽下面的相關文章,希望大家以后多多支持TOY模板網(wǎng)!