TMP
authorKai Moritz <kai@juplo.de>
Sat, 14 Dec 2024 16:30:32 +0000 (17:30 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 15 Dec 2024 11:19:17 +0000 (12:19 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/java/de/juplo/kafka/NonRetriableErrorException.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/RecordHandler.java
src/main/java/de/juplo/kafka/RetriableErrorException.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ExampleConsumerTest.java
src/test/java/de/juplo/kafka/MockRecordHandler.java

index 65a1b5d..17a0264 100644 (file)
@@ -114,7 +114,7 @@ public class ExampleConsumer implements Runnable
     Integer partition,
     Long offset,
     String key,
-    Long value)
+    Long value) throws RetriableErrorException, NonRetriableErrorException
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
diff --git a/src/main/java/de/juplo/kafka/NonRetriableErrorException.java b/src/main/java/de/juplo/kafka/NonRetriableErrorException.java
new file mode 100644 (file)
index 0000000..0eb0ff2
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+public class NonRetriableErrorException extends Exception
+{
+  public NonRetriableErrorException(String message)
+  {
+    super(message);
+  }
+}
index a7b65af..5edcf58 100644 (file)
@@ -7,5 +7,5 @@ public interface RecordHandler<K, V>
     Integer partition,
     Long offset,
     K key,
-    V value);
+    V value) throws RetriableErrorException, NonRetriableErrorException;
 }
diff --git a/src/main/java/de/juplo/kafka/RetriableErrorException.java b/src/main/java/de/juplo/kafka/RetriableErrorException.java
new file mode 100644 (file)
index 0000000..598ddb0
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka;
+
+public class RetriableErrorException extends Exception
+{
+  public RetriableErrorException(String message)
+  {
+    super(message);
+  }
+}
index 19a9170..f9bf48e 100644 (file)
@@ -202,6 +202,26 @@ public class ExampleConsumerTest
     send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
   }
 
+  private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition) throws Exception
+  {
+    sendMessageThatTriggersNonRetriableExceptionInDomain(partition, 1);
+  }
+
+  private void sendMessageThatTriggersNonRetriableExceptionInDomain(int partition, int numFailures)
+  {
+    send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION * numFailures));
+  }
+
+  private void sendMessageThatTriggersRetriableExceptionInDomain(int partition)
+  {
+    sendMessageThatTriggersRetriableExceptionInDomain(partition, 0);
+  }
+
+  private void sendMessageThatTriggersRetriableExceptionInDomain(int partition, int numFailures)
+  {
+    send(partition, serializer.serialize(TOPIC,(long)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION * numFailures));
+  }
+
   private void send(int partition, long message)
   {
     send(partition, serializer.serialize(TOPIC, message));
@@ -213,7 +233,11 @@ public class ExampleConsumerTest
     kafkaTemplate.send(TOPIC, partition, "EGAL", bytes);
   }
 
+
   public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+  public final static int VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION = -2;
+  public final static int VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION = -3;
+
 
 
   @TestConfiguration
index fe87361..4587648 100644 (file)
@@ -6,7 +6,7 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.time.Duration;
 
-import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+import static de.juplo.kafka.ExampleConsumerTest.*;
 
 
 @RequiredArgsConstructor
@@ -24,7 +24,7 @@ public class MockRecordHandler implements RecordHandler<String, Long> {
       Integer partition,
       Long offset,
       String key,
-      Long value) {
+      Long value) throws RetriableErrorException, NonRetriableErrorException {
 
       if (value < 0)
       {
@@ -35,16 +35,50 @@ public class MockRecordHandler implements RecordHandler<String, Long> {
       log.trace("Handled request for topic {}, num requests handled so far: {}", topic, numRequestsHandled);
     }
 
-    private void generateError(long value)
+    private void generateError(long value) throws RetriableErrorException, NonRetriableErrorException
     {
       if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
       {
-        throw new RuntimeException("Unexpected application error!");
+
+      }
+
+      if ((float)value % (float)VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0f)
+      {
+        sleep(exceptionDelay);
+        if (value % VALUE_THAT_TRIGGERS_RETRIABLEEXCEPTION == 0)
+        {
+          throw new NonRetriableErrorException("Non-Retriable application error!");
+        }
+        return;
       }
 
+      if ((float)value % (float)VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION == 0f)
+      {
+        sleep(exceptionDelay);
+        if (value % VALUE_THAT_TRIGGERS_NONRETRIABLEEXCEPTION == 0)
+        {
+          throw new NonRetriableErrorException("Non-Retriable application error!");
+        }
+        return;
+      }
+
+
       log.info("Not specifically mapped error: {}", value);
     }
 
+    private void sleep(Duration duration)
+    {
+        try
+        {
+            Thread.sleep(exceptionDelay);
+        }
+        catch(InterruptedException e)
+        {
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        }
+    }
+
     public void clear()
     {
         numRequestsHandled = 0;