From: Kai Moritz Date: Sun, 13 Apr 2025 09:23:19 +0000 (+0200) Subject: Error-Handling für Deserialisierungs-Fehler implementiert und getestet X-Git-Tag: consumer/spring-consumer--error-handling--2025-04-signal-spickzettel~2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=6055d5676f85b717781a591cd5ffbc255c1ad7b4;p=demos%2Fkafka%2Ftraining Error-Handling für Deserialisierungs-Fehler implementiert und getestet * Anwendung bereits entsprechend des Ziels umbenannt. * Hier erst mal nur die Fehlerbehandlung für Deserialisierungs-Fehler, so wie in den Übungen dazu. * Der Testfall ist schon auf die deutlich komplexeren Tests für die vollständige Fehlerbehandlung vorbereitet. --- diff --git a/README.sh b/README.sh index 203c22b..5ecc994 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-record-handler-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-error-handling-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index f700918..3df5a51 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-record-handler-SNAPSHOT' +version = '1.1-error-handling-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index bf306f6..e4727a3 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -140,7 +140,7 @@ services: command: kafka:9092 test producer consumer: - image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT + image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -149,7 +149,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT + image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -158,7 +158,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT + image: juplo/spring-consumer:1.1-error-handling-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer diff --git a/pom.xml b/pom.xml index 7590d25..7cd3cb0 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-record-handler-SNAPSHOT + 1.1-error-handling-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index e5a8b3d..7e820ea 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; @@ -52,17 +53,30 @@ public class ExampleConsumer implements Runnable while (true) { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + } + catch (RecordDeserializationException e) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + log.error( + "{} - Ignoring invalid record for offset {} on partition {}: {}", + id, + e.offset(), + e.topicPartition(), + e.getMessage()); + consumer.seek(e.topicPartition(), e.offset() + 1); } } } diff --git a/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java b/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java new file mode 100644 index 0000000..d87b175 --- /dev/null +++ b/src/test/java/de/juplo/kafka/AbstractExampleConsumerTest.java @@ -0,0 +1,257 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.admin.RecordsToDelete; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.serialization.Serializer; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.kafka.core.KafkaTemplate; +import org.springframework.kafka.test.context.EmbeddedKafka; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +import static de.juplo.kafka.AbstractExampleConsumerTest.NUM_PARTITIONS; +import static de.juplo.kafka.AbstractExampleConsumerTest.TOPIC; +import static org.assertj.core.api.Assertions.assertThat; + + +@SpringBootTest( + classes = { + KafkaAutoConfiguration.class, + ApplicationProperties.class, + AbstractExampleConsumerTest.ConsumerRunnableTestConfig.class, + }, + properties = { + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer", + "logging.level.de.juplo.kafka=TRACE", + }) +@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS) +public abstract class AbstractExampleConsumerTest +{ + @DisplayName("All messages are consumed as expected") + @Test + void testOnlyValidMessages() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All messages are consumed") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20)); + } + + @DisplayName("A deserialization exception is skipped and all valid messages are consumed") + @Test + void testDeserializationException() + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendNonDeserializableMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("All valid messages are consumed") + .atMost(Duration.ofSeconds(15)) + .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19)); + } + + @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application") + @Test + void testUnexpectedDomainError() throws Exception + { + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendValidMessage(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + sendValidMessage(0); + sendValidMessage(1); + sendValidMessage(2); + sendMessageThatTriggersRuntimeExceptionInDomain(3); + sendValidMessage(4); + sendValidMessage(5); + sendValidMessage(6); + sendValidMessage(7); + sendValidMessage(8); + sendValidMessage(9); + + Awaitility + .await("The ConsumerRunnable is exited by an unexpected exception") + .atMost(Duration.ofSeconds(5)) + .pollInterval(Duration.ofMillis(250)) + .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue()); + } + + + static final String ID = "TEST"; + static final String TOPIC = "ExampleConsumerTest_TEST"; + static final int NUM_PARTITIONS = 10; + + @Autowired + KafkaTemplate kafkaTemplate; + + final Serializer serializer = createSerializer(); + final long[] currentOffsets = new long[NUM_PARTITIONS]; + + long nextMessage = 1; + + final AbstractMockRecordHandler mockRecordHandler = createMockRecordHandler(); + final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean(); + + ExampleConsumer exampleConsumer; + + + abstract Serializer createSerializer(); + abstract AbstractMockRecordHandler createMockRecordHandler(); + abstract Consumer createConsumer(KafkaProperties properties); + abstract V createValidMessage(); + abstract V createMessageThatTriggersRuntimeException(); + + + @BeforeEach + void createExampleConsumer(@Autowired KafkaProperties properties) + { + exampleConsumer = new ExampleConsumer( + ID, + TOPIC, + createConsumer(properties), + mockRecordHandler, + () -> isTerminatedExceptionally.set(true)); + } + + @AfterEach + void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException + { + exampleConsumer.shutdown(); + adminClient + .deleteRecords(recordsToDelete()) + .all() + .get(); + mockRecordHandler.clear(); + nextMessage = 1; + isTerminatedExceptionally.set(false); + } + + private Map recordsToDelete() + { + return IntStream + .range(0, NUM_PARTITIONS) + .filter(i -> currentOffsets[i] > 0) + .mapToObj(i -> Integer.valueOf(i)) + .collect(Collectors.toMap( + i -> new TopicPartition(TOPIC, i), + i -> recordsToDelete(i))); + } + + private RecordsToDelete recordsToDelete(int partition) + { + return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1); + } + + private void sendValidMessage(int partition) + { + send(partition, createValidMessage()); + } + + private void sendNonDeserializableMessage(int partition) + { + send(partition, "BOOM!".getBytes()); + } + + private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition) + { + send(partition, createMessageThatTriggersRuntimeException()); + } + + private void send(int partition, V message) + { + send(partition, serializer.serialize(TOPIC, message)); + } + + private void send(int partition, byte[] bytes) + { + nextMessage++; + kafkaTemplate + .send(TOPIC, partition, "EGAL", bytes) + .thenAccept(result -> + { + RecordMetadata metadata = result.getRecordMetadata(); + currentOffsets[metadata.partition()] = metadata.offset(); + }); + } + + + + @TestConfiguration + static class ConsumerRunnableTestConfig + { + @Bean + AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker) + { + Map properties = new HashMap<>(); + properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker); + return AdminClient.create(properties); + } + } +} diff --git a/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java b/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java new file mode 100644 index 0000000..c861726 --- /dev/null +++ b/src/test/java/de/juplo/kafka/AbstractMockRecordHandler.java @@ -0,0 +1,37 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + + +@RequiredArgsConstructor +@Slf4j +public abstract class AbstractMockRecordHandler implements RecordHandler +{ + private int numMessagesHandled = 0; + + public int getNumMessagesHandled() + { + return numMessagesHandled; + } + + @Override + public void handleRecord( + String topic, + Integer partition, + Long offset, + String key, + V value) + { + generateError(value); + numMessagesHandled++; + log.trace("Handled {} messages so far", numMessagesHandled); + } + + abstract void generateError(V value); + + public void clear() + { + numMessagesHandled = 0; + } +} diff --git a/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java b/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java new file mode 100644 index 0000000..09456b2 --- /dev/null +++ b/src/test/java/de/juplo/kafka/LongExampleConsumerTest.java @@ -0,0 +1,50 @@ +package de.juplo.kafka; + +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.springframework.boot.autoconfigure.kafka.KafkaProperties; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; + +import java.util.Map; + + +public class LongExampleConsumerTest extends AbstractExampleConsumerTest +{ + @Override + AbstractMockRecordHandler createMockRecordHandler() + { + return new LongMockRecordHandler(); + } + + @Override + Serializer createSerializer() + { + return new LongSerializer(); + } + + @Override + Consumer createConsumer(KafkaProperties kafkaProperties) + { + Map properties = kafkaProperties.buildConsumerProperties(); + properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName()); + return new DefaultKafkaConsumerFactory<>(properties).createConsumer(); + } + + @Override + Long createValidMessage() + { + return nextMessage; + } + + @Override + Long createMessageThatTriggersRuntimeException() + { + return VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; + } + + + public final static long VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION = -1; +} diff --git a/src/test/java/de/juplo/kafka/LongMockRecordHandler.java b/src/test/java/de/juplo/kafka/LongMockRecordHandler.java new file mode 100644 index 0000000..a6a1659 --- /dev/null +++ b/src/test/java/de/juplo/kafka/LongMockRecordHandler.java @@ -0,0 +1,20 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; + +import static de.juplo.kafka.LongExampleConsumerTest.VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION; + + +@Slf4j +public class LongMockRecordHandler extends AbstractMockRecordHandler +{ + void generateError(Long value) + { + if (value == VALUE_THAT_TRIGGERS_RUNTIMEEXCEPTION) + { + throw new RuntimeException("Unexpected application error!"); + } + + log.info("Not specifically mapped error: {}", value); + } +}