From 11a92b79b8cb212728efd6f4e8d756ed9aa66cda Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 21 Feb 2025 15:56:46 +0100 Subject: [PATCH] Handling der Nachricht in das Interface `RecordHandler` verlegt MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Tests für das Verhalten im Fehlerfall * Der Payload einer Nachricht wird über alle Nachrichten hochgezählt * Konfiguration über `ApplicationConfiguration` - aber von Hand --- .../juplo/kafka/ApplicationConfiguration.java | 10 + .../java/de/juplo/kafka/ExampleConsumer.java | 4 + .../java/de/juplo/kafka/RecordHandler.java | 11 + .../de/juplo/kafka/ExampleConsumerTest.java | 253 ++++++++++++++++++ .../de/juplo/kafka/MockRecordHandler.java | 48 ++++ 5 files changed, 326 insertions(+) create mode 100644 src/main/java/de/juplo/kafka/RecordHandler.java create mode 100644 src/test/java/de/juplo/kafka/ExampleConsumerTest.java create mode 100644 src/test/java/de/juplo/kafka/MockRecordHandler.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 3c0d31d6..c4174842 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,5 +1,6 @@ package de.juplo.kafka; +import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.StickyAssignor; @@ -15,11 +16,13 @@ import java.util.Properties; @Configuration @EnableConfigurationProperties(ApplicationProperties.class) +@Slf4j public class ApplicationConfiguration { @Bean public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, + RecordHandler recordHandler, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -28,9 +31,16 @@ public class ApplicationConfiguration properties.getClientId(), 
properties.getConsumerProperties().getTopic(), kafkaConsumer, + recordHandler, () -> applicationContext.close()); } + @Bean + public RecordHandler recordHandler() + { + return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value); + } + @Bean(destroyMethod = "") public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 6554da4b..b2c59f8a 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -17,6 +17,7 @@ public class ExampleConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; + private final RecordHandler recordHandler; private final Thread workerThread; private final Runnable closeCallback; @@ -28,11 +29,13 @@ public class ExampleConsumer implements Runnable String clientId, String topic, Consumer consumer, + RecordHandler recordHandler, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; + this.recordHandler = recordHandler; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -107,6 +110,7 @@ public class ExampleConsumer implements Runnable { consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); + recordHandler.handleRecord(topic, partition, offset, key, value); } diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java new file mode 100644 index 00000000..a7b65af2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/RecordHandler.java @@ -0,0 +1,11 @@ +package de.juplo.kafka; + +public interface RecordHandler +{ + void handleRecord( + String topic, + Integer partition, + Long offset, + K key, + V value); +} diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java 
b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java new file mode 100644 index 00000000..590c9cdf --- /dev/null +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -0,0 +1,253 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.LongSerializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static de.juplo.kafka.ExampleConsumerTest.NUM_PARTITIONS; +import static de.juplo.kafka.ExampleConsumerTest.TOPIC; + + +@SpringBootTest( + classes = { + KafkaAutoConfiguration.class, + ApplicationProperties.class, + ExampleConsumerTest.ConsumerRunnableTestConfig.class, + }, + properties = { + "juplo.bootstrap-server=${spring.embedded.kafka.brokers}", + 
"spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", + "logging.level.de.juplo.kafka=TRACE", + }) +@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) +public class ExampleConsumerTest +{ + @DisplayName("All messages are consumed") + @Test + void testOnlyValidMessages() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 20); + } + + @DisplayName("A deserialization exception is skipped and all valid messages are consumed") + @Test + void testDeserializationException() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendNonDeserializableMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All valid messages are consumed") + .atMost(Duration.ofSeconds(15)) + .until(() -> mockRecordHandler.getNumMessagesHandled() == 19); + } + + @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application") + @Test + void testUnexpectedDomainError() throws Exception + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + 
sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRuntimeExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("The ConsumerRunnable is exited by an unexpected exception") + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(250)) + .until(() -> isTerminatedExceptionally.get()); + } + + + static final String ID = "TEST"; + static final String TOPIC = "ExampleConsumerTest_TEST"; + static final int NUM_PARTITIONS = 10; + + @Autowired + KafkaTemplate kafkaTemplate; + + final LongSerializer serializer = new LongSerializer(); + final long[] currentOffsets = new long[NUM_PARTITIONS]; + + long nextMessage = 1; + + final MockRecordHandler mockRecordHandler = new MockRecordHandler(); + final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean(); + + ExampleConsumer exampleConsumer; + + + @BeforeEach + void createExampleConsumer(@Autowired ApplicationProperties properties) + { + ApplicationConfiguration configuration = new ApplicationConfiguration(); + Consumer consumer = configuration.kafkaConsumer(properties); + + exampleConsumer = new ExampleConsumer( + ID, + TOPIC, + consumer, + mockRecordHandler, + () -> isTerminatedExceptionally.set(true)); + } + + @AfterEach + void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException + { + exampleConsumer.shutdown(); + adminClient + .deleteRecords(recordsToDelete()) + .all() + .get(); + mockRecordHandler.clear(); + nextMessage = 1; + isTerminatedExceptionally.set(false); + } + + private Map recordsToDelete() + { + return IntStream + .range(0, NUM_PARTITIONS) + .filter(i -> currentOffsets[i] > 0) + .mapToObj(i -> Integer.valueOf(i)) + .collect(Collectors.toMap( + i -> 
new TopicPartition(TOPIC, i), + i -> recordsToDelete(i))); + } + + private RecordsToDelete recordsToDelete(int partition) + { + return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); + } + + private void sendValidMessage(int partition) + { + send(partition, nextMessage); + } + + private void sendNonDeserializableMessage(int partition) + { + send(partition, "BOOM!".getBytes()); + } + + private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition) + { + send(partition, VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION); + } + + private void send(int partition, long message) + { + send(partition, serializer.serialize(TOPIC, message)); + } + + private void send(int partition, byte[] bytes) + { + nextMessage++; + kafkaTemplate + .send(TOPIC, partition, "EGAL", bytes) + .thenAccept(result -> + { + RecordMetadata metadata = result.getRecordMetadata(); + currentOffsets[metadata.partition()] = metadata.offset(); + }); + } + + + public final static int VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; + + + @TestConfiguration + static class ConsumerRunnableTestConfig + { + @Bean + AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker) + { + Map properties = new HashMap<>(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); + return AdminClient.create(properties); + } + } +} diff --git a/src/test/java/de/juplo/kafka/MockRecordHandler.java b/src/test/java/de/juplo/kafka/MockRecordHandler.java new file mode 100644 index 00000000..9f540f0e --- /dev/null +++ b/src/test/java/de/juplo/kafka/MockRecordHandler.java @@ -0,0 +1,48 @@ +package de.juplo.kafka; + +import lombok.Getter; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import static de.juplo.kafka.ExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; + + +@RequiredArgsConstructor +@Slf4j +public class MockRecordHandler implements RecordHandler +{ + @Getter + private int numMessagesHandled = 0; + + @Override + public void 
handleRecord( + String topic, + Integer partition, + Long offset, + String key, + Long value) + { + if (value != null && value < 0) + { + generateError(value); + } + + numMessagesHandled++; + log.trace("Handled {} messages so far", numMessagesHandled); + } + + private void generateError(long value) + { + if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) + { + throw new RuntimeException("Unexpected application error!"); + } + + log.info("Not specifically mapped error: {}", value); + } + + public void clear() + { + numMessagesHandled = 0; + } +} -- 2.20.1