Instant now = clock.instant();
Duration timeLeft = Duration.between(now, deadline);
+ log.trace("{} - Time left for current poll: {}", id, timeLeft);
if (timeLeft.minus(minTimeForNextRecord).isNegative())
{
@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
public class ExampleConsumerTest
{
- @DisplayName("All messages are consumed")
+ @DisplayName("All messages are consumed as expected")
@Test
void testOnlyValidMessages()
{
.until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
}
+ @DisplayName("Delayed messages are consumed as expected")
+ @ParameterizedTest(name = "delay for message-consumption: {0}")
+ @ValueSource(ints = { 10, 25, 50, 75, 100 })
+ void testOnlyValidMessagesButAllDelayed(int delay)
+ {
+ createExampleConsumer();
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(delay);
+
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are consumed")
+ .atMost(Duration.ofSeconds(5))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ }
+
@DisplayName("A deserialization exception is skipped and all valid messages are consumed")
@Test
void testDeserializationException()
@BeforeEach
void resetParameters()
{
+ mockRecordHandler.normalRecordHandlingDelay = Duration.ofMillis(0);
mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(0);
}
{
private final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
+ Duration normalRecordHandlingDelay;
Duration exceptionalRecordHandlingDelay;
private int numMessagesHandled = 0;
generateError(new OffsetInPartition(offset, partition), value.intValue());
}
+ sleep(normalRecordHandlingDelay);
numMessagesHandled++;
log.trace("Handled {} messages so far", numMessagesHandled);
}