From: Kai Moritz Date: Sun, 10 Nov 2024 16:57:23 +0000 (+0100) Subject: Version des ``spring-consumer``, der Deserialisierungs-Fehler überspringt X-Git-Tag: consumer/spring-consumer--deserialization-error--2024-11-13--si X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=refs%2Fheads%2Fconsumer%2Fspring-consumer--deserialization-error;p=demos%2Fkafka%2Ftraining Version des ``spring-consumer``, der Deserialisierungs-Fehler überspringt --- diff --git a/README.sh b/README.sh index f4696e0..ca8553a 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-long-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index b899570..6d6b7b7 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -197,14 +197,14 @@ services: juplo.producer.topic: test consumer-1: - image: juplo/spring-consumer:1.1-long-SNAPSHOT + image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer-1 juplo.consumer.topic: test consumer-2: - image: juplo/spring-consumer:1.1-long-SNAPSHOT + image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer-2 diff --git a/pom.xml b/pom.xml index c32791f..abcf5cb 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-long-SNAPSHOT + 1.1-deserialization-error-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index faa4c0a..bb0b619 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; @@ -51,18 +52,31 @@ public class ExampleConsumer implements Runnable while (running) { - ConsumerRecords records = + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + } + catch (RecordDeserializationException e) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + log.error( + "{} - Ignoring invalid record for offset {} on partition {}: {}", + id, + e.offset(), + e.topicPartition(), + e.getMessage()); + consumer.seek(e.topicPartition(), e.offset() + 1); } } }