From 51c35ea80a1fa321d2fa599be843ea12e90a97df Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Nov 2024 17:57:23 +0100 Subject: [PATCH] =?utf8?q?Version=20des=20``spring-consumer``,=20der=20Des?= =?utf8?q?erialisierungs-Fehler=20=C3=BCberspringt?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 2 +- docker/docker-compose.yml | 4 +-- pom.xml | 2 +- .../java/de/juplo/kafka/ExampleConsumer.java | 32 +++++++++++++------ 4 files changed, 27 insertions(+), 13 deletions(-) diff --git a/README.sh b/README.sh index f4696e0..ca8553a 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-long-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index b899570..6d6b7b7 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -197,14 +197,14 @@ services: juplo.producer.topic: test consumer-1: - image: juplo/spring-consumer:1.1-long-SNAPSHOT + image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer-1 juplo.consumer.topic: test consumer-2: - image: juplo/spring-consumer:1.1-long-SNAPSHOT + image: juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT environment: juplo.bootstrap-server: kafka:9092 juplo.client-id: consumer-2 diff --git a/pom.xml b/pom.xml index c32791f..abcf5cb 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-long-SNAPSHOT + 1.1-deserialization-error-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index faa4c0a..bb0b619 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; @@ -51,18 +52,31 @@ public class ExampleConsumer implements Runnable while (running) { - ConsumerRecords records = + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + } + catch (RecordDeserializationException e) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + log.error( + "{} - Ignoring invalid record for offset {} on partition {}: {}", + id, + e.offset(), + e.topicPartition(), + e.getMessage()); + consumer.seek(e.topicPartition(), e.offset() + 1); } } } -- 2.20.1