"juplo.consumer.poll-request-timeout=" + POLL_REQUEST_TIMEOUT_MS + "ms",
"juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
"juplo.consumer.max-time-per-record=" + MAX_TIME_PER_RECORD_MS + "ms",
+ "juplo.consumer.fetch-max-wait=" + FETCH_MAX_WAIT_MS + "ms",
"juplo.consumer.min-slack-per-poll-interval=" + MIN_SLACK_PER_POLL_INTERVAL_MS + "ms",
"juplo.consumer.num-retries=" + NUM_RETRIES,
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
.untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
}
+ @DisplayName("Several messages are triggering retryable exception in one partition, some of them fail so often, that they are skipped and all successful messages are eventually consumed")
+ @ParameterizedTest(name = "Delay for errors: {0}ms")
+ @ValueSource(ints = { 10, 20, 50, 100, 150, 200 })
+ void testSeveralMessagesCausesRetryableDomainErrorsInOnePartition(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(delay);
+
+ sendValidMessage(3);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 4);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 6);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 5);
+ sendValidMessage(3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 6);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 3);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 66);
+ sendValidMessage(3);
+ sendMessageThatTriggersRetriableExceptionInDomain(3, 1);
+ sendValidMessage(3);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(16));
+ }
+
@DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
@ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
@CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" })
static final int POLL_REQUEST_TIMEOUT_MS = 50;
static final int MAX_POLL_INTERVALL_MS = 500;
static final int MAX_TIME_PER_RECORD_MS = 100;
+ static final int FETCH_MAX_WAIT_MS = 50;
static final int MIN_SLACK_PER_POLL_INTERVAL_MS = 100;
@Autowired