Error handling for deserialization errors implemented and tested
author    Kai Moritz <kai@juplo.de>
          Sun, 13 Apr 2025 09:23:19 +0000 (11:23 +0200)
committer Kai Moritz <kai@juplo.de>
          Tue, 15 Apr 2025 20:38:09 +0000 (22:38 +0200)
* The application has already been renamed to match the goal.
* For now, this only adds the error handling for deserialization errors,
  just as practiced in the corresponding exercises.
* The test case is already prepared for the considerably more complex
  tests of the complete error handling.

README.sh
build.gradle
docker/docker-compose.yml
pom.xml
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/LongExampleConsumerTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/LongMockRecordHandler.java [new file with mode: 0644]

diff --git a/README.sh b/README.sh
index 203c22b..5ecc994 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -1,6 +1,6 @@
 #!/bin/bash
 
-IMAGE=juplo/spring-consumer:1.1-record-handler-SNAPSHOT
+IMAGE=juplo/spring-consumer:1.1-error-handling-SNAPSHOT
 
 if [ "$1" = "cleanup" ]
 then
diff --git a/build.gradle b/build.gradle
index f700918..3df5a51 100644 (file)
--- a/build.gradle
+++ b/build.gradle
@@ -8,7 +8,7 @@ plugins {
 }
 
 group = 'de.juplo.kafka'
-version = '1.1-record-handler-SNAPSHOT'
+version = '1.1-error-handling-SNAPSHOT'
 
 java {
        toolchain {
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index bf306f6..e4727a3 100644 (file)
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -140,7 +140,7 @@ services:
     command: kafka:9092 test producer
 
   consumer:
-    image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT
+    image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: consumer
@@ -149,7 +149,7 @@ services:
       juplo.consumer.topic: test
 
   peter:
-    image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT
+    image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: consumer
@@ -158,7 +158,7 @@ services:
       juplo.consumer.topic: test
 
   ute:
-    image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT
+    image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT
     environment:
       spring.kafka.bootstrap-servers: kafka:9092
       spring.kafka.client-id: consumer
diff --git a/pom.xml b/pom.xml
index 7590d25..7cd3cb0 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -15,7 +15,7 @@
   <artifactId>spring-consumer</artifactId>
   <name>Spring Consumer</name>
   <description>Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka</description>
-  <version>1.1-record-handler-SNAPSHOT</version>
+  <version>1.1-error-handling-SNAPSHOT</version>
 
   <properties>
     <java.version>21</java.version>
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index e5a8b3d..7e820ea 100644 (file)
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
 
 import java.time.Duration;
@@ -52,17 +53,30 @@ public class ExampleConsumer<K, V> implements Runnable
 
       while (true)
       {
-        ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
-
-        log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<K, V> record : records)
+        try
+        {
+          ConsumerRecords<K, V> records = consumer.poll(Duration.ofSeconds(1));
+
+          log.info("{} - Received {} messages", id, records.count());
+          for (ConsumerRecord<K, V> record : records)
+          {
+            handleRecord(
+              record.topic(),
+              record.partition(),
+              record.offset(),
+              record.key(),
+              record.value());
+          }
+        }
+        catch (RecordDeserializationException e)
         {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
+          log.error(
+            "{} - Ignoring invalid record for offset {} on partition {}: {}",
+            id,
+            e.offset(),
+            e.topicPartition(),
+            e.getMessage());
+          consumer.seek(e.topicPartition(), e.offset() + 1);
         }
       }
     }
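
Note: RecordDeserializationException carries the affected partition and offset,
which is exactly what the consumer needs to log the broken record and to
reposition itself behind it. The invalid record is thereby skipped for good;
more elaborate reactions (for example forwarding the raw bytes to a dead-letter
topic) are possible, but not part of this commit.
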
diff --git a/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java b/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java
new file mode 100644 (file)
index 0000000..d87b175
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java
@@ -0,0 +1,257 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.AbstractExampleConsumerTest.NUM_PARTITIONS;
+import static de.juplo.kafka.AbstractExampleConsumerTest.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringBootTest(
+  classes = {
+    KafkaAutoConfiguration.class,
+    ApplicationProperties.class,
+    AbstractExampleConsumerTest.ConsumerRunnableTestConfig.class,
+  },
+  properties = {
+    "spring.kafka.consumer.auto-offset-reset=earliest",
+    "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+    "logging.level.de.juplo.kafka=TRACE",
+  })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
+public abstract class AbstractExampleConsumerTest<V>
+{
+  @DisplayName("All messages are consumed as expected")
+  @Test
+  void testOnlyValidMessages()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are consumed")
+      .atMost(Duration.ofSeconds(5))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+  }
+
+  @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
+  @Test
+  void testDeserializationException()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendNonDeserializableMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All valid messages are consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+  }
+
+  @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
+  @Test
+  void testUnexpectedDomainError() throws Exception
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRuntimeExceptionInDomain(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("The ConsumerRunnable is exited by an unexpected exception")
+      .atMost(Duration.ofSeconds(5))
+      .pollInterval(Duration.ofMillis(250))
+      .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue());
+  }
+
+
+  static final String ID = "TEST";
+  static final String TOPIC = "ExampleConsumerTest_TEST";
+  static final int NUM_PARTITIONS = 10;
+
+  @Autowired
+  KafkaTemplate<String, byte[]> kafkaTemplate;
+
+  final Serializer<V> serializer = createSerializer();
+  final long[] currentOffsets = new long[NUM_PARTITIONS];
+
+  long nextMessage = 1;
+
+  final AbstractMockRecordHandler mockRecordHandler = createMockRecordHandler();
+  final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
+
+  ExampleConsumer exampleConsumer;
+
+
+  abstract Serializer<V> createSerializer();
+  abstract AbstractMockRecordHandler<V> createMockRecordHandler();
+  abstract Consumer<?, ?> createConsumer(KafkaProperties properties);
+  abstract V createValidMessage();
+  abstract V createMessageThatTriggersRuntimeException();
+
+
+  @BeforeEach
+  void createExampleConsumer(@Autowired KafkaProperties properties)
+  {
+    exampleConsumer = new ExampleConsumer(
+      ID,
+      TOPIC,
+      createConsumer(properties),
+      mockRecordHandler,
+      () -> isTerminatedExceptionally.set(true));
+  }
+
+  @AfterEach
+  void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+  {
+    exampleConsumer.shutdown();
+    adminClient
+      .deleteRecords(recordsToDelete())
+      .all()
+      .get();
+    mockRecordHandler.clear();
+    nextMessage = 1;
+    isTerminatedExceptionally.set(false);
+  }
+
+  private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+  {
+    return IntStream
+      .range(0, NUM_PARTITIONS)
+      .filter(i -> currentOffsets[i] > 0)
+      .mapToObj(i -> Integer.valueOf(i))
+      .collect(Collectors.toMap(
+        i -> new TopicPartition(TOPIC, i),
+        i -> recordsToDelete(i)));
+  }
+
+  private RecordsToDelete recordsToDelete(int partition)
+  {
+    return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+  }
+
+  private void sendValidMessage(int partition)
+  {
+    send(partition, createValidMessage());
+  }
+
+  private void sendNonDeserializableMessage(int partition)
+  {
+    send(partition, "BOOM!".getBytes());
+  }
+
+  private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
+  {
+    send(partition, createMessageThatTriggersRuntimeException());
+  }
+
+  private void send(int partition, V message)
+  {
+    send(partition, serializer.serialize(TOPIC, message));
+  }
+
+  private void send(int partition, byte[] bytes)
+  {
+    nextMessage++;
+    kafkaTemplate
+      .send(TOPIC, partition, "EGAL", bytes)
+      .thenAccept(result ->
+      {
+        RecordMetadata metadata = result.getRecordMetadata();
+        currentOffsets[metadata.partition()] = metadata.offset();
+      });
+  }
+
+
+
+  @TestConfiguration
+  static class ConsumerRunnableTestConfig
+  {
+    @Bean
+    AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+    {
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+      return AdminClient.create(properties);
+    }
+  }
+}
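
Why "BOOM!".getBytes() works as a poison pill in sendNonDeserializableMessage():
LongDeserializer insists on exactly 8 bytes and throws a SerializationException
for everything else, which poll() then surfaces as a
RecordDeserializationException. A standalone sketch of this behaviour (the
class name PoisonPillDemo is made up and not part of the commit):

    import org.apache.kafka.common.errors.SerializationException;
    import org.apache.kafka.common.serialization.LongDeserializer;

    public class PoisonPillDemo
    {
      public static void main(String[] args)
      {
        try (LongDeserializer deserializer = new LongDeserializer())
        {
          // "BOOM!" is only 5 bytes long, but a serialized long has exactly 8
          deserializer.deserialize("test", "BOOM!".getBytes());
        }
        catch (SerializationException e)
        {
          // "Size of data received by LongDeserializer is not 8"
          System.out.println(e.getMessage());
        }
      }
    }
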
diff --git a/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java b/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java
new file mode 100644 (file)
index 0000000..c861726
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java
@@ -0,0 +1,37 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public abstract class AbstractMockRecordHandler<V> implements RecordHandler<String, V>
+{
+  private int numMessagesHandled = 0;
+
+  public int getNumMessagesHandled()
+  {
+    return numMessagesHandled;
+  }
+
+  @Override
+  public void handleRecord(
+    String topic,
+    Integer partition,
+    Long offset,
+    String key,
+    V value)
+  {
+    generateError(value);
+    numMessagesHandled++;
+    log.trace("Handled {} messages so far", numMessagesHandled);
+  }
+
+  abstract void generateError(V value);
+
+  public void clear()
+  {
+    numMessagesHandled = 0;
+  }
+}
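
Note: numMessagesHandled is incremented on the consumer thread and read from
the test thread through Awaitility's polling. That works in practice here; a
volatile field or an AtomicInteger would make the cross-thread visibility
explicit and is the stricter choice.
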
diff --git a/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java b/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java
new file mode 100644 (file)
index 0000000..09456b2
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java
@@ -0,0 +1,50 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.Map;
+
+
+public class LongExampleConsumerTest extends AbstractExampleConsumerTest<Long>
+{
+  @Override
+  AbstractMockRecordHandler<Long> createMockRecordHandler()
+  {
+    return new LongMockRecordHandler();
+  }
+
+  @Override
+  Serializer<Long> createSerializer()
+  {
+    return new LongSerializer();
+  }
+
+  @Override
+  Consumer<?, ?> createConsumer(KafkaProperties kafkaProperties)
+  {
+    Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
+    properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
+    return new DefaultKafkaConsumerFactory<>(properties).createConsumer();
+  }
+
+  @Override
+  Long createValidMessage()
+  {
+    return nextMessage;
+  }
+
+  @Override
+  Long createMessageThatTriggersRuntimeException()
+  {
+    return VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+  }
+
+
+  public final static long VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+}
diff --git a/src/test/java/de/juplo/kafka/LongMockRecordHandler.java b/src/test/java/de/juplo/kafka/LongMockRecordHandler.java
new file mode 100644 (file)
index 0000000..a6a1659
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/LongMockRecordHandler.java
@@ -0,0 +1,20 @@
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+
+import static de.juplo.kafka.LongExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+
+
+@Slf4j
+public class LongMockRecordHandler extends AbstractMockRecordHandler<Long>
+{
+  void generateError(Long value)
+  {
+    if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
+    {
+      throw new RuntimeException("Unexpected application error!");
+    }
+
+    log.info("Not specifically mapped error: {}", value);
+  }
+}
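
The new tests are picked up by the regular build (both build files are touched
by this commit), so they run with mvn test as well as with ./gradlew test.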