From 0ed2d37b59812a2f7d4810206ac4cb66a75693a8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Nov 2024 17:42:58 +0100 Subject: [PATCH] =?utf8?q?Vorlage=20f=C3=BCr=20die=20=C3=9Cbung=20zu=20Des?= =?utf8?q?erialisieruns-Fehlern?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- README.sh | 42 ------------------- .../java/de/juplo/kafka/ExampleConsumer.java | 34 +++++---------- 2 files changed, 10 insertions(+), 66 deletions(-) delete mode 100755 README.sh diff --git a/README.sh b/README.sh deleted file mode 100755 index 392b237e..00000000 --- a/README.sh +++ /dev/null @@ -1,42 +0,0 @@ -#!/bin/bash - -IMAGE=juplo/spring-consumer:1.1-deserialization-error-SNAPSHOT - -if [ "$1" = "cleanup" ] -then - docker compose -f docker/docker-compose.yml down -t0 -v --remove-orphans - mvn clean - exit -fi - -docker compose -f docker/docker-compose.yml up -d --remove-orphans kafka-1 kafka-2 kafka-3 -docker compose -f docker/docker-compose.yml rm -svf peter ute - -if [[ - $(docker image ls -q $IMAGE) == "" || - "$1" = "build" -]] -then - mvn clean install || exit -else - echo "Using image existing images:" - docker image ls $IMAGE -fi - -docker compose -f docker/docker-compose.yml up --remove-orphans setup || exit 1 - - -docker compose -f docker/docker-compose.yml up -d producer -docker compose -f docker/docker-compose.yml up -d peter ute -sleep 15 - -docker compose -f docker/docker-compose.yml stop producer - -echo -echo "Von peter empfangen:" -docker compose -f docker/docker-compose.yml logs peter | grep '\ test\/.' -echo -echo "Von ute empfangen:" -docker compose -f docker/docker-compose.yml logs ute | grep '\ test\/.' - -docker compose -f docker/docker-compose.yml stop peter ute diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 6554da4b..a6691c3b 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -4,7 +4,6 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import java.time.Duration; @@ -52,30 +51,17 @@ public class ExampleConsumer implements Runnable while (running) { - try - { - ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); - - log.info("{} - Received {} messages", id, records.count()); - for (ConsumerRecord record : records) - { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); - } - } - catch (RecordDeserializationException e) + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + + log.info("{} - Received {} messages", id, records.count()); + for (ConsumerRecord record : records) { - log.error( - "{} - Ignoring invalid record for offset {} on partition {}: {}", - id, - e.offset(), - e.topicPartition(), - e.getMessage()); - consumer.seek(e.topicPartition(), e.offset() + 1); + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); } } } -- 2.20.1