From 3f1f93b04eb32a5bc319c318c2a8f6384c06d1ca Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 23 Nov 2024 09:34:22 +0100 Subject: [PATCH] =?utf8?q?Tests=20f=C3=BCr=20das=20Verhalten=20im=20Fehler?= =?utf8?q?fall?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../de/juplo/kafka/ExampleConsumerTest.java | 259 ++++++++++++++++++ .../de/juplo/kafka/MockRecordHandler.java | 48 ++++ 2 files changed, 307 insertions(+) create mode 100644 src/test/java/de/juplo/kafka/ExampleConsumerTest.java create mode 100644 src/test/java/de/juplo/kafka/MockRecordHandler.java diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java new file mode 100644 index 00000000..b4322010 --- /dev/null +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -0,0 +1,259 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.time.Duration; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS; +import static de.juplo.kafka.ExampleConsumerTest.TOPIC; + + +@SpringBootTest( + classes = { + KafkaAutoConfiguration.class, + ExampleConsumerTest.ConsumerRunnableTestConfig.class, + }, + properties = { + "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", + "logging.level.de.juplo.kafka=TRACE", + }) +@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) +public class ExampleConsumerTest +{ + @DisplayName("All messages are consumed") + @Test + void testOnlyValidMessages() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); + } + + @DisplayName("A deserialization exception is skipped and all valid messages are consumed") + @Test + void testDeserializationException() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendNonDeserializableMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All valid messages are consumed") + .atMost(Duration.ofSeconds(15)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 19); + } + + @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application") + @Test + void testUnexpectedDomainError() throws Exception + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRuntimeExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("The ConsumerRunnable is exited by an unexpected exception") + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(250)) + .until(() -> isTerminatedExceptionally.get()); + } + + + static final String ID = "TEST"; + static final String TOPIC = "ExampleConsumerTest_TEST"; + static final int NUM_PARTITIONS = 10; + + @Autowired + KafkaTemplate kafkaTemplate; + + final LongSerializer serializer = new LongSerializer(); + final long[] currentOffsets = new long[NUM_PARTITIONS]; + + final MockRecordHandler mockRecordHandler = new MockRecordHandler(); + final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean(); + + ExampleConsumer exampleConsumer; + + + @BeforeEach + void createExampleConsumer(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker) + { + Properties props = new Properties(); + props.put("bootstrap.servers", kafkaBroker); + props.put("client.id", ID); + props.put("group.id", ID); + props.put("auto.offset.reset", "earliest"); + props.put("key.deserializer", StringDeserializer.class.getName()); + props.put("value.deserializer", LongDeserializer.class.getName()); + + Consumer consumer = new KafkaConsumer<>(props); + + exampleConsumer = new ExampleConsumer( + ID, + TOPIC, + consumer, + mockRecordHandler, + () -> isTerminatedExceptionally.set(true)); + } + + @AfterEach + void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException + { + exampleConsumer.shutdown(); + adminClient + .deleteRecords(recordsToDelete()) + .all() + .get(); + mockRecordHandler.clear(); + isTerminatedExceptionally.set(false); + } + + private Map recordsToDelete() + { + return IntStream + .range(0, NUM_PARTITIONS) + .filter(i -> currentOffsets[i] > 0) + .mapToObj(i -> Integer.valueOf(i)) + .collect(Collectors.toMap( + i -> new TopicPartition(TOPIC, i), + i -> recordsToDelete(i))); + } + + private RecordsToDelete recordsToDelete(int partition) + { + return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); + } + + private void sendValidMessage(int partition) + { + send(partition, partition); + } + + private void sendNonDeserializableMessage(int partition) + { + send(partition, "BOOM!".getBytes()); + } + + private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition) + { + send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION); + } + + private void send(int partition, long message) + { + send(partition, serializer.serialize(TOPIC, message)); + } + + private void send(int partition, byte[] bytes) + { + kafkaTemplate + .send(TOPIC, partition, "EGAL", bytes) + .thenAccept(result -> + { + RecordMetadata metadata = result.getRecordMetadata(); + currentOffsets[metadata.partition()] = metadata.offset(); + }); + } + + + public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; + + + @TestConfiguration + static class ConsumerRunnableTestConfig + { + @Bean + AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker) + { + Map properties = new HashMap<>(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); + return AdminClient.create(properties); + } + } +} diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java new file mode 100644 index 00000000..9f540f0e --- /dev/null +++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java @@ -0,0 +1,48 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; + + +@RequiredArgsConstructor +@Slf4j +public class MockRecordHandler implements RecordHandler +{ + @Getter + private int numMessagesHandled = 0; + + @Override + public void handleRecord( + String topic, + Integer partition, + Long offset, + String key, + Long value) + { + if (value != null && value < 0) + { + generateError(value); + } + + numMessagesHandled++; + log.trace("Handled {} messages so far", numMessagesHandled); + } + + private void generateError(long value) + { + if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) + { + throw new RuntimeException("Unexpected application error!"); + } + + log.info("Not specifically mapped error: {}", value); + } + + public void clear() + { + numMessagesHandled = 0; + } +} -- 2.20.1