From 24ad34a80ad0ee25c7ed15969f6be219231820a2 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 23 Nov 2024 09:34:22 +0100 Subject: [PATCH] TEST --- .../de/juplo/kafka/ExampleConsumerTest.java | 230 ++++++++++++++++++ .../de/juplo/kafka/MockRecordHandler.java | 52 ++++ 2 files changed, 282 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/ExampleConsumerTest.java create mode 100644 src/test/java/de/juplo/kafka/MockRecordHandler.java diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java new file mode 100644 index 00000000..9bb9e3c2 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -0,0 +1,230 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.LongSerializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Import; +import org.springframework.context.annotation.Primary; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; + +import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS; +import static de.juplo.kafka.ExampleConsumerTest.TOPIC; + + +@SpringBootTest( + classes = { + KafkaAutoConfiguration.class, + ExampleConsumerTest.ConsumerRunnableTestConfig.class, + }, + properties = { + "spring.main.allow-bean-definition-overriding=true", + "logging.level.de.juplo.kafka=TRACE", + "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.consumer.topic=" + TOPIC, + "juplo.consumer.auto-offset-reset=earliest", + "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", + }) +@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) +public class ExampleConsumerTest +{ + @DisplayName("All valid messages are consumed") + @Test + void testOnlyValidMessages() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are received") + .atMost(Duration.ofSeconds(5)) + .until(() -> mockHandler.getNumRequestsHandled() == 20); + } + + @DisplayName("A deserialization exception is skipped") + @Test + void testDeserializationException() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendNonDeserializableMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All valid messages are received") + .atMost(Duration.ofSeconds(5)) + .until(() -> mockHandler.getNumRequestsHandled() == 19); + } + + @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application") + @Test + void testDomainError() throws Exception + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRuntimeExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("The ConsumerRunnable was exited by an unexpected exception") + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(250)) + .until(() -> isTerminatedExceptionally.get()); + } + + + static final String TOPIC = "ExampleConsumerTest_TEST"; + static final int NUM_PARTITIONS = 10; + + @Autowired + KafkaTemplate kafkaTemplate; + + LongSerializer serializer = new LongSerializer(); + long[] currentOffsets = new long[] { 0, 0 }; + + @Autowired + AdminClient adminClient; + @Autowired + MockRecordHandler mockHandler; + @Autowired + AtomicBoolean isTerminatedExceptionally; + + + @BeforeEach + void resetTopic() { + adminClient.deleteRecords(Map.of( + new TopicPartition(TOPIC, 0), deleteAllRecordsByPartition(0), + new TopicPartition(TOPIC, 1), deleteAllRecordsByPartition(1))); + mockHandler.clear(); + isTerminatedExceptionally.set(false); + } + + private RecordsToDelete deleteAllRecordsByPartition(int x) + { + return RecordsToDelete.beforeOffset(currentOffsets[x] + 1); + } + + private void sendValidMessage(int partition) + { + kafkaTemplate.send(TOPIC, partition, "EGAL", serializer.serialize(TOPIC, (long)partition)); + } + + private void sendNonDeserializableMessage(int partition) + { + kafkaTemplate.send(TOPIC, partition, "EGAL", "BOOM!".getBytes()); + } + + private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition) + { + kafkaTemplate.send( + TOPIC, + partition, + "EGAL", + serializer.serialize(TOPIC, (long)VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION)); + } + + + public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; + + + @TestConfiguration + @Import(ApplicationConfiguration.class) + static class ConsumerRunnableTestConfig + { + @Bean + RecordHandler recordHandler() + { + return new MockRecordHandler(Duration.ofMillis(100)); + } + + @Primary + @Bean + Runnable errorCallback(AtomicBoolean isTerminatedExceptionally) + { + return () -> isTerminatedExceptionally.set(true); + } + + @Bean + AtomicBoolean isTerminatedExceptionally() + { + return new AtomicBoolean(); + } + + @Bean + AdminClient adminClient( + @Value("${spring.embedded.kafka.brokers}") + String kafkaBroker) + { + Map properties = new HashMap<>(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); + return AdminClient.create(properties); + } + } +} diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java new file mode 100644 index 00000000..fe873617 --- /dev/null +++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java @@ -0,0 +1,52 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import java.time.Duration; + +import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; + + +@RequiredArgsConstructor +@Slf4j +public class MockRecordHandler implements RecordHandler { + + private final Duration exceptionDelay; + + @Getter + private int numRequestsHandled = 0; + + @Override + public void handleRecord( + String topic, + Integer partition, + Long offset, + String key, + Long value) { + + if (value < 0) + { + generateError(value); + } + + numRequestsHandled++; + log.trace("Handled request for topic {}, num requests handled so far: {}", topic, numRequestsHandled); + } + + private void generateError(long value) + { + if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) + { + throw new RuntimeException("Unexpected application error!"); + } + + log.info("Not specifically mapped error: {}", value); + } + + public void clear() + { + numRequestsHandled = 0; + } +} -- 2.20.1