import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
-import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS;
-import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
+import static de.juplo.kafka.ExampleConsumerTest.*;
@SpringBootTest(
},
properties = {
"juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
+ "juplo.consumer.max-time-per-record=" + ERROR_TIMEOUT_MS + "ms",
+ "juplo.consumer.num-retries=" + NUM_RETRIES,
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
"logging.level.de.juplo.kafka=TRACE",
})
.until(() -> isTerminatedExceptionally.get());
}
+ @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed")
+ @Test
+ void testNonRetryableDomainError() throws Exception
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersNonRetriableExceptionInDomain(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other valid messages are consumed")
+ .atMost(Duration.ofSeconds(15))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+ }
+
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}")
+ @ValueSource(ints = { 1, 2, 3, 4, 5, 6 })
+ void testOneMessageCausesRetryableDomainErrors(int numFailures)
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetriableExceptionInDomain(3,numFailures);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+ }
+
+ @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
+ @Test
+ void testOneMessageCausesRetryableDomainErrors()
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetriableExceptionInDomain(3,66);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(15))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+ }
+
+ @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+ @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" })
+ void testThreeMessagesCauseRetryableDomainErrors(
+ int numFailuresForMessageA,
+ int numFailuresForMessageB,
+ int numFailuresForMessageC)
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are eventually consumed")
+ .atMost(Duration.ofSeconds(20))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 30);
+ }
+
+ @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed")
+ @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+ @CsvSource({ "66,3,4", "4,66,2", "1,2,66" })
+ void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften(
+ int numFailuresForMessageA,
+ int numFailuresForMessageB,
+ int numFailuresForMessageC)
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All other messages are eventually consumed")
+ .atMost(Duration.ofSeconds(20))
+ .until(() -> mockRecordHandler.getNumMessagesHandled() == 29);
+ }
+
static final String ID = "TEST";
static final String TOPIC = "ExampleConsumerTest_TEST";
static final int NUM_PARTITIONS = 10;
+ static final int NUM_RETRIES = 6;
+ static final int MAX_POLL_INTERVALL_MS = 5000;
+ static final int ERROR_TIMEOUT_MS = 1000;
@Autowired
KafkaTemplate<String, byte[]> kafkaTemplate;
() -> isTerminatedExceptionally.set(true));
}
+ @BeforeEach
+ void resetParameters()
+ {
+ mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(ERROR_TIMEOUT_MS);
+ }
+
@AfterEach
void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
{
send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
}
+ private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition)
+ {
+ send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION));
+ }
+
+ private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures)
+ {
+ send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures));
+ }
+
private void send(int partition, long message)
{
send(partition, serializer.serialize(TOPIC, message));
public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+ public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2;
+ public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3;
+
@TestConfiguration
package de.juplo.kafka;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static de.juplo.kafka.ExampleConsumerTest.*;
-@RequiredArgsConstructor
@Slf4j
public class MockRecordHandler implements RecordHandler<String, Long>
{
- @Getter
+ private final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
+
+ Duration exceptionalRecordHandlingDelay;
+
private int numMessagesHandled = 0;
+ public int getNumMessagesHandled()
+ {
+ return numMessagesHandled;
+ }
+
@Override
public void handleRecord(
String topic,
Integer partition,
Long offset,
String key,
- Long value)
+ Long value) throws RetriableErrorException, NonRetriableErrorException
{
if (value != null && value < 0)
{
- generateError(value);
+ generateError(new OffsetInPartition(offset, partition), value.intValue());
}
numMessagesHandled++;
log.trace("Handled {} messages so far", numMessagesHandled);
}
- private void generateError(long value)
+ private void generateError(
+ OffsetInPartition offset,
+ int value) throws RetriableErrorException, NonRetriableErrorException
{
if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
{
throw new RuntimeException("Unexpected application error!");
}
- log.info("Not specifically mapped error: {}", value);
+ if (value == VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION)
+ {
+ throw new NonRetriableErrorException("Non-Retryable application error!");
+ }
+
+ if ((float)value % (float)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0f)
+ {
+ int totalOccurrences = value / VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION;
+ int occurrence = retriableErrors.compute(
+ offset,
+ (k, v) ->
+ {
+ if (v == null)
+ {
+ v = totalOccurrences;
+ }
+ else
+ {
+ v--;
+ }
+
+ return v;
+ });
+
+ if (occurrence <= 0)
+ {
+ retriableErrors.remove(offset);
+ }
+ else
+ {
+ log.debug(
+ "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}",
+ totalOccurrences - occurrence + 1,
+ totalOccurrences,
+ offset.offset,
+ offset.partition);
+ sleep(exceptionalRecordHandlingDelay);
+ throw new RetriableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1));
+ }
+
+ log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences);
+ }
+ else
+ {
+ log.warn("Not specifically mapped error: {}", value);
+ }
+ }
+
+ private void sleep(Duration duration)
+ {
+ try
+ {
+ Thread.sleep(duration);
+ }
+ catch(InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ throw new RuntimeException(e);
+ }
}
public void clear()
{
+ retriableErrors.clear();
numMessagesHandled = 0;
}
+
+
+ private static record OffsetInPartition(long offset, int partition) {}
}