Moved message handling into the `RecordHandler` interface consumer/spring-consumer--record-handler--generics4all consumer/spring-consumer--record-handler--generics4some
author Kai Moritz <kai@juplo.de>
Fri, 21 Feb 2025 14:56:46 +0000 (15:56 +0100)
committer Kai Moritz <kai@juplo.de>
Fri, 21 Feb 2025 14:56:46 +0000 (15:56 +0100)
* Tests for the behavior in error cases
* The payload of the messages is a counter that is incremented across all messages
* Configuration via `ApplicationConfiguration` - but wired up by hand
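
The point of the refactoring is that the consumer no longer contains any business logic: any implementation of `RecordHandler` can be wired in through `ApplicationConfiguration`. As a rough sketch (not part of this commit; the class name and the summing logic are made up for illustration), a domain-specific handler could look like this:

```java
package de.juplo.kafka;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

// Hypothetical example: a handler that keeps a running sum of the
// consumed payloads, plugged into the new RecordHandler interface.
@Slf4j
public class SummingRecordHandler implements RecordHandler<String, Long>
{
  @Getter
  private long sum = 0;

  @Override
  public void handleRecord(
    String topic,
    Integer partition,
    Long offset,
    String key,
    Long value)
  {
    if (value != null)
    {
      sum += value;
    }
    log.info("Sum after {}-{}@{}: {}", topic, partition, offset, sum);
  }
}
```

Such a handler would replace the no-op lambda defined in the `recordHandler()` bean below, without any change to `ExampleConsumer` itself.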

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java
src/main/java/de/juplo/kafka/RecordHandler.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/ExampleConsumerTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/MockRecordHandler.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 3c0d31d..c417484 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -1,5 +1,6 @@
 package de.juplo.kafka;
 
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.StickyAssignor;
@@ -15,11 +16,13 @@ import java.util.Properties;
 
 @Configuration
 @EnableConfigurationProperties(ApplicationProperties.class)
+@Slf4j
 public class ApplicationConfiguration
 {
   @Bean
   public ExampleConsumer<String, Long> exampleConsumer(
     Consumer<String, Long> kafkaConsumer,
+    RecordHandler<String, Long> recordHandler,
     ApplicationProperties properties,
     ConfigurableApplicationContext applicationContext)
   {
@@ -28,9 +31,16 @@ public class ApplicationConfiguration
         properties.getClientId(),
         properties.getConsumerProperties().getTopic(),
         kafkaConsumer,
+        recordHandler,
         () -> applicationContext.close());
   }
 
+  @Bean
+  public RecordHandler<String, Long> recordHandler()
+  {
+    return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value);
+  }
+
   @Bean(destroyMethod = "")
   public KafkaConsumer<String, Long> kafkaConsumer(ApplicationProperties properties)
   {
diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java
index 6554da4..b2c59f8 100644
--- a/src/main/java/de/juplo/kafka/ExampleConsumer.java
+++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java
@@ -17,6 +17,7 @@ public class ExampleConsumer<K, V> implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
+  private final RecordHandler<K, V> recordHandler;
   private final Thread workerThread;
   private final Runnable closeCallback;
 
@@ -28,11 +29,13 @@ public class ExampleConsumer<K, V> implements Runnable
     String clientId,
     String topic,
     Consumer<K, V> consumer,
+    RecordHandler<K, V> recordHandler,
     Runnable closeCallback)
   {
     this.id = clientId;
     this.topic = topic;
     this.consumer = consumer;
+    this.recordHandler = recordHandler;
 
     workerThread = new Thread(this, "ExampleConsumer Worker-Thread");
     workerThread.start();
@@ -107,6 +110,7 @@ public class ExampleConsumer<K, V> implements Runnable
   {
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
+    recordHandler.handleRecord(topic, partition, offset, key, value);
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
new file mode 100644
index 0000000..a7b65af
--- /dev/null
+++ b/src/main/java/de/juplo/kafka/RecordHandler.java
@@ -0,0 +1,11 @@
+package de.juplo.kafka;
+
+public interface RecordHandler<K, V>
+{
+  void handleRecord(
+    String topic,
+    Integer partition,
+    Long offset,
+    K key,
+    V value);
+}
diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java
new file mode 100644
index 0000000..590c9cd
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java
@@ -0,0 +1,253 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS;
+import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
+
+
+@SpringBootTest(
+  classes = {
+    KafkaAutoConfiguration.class,
+    ApplicationProperties.class,
+    ExampleConsumerTest.ConsumerRunnableTestConfig.class,
+  },
+  properties = {
+    "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+    "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+    "logging.level.de.juplo.kafka=TRACE",
+  })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
+public class ExampleConsumerTest
+{
+  @DisplayName("All messages are consumed")
+  @Test
+  void testOnlyValidMessages()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are consumed")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 20);
+  }
+
+  @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
+  @Test
+  void testDeserializationException()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendNonDeserializableMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All valid messages are consumed")
+      .atMost(Duration.ofSeconds(15))
+      .until(() -> mockRecordHandler.getNumMessagesHandled() == 19);
+  }
+
+  @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
+  @Test
+  void testUnexpectedDomainError() throws Exception
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRuntimeExceptionInDomain(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("The ConsumerRunnable is exited by an unexpected exception")
+      .atMost(Duration.ofSeconds(5))
+      .pollInterval(Duration.ofMillis(250))
+      .until(() -> isTerminatedExceptionally.get());
+  }
+
+
+  static final String ID = "TEST";
+  static final String TOPIC = "ExampleConsumerTest_TEST";
+  static final int NUM_PARTITIONS = 10;
+
+  @Autowired
+  KafkaTemplate<String, byte[]> kafkaTemplate;
+
+  final LongSerializer serializer = new LongSerializer();
+  final long[] currentOffsets = new long[NUM_PARTITIONS];
+
+  long nextMessage = 1;
+
+  final MockRecordHandler mockRecordHandler = new MockRecordHandler();
+  final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
+
+  ExampleConsumer exampleConsumer;
+
+
+  @BeforeEach
+  void createExampleConsumer(@Autowired ApplicationProperties properties)
+  {
+    ApplicationConfiguration configuration = new ApplicationConfiguration();
+    Consumer<String, Long> consumer = configuration.kafkaConsumer(properties);
+
+    exampleConsumer = new ExampleConsumer(
+      ID,
+      TOPIC,
+      consumer,
+      mockRecordHandler,
+      () -> isTerminatedExceptionally.set(true));
+  }
+
+  @AfterEach
+  void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+  {
+    exampleConsumer.shutdown();
+    adminClient
+      .deleteRecords(recordsToDelete())
+      .all()
+      .get();
+    mockRecordHandler.clear();
+    nextMessage = 1;
+    isTerminatedExceptionally.set(false);
+  }
+
+  private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+  {
+    return IntStream
+      .range(0, NUM_PARTITIONS)
+      .filter(i -> currentOffsets[i] > 0)
+      .mapToObj(i -> Integer.valueOf(i))
+      .collect(Collectors.toMap(
+        i -> new TopicPartition(TOPIC, i),
+        i -> recordsToDelete(i)));
+  }
+
+  private RecordsToDelete recordsToDelete(int partition)
+  {
+    return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+  }
+
+  private void sendValidMessage(int partition)
+  {
+    send(partition, nextMessage);
+  }
+
+  private void sendNonDeserializableMessage(int partition)
+  {
+    send(partition, "BOOM!".getBytes());
+  }
+
+  private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
+  {
+    send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION);
+  }
+
+  private void send(int partition, long message)
+  {
+    send(partition, serializer.serialize(TOPIC, message));
+  }
+
+  private void send(int partition, byte[] bytes)
+  {
+    nextMessage++;
+    kafkaTemplate
+      .send(TOPIC, partition, "EGAL", bytes)
+      .thenAccept(result ->
+      {
+        RecordMetadata metadata = result.getRecordMetadata();
+        currentOffsets[metadata.partition()] = metadata.offset();
+      });
+  }
+
+
+  public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+
+
+  @TestConfiguration
+  static class ConsumerRunnableTestConfig
+  {
+    @Bean
+    AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+    {
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+      return AdminClient.create(properties);
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java
new file mode 100644
index 0000000..9f540f0
--- /dev/null
+++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java
@@ -0,0 +1,48 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MockRecordHandler implements RecordHandler<String, Long>
+{
+  @Getter
+  private int numMessagesHandled = 0;
+
+  @Override
+  public void handleRecord(
+    String topic,
+    Integer partition,
+    Long offset,
+    String key,
+    Long value)
+  {
+    if (value != null && value < 0)
+    {
+      generateError(value);
+    }
+
+    numMessagesHandled++;
+    log.trace("Handled {} messages so far", numMessagesHandled);
+  }
+
+  private void generateError(long value)
+  {
+    if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
+    {
+      throw new RuntimeException("Unexpected application error!");
+    }
+
+    log.info("Not specifically mapped error: {}", value);
+  }
+
+  public void clear()
+  {
+    numMessagesHandled = 0;
+  }
+}