From 1de9ff317978ecb46e48e2cf58a07b9cf19faad9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Apr 2022 14:50:50 +0200 Subject: [PATCH] =?utf8?q?Tests:=20Umbau=20f=C3=BCr=20einen=20Commit=20im?= =?utf8?q?=20Fehlerfall=20und=20Anpassung=20des=20Tests?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- src/main/java/de/juplo/kafka/EndlessConsumer.java | 15 +++++++++++++++ .../java/de/juplo/kafka/ApplicationTests.java | 4 ++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0a95b2c..b173b12 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -4,6 +4,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import javax.annotation.PreDestroy; @@ -128,6 +129,20 @@ public class EndlessConsumer implements Runnable consumer.commitSync(); shutdown(); } + catch(RecordDeserializationException e) + { + TopicPartition tp = e.topicPartition(); + long offset = e.offset(); + log.error( + "{} - Could not deserialize message on topic {} with offset={}: {}", + id, + tp, + offset, + e.getCause().toString()); + + consumer.commitSync(); + shutdown(e); + } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString(), e); diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 6aaff52..17d8431 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -87,7 +87,7 @@ class ApplicationTests @Test @Order(2) - void commitsNoOffsetsOnError() + void commitsOffsetOfErrorForReprocessingOnError() { send100Messages(counter -> counter == 77 @@ -97,7 +97,7 @@ class ApplicationTests runEndlessConsumer((record) -> {}); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + compareToCommitedOffsets(newOffsets); } -- 2.20.1