.until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
}
+ @DisplayName("All messages on a partition are delayed and a message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Delay for normal messages: {0}ms")
+ @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+ void testOneMessageCausesRetryableDomainErrorsWhileAllMessagesAreDelayed(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(100);
+
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ }
+
@DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
@Test
void testOneMessageCausesRetryableDomainErrors()