TEST
author Kai Moritz <kai@juplo.de>
Sat, 23 Nov 2024 08:34:22 +0000 (09:34 +0100)
committer Kai Moritz <kai@juplo.de>
Sun, 15 Dec 2024 11:17:58 +0000 (12:17 +0100)
src/test/java/de/juplo/kafka/ExampleConsumerTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/MockRecordHandler.java [new file with mode: 0644]

diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java
new file mode 100644 (file)
index 0000000..9bb9e3c
--- /dev/null
@@ -0,0 +1,230 @@
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
+import org.springframework.context.annotation.Primary;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS;
+import static de.juplo.kafka.ExampleConsumerTest.TOPIC;
+
+
+@SpringBootTest(
+  classes = {
+    KafkaAutoConfiguration.class,
+    ExampleConsumerTest.ConsumerRunnableTestConfig.class,
+  },
+  properties = {
+    "spring.main.allow-bean-definition-overriding=true",
+    "logging.level.de.juplo.kafka=TRACE",
+    "juplo.bootstrap-server=${spring.embedded.kafka.brokers}",
+    "juplo.consumer.topic=" + TOPIC,
+    "juplo.consumer.auto-offset-reset=earliest",
+    "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+  })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
+public class ExampleConsumerTest
+{
+  @DisplayName("All valid messages are consumed")
+  @Test
+  void testOnlyValidMessages()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are received")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> mockHandler.getNumRequestsHandled() == 20);
+  }
+
+  @DisplayName("A deserialization exception is skipped")
+  @Test
+  void testDeserializationException()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendNonDeserializableMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All valid messages are received")
+      .atMost(Duration.ofSeconds(5))
+      .until(() -> mockHandler.getNumRequestsHandled() == 19);
+  }
+
+  @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
+  @Test
+  void testDomainError() throws Exception
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRuntimeExceptionInDomain(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("The ConsumerRunnable was exited by an unexpected exception")
+      .atMost(Duration.ofSeconds(5))
+      .pollInterval(Duration.ofMillis(250))
+      .until(() -> isTerminatedExceptionally.get());
+  }
+
+
+  static final String TOPIC = "ExampleConsumerTest_TEST";
+  static final int NUM_PARTITIONS = 10;
+
+  @Autowired
+  KafkaTemplate<String, byte[]> kafkaTemplate;
+
+  LongSerializer serializer = new LongSerializer();
+  long[] currentOffsets = new long[] { 0, 0 };
+
+  @Autowired
+  AdminClient adminClient;
+  @Autowired
+  MockRecordHandler mockHandler;
+  @Autowired
+  AtomicBoolean isTerminatedExceptionally;
+
+
+  @BeforeEach
+  void resetTopic() {
+    adminClient.deleteRecords(Map.of(
+      new TopicPartition(TOPIC, 0), deleteAllRecordsByPartition(0),
+      new TopicPartition(TOPIC, 1), deleteAllRecordsByPartition(1)));
+    mockHandler.clear();
+    isTerminatedExceptionally.set(false);
+  }
+
+  private RecordsToDelete deleteAllRecordsByPartition(int x)
+  {
+    return RecordsToDelete.beforeOffset(currentOffsets[x] + 1);
+  }
+
+  private void sendValidMessage(int partition)
+  {
+    kafkaTemplate.send(TOPIC, partition, "EGAL", serializer.serialize(TOPIC, (long)partition));
+  }
+
+  private void sendNonDeserializableMessage(int partition)
+  {
+    kafkaTemplate.send(TOPIC, partition, "EGAL", "BOOM!".getBytes());
+  }
+
+  private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
+  {
+    kafkaTemplate.send(
+      TOPIC,
+      partition,
+      "EGAL",
+      serializer.serialize(TOPIC, (long)VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION));
+  }
+
+
+  public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1;
+
+
+  @TestConfiguration
+  @Import(ApplicationConfiguration.class)
+  static class ConsumerRunnableTestConfig
+  {
+    @Bean
+    RecordHandler<String, Long> recordHandler()
+    {
+      return new MockRecordHandler(Duration.ofMillis(100));
+    }
+
+    @Primary
+    @Bean
+    Runnable errorCallback(AtomicBoolean isTerminatedExceptionally)
+    {
+      return () -> isTerminatedExceptionally.set(true);
+    }
+
+    @Bean
+    AtomicBoolean isTerminatedExceptionally()
+    {
+      return new AtomicBoolean();
+    }
+
+    @Bean
+    AdminClient adminClient(
+      @Value("${spring.embedded.kafka.brokers}")
+      String kafkaBroker)
+    {
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+      return AdminClient.create(properties);
+    }
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java
new file mode 100644 (file)
index 0000000..fe87361
--- /dev/null
@@ -0,0 +1,52 @@
+package de.juplo.kafka;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+
+import java.time.Duration;
+
+import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION;
+
+
+@RequiredArgsConstructor
+@Slf4j
+public class MockRecordHandler implements RecordHandler<String, Long> {
+
+    private final Duration exceptionDelay;
+
+    @Getter
+    private int numRequestsHandled = 0;
+
+    @Override
+    public void handleRecord(
+      String topic,
+      Integer partition,
+      Long offset,
+      String key,
+      Long value) {
+
+      if (value < 0)
+      {
+        generateError(value);
+      }
+
+      numRequestsHandled++;
+      log.trace("Handled request for topic {}, num requests handled so far: {}", topic, numRequestsHandled);
+    }
+
+    private void generateError(long value)
+    {
+      if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)
+      {
+        throw new RuntimeException("Unexpected application error!");
+      }
+
+      log.info("Not specifically mapped error: {}", value);
+    }
+
+    public void clear()
+    {
+        numRequestsHandled = 0;
+    }
+}