RED: Erwartetes Verhalten für die Fehlerbehandlung definiert
authorKai Moritz <kai@juplo.de>
Mon, 16 Dec 2024 20:29:24 +0000 (21:29 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 5 Jan 2025 09:49:05 +0000 (10:49 +0100)
src/test/java/de/juplo/kafka/ExampleConsumerTest.java
src/test/java/de/juplo/kafka/MockRecordHandler.java

index b480fa3..c2534f2 100644 (file)
@@ -12,6 +12,9 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+import org.junit.jupiter.params.provider.ValueSource;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
@@ -31,8 +34,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
-import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS;
-import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
+import static de.juplo.kafka.ExampleConsumerTest.*;
 
 
 @SpringBootTest(
@@ -43,6 +45,9 @@ import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
   },
   properties = {
     "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+    "juplo.consumer.max-poll-interval=" + MAX_POLL_INTERVALL_MS + "ms",
+    "juplo.consumer.max-time-per-record=" + ERROR_TIMEOUT_MS + "ms",
+    "juplo.consumer.num-retries=" + NUM_RETRIES,
     "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
     "logging.level.de.juplo.kafka=TRACE",
   })
@@ -143,10 +148,197 @@ public class ExampleConsumerTest
       .until(() -> isTerminatedExceptionally.get());
   }
 
+  @DisplayName("A message, that triggers an non-retryable exception in the domain-logic, is skipped and all other messages are consumed")
+  @Test
+  void testNonRetryableDomainError() throws Exception
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersNonRetriableExceptionInDomain(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All other valid messages are consumed")
+      .atMost(Duration.ofSeconds(15))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+  }
+
+  @DisplayName("A message, that triggers a retryable exception in the domain-logic, is retried and all messages are eventually consumed")
+  @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}")
+  @ValueSource(ints = { 1, 2, 3, 4, 5, 6 })
+  void testOneMessageCausesRetryableDomainErrors(int numFailures)
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3,numFailures);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+  }
+
+  @DisplayName("A message, that triggers a retryable exception in the domain-logic, but fails too often, is skipped and all other messages are eventually consumed")
+  @Test
+  void testOneMessageCausesRetryableDomainErrors()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3,66);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All other messages are eventually consumed")
+      .atMost(Duration.ofSeconds(15))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+  }
+
+  @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried and all messages are eventually consumed")
+  @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+  @CsvSource({ "1,1,1", "6,3,4", "4,5,2", "1,2,3", "6,6,6" })
+  void testThreeMessagesCauseRetryableDomainErrors(
+    int numFailuresForMessageA,
+    int numFailuresForMessageB,
+    int numFailuresForMessageC)
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are eventually consumed")
+      .atMost(Duration.ofSeconds(20))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 30);
+  }
+
+  @DisplayName("Three messages, that trigger retryable exceptions in the domain-logic, are retried, but one of them fails too often and is skipped, still all other messages are eventually consumed")
+  @ParameterizedTest(name = "Number of failures for the 1. message in partition 3: {0}, number of failures for the 1. message in partition 6: {1}, number of failures for the 2. message in partition 6: {2}")
+  @CsvSource({ "66,3,4", "4,66,2", "1,2,66" })
+  void testThreeMessagesCauseRetryableDomainErrorsAndOneFailsTooOften(
+    int numFailuresForMessageA,
+    int numFailuresForMessageB,
+    int numFailuresForMessageC)
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRetriableExceptionInDomain(3,numFailuresForMessageA);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageB);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendMessageThatTriggersRetriableExceptionInDomain(6,numFailuresForMessageC);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All other messages are eventually consumed")
+      .atMost(Duration.ofSeconds(20))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 29);
+  }
+
 
   static final String ID = "TEST";
   static final String TOPIC = "ExampleConsumerTest_TEST";
   static final int NUM_PARTITIONS = 10;
+  static final int NUM_RETRIES = 6;
+  static final int MAX_POLL_INTERVALL_MS = 5000;
+  static final int ERROR_TIMEOUT_MS = 1000;
 
   @Autowired
   KafkaTemplate<String, byte[]> kafkaTemplate;
@@ -182,6 +374,12 @@ public class ExampleConsumerTest
       () -> isTerminatedExceptionally.set(true));
   }
 
+  @BeforeEach
+  void resetParameters()
+  {
+    mockRecordHandler.exceptionalRecordHandlingDelay = Duration.ofMillis(ERROR_TIMEOUT_MS);
+  }
+
   @AfterEach
   void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
   {
@@ -226,6 +424,16 @@ public class ExampleConsumerTest
     send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
   }
 
+  private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition)
+  {
+    send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION));
+  }
+
+  private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures)
+  {
+    send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures));
+  }
+
   private void send(int partition, long message)
   {
     send(partition, serializer.serialize(TOPIC, message));
@@ -245,6 +453,9 @@ public class ExampleConsumerTest
 
 
   public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+  public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2;
+  public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3;
+
 
 
   @TestConfiguration
index 9f540f0..37be0b4 100644 (file)
 package de.juplo.kafka;
 
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
-import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import static de.juplo.kafka.ExampleConsumerTest.*;
 
 
-@RequiredArgsConstructor
 @Slf4j
 public class MockRecordHandler implements RecordHandler<String, Long>
 {
-  @Getter
+  private final Map<OffsetInPartition, Integer> retriableErrors = new HashMap<>();
+
+  Duration exceptionalRecordHandlingDelay;
+
   private int numMessagesHandled = 0;
 
+  public int getNumMessagesHandled()
+  {
+    return numMessagesHandled;
+  }
+
   @Override
   public void handleRecord(
     String topic,
     Integer partition,
     Long offset,
     String key,
-    Long value)
+    Long value) throws RetriableErrorException, NonRetriableErrorException
   {
     if (value != null && value < 0)
     {
-      generateError(value);
+      generateError(new OffsetInPartition(offset, partition), value.intValue());
     }
 
     numMessagesHandled++;
     log.trace("Handled {} messages so far", numMessagesHandled);
   }
 
-  private void generateError(long value)
+  private void generateError(
+    OffsetInPartition offset,
+    int value) throws RetriableErrorException, NonRetriableErrorException
   {
     if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
     {
       throw new RuntimeException("Unexpected application error!");
     }
 
-    log.info("Not specifically mapped error: {}", value);
+    if (value == VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION)
+    {
+      throw new NonRetriableErrorException("Non-Retryable application error!");
+    }
+
+    if ((float)value % (float)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0f)
+    {
+      int totalOccurrences = value / VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION;
+      int occurrence = retriableErrors.compute(
+        offset,
+        (k, v) ->
+        {
+          if (v == null)
+          {
+            v = totalOccurrences;
+          }
+          else
+          {
+            v--;
+          }
+
+          return v;
+        });
+
+      if (occurrence <= 0)
+      {
+        retriableErrors.remove(offset);
+      }
+      else
+      {
+        log.debug(
+          "Simulating occurrence #{} of {} for a retryable error at offset {} in partition {}",
+          totalOccurrences - occurrence + 1,
+          totalOccurrences,
+          offset.offset,
+          offset.partition);
+        sleep(exceptionalRecordHandlingDelay);
+        throw new RetriableErrorException("Retryable application error! Occurrence #" + (totalOccurrences - occurrence + 1));
+      }
+
+      log.info("Simulating a resolved retryable error after {} occurrences of the error", totalOccurrences);
+    }
+    else
+    {
+      log.warn("Not specifically mapped error: {}", value);
+    }
+  }
+
+  private void sleep(Duration duration)
+  {
+    try
+    {
+      Thread.sleep(duration);
+    }
+    catch(InterruptedException e)
+    {
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
   }
 
   public void clear()
   {
+    retriableErrors.clear();
     numMessagesHandled = 0;
   }
+
+
+  private static record OffsetInPartition(long offset, int partition) {}
 }