From: Kai Moritz Date: Sun, 10 Apr 2022 12:50:50 +0000 (+0200) Subject: Tests: Umbau für einen Commit im Fehlerfall und Anpassung des Tests X-Git-Tag: deserialization-synchroner-test X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=1de9ff317978ecb46e48e2cf58a07b9cf19faad9;p=demos%2Fkafka%2Ftraining Tests: Umbau für einen Commit im Fehlerfall und Anpassung des Tests --- diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0a95b2c..b173b12 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -4,6 +4,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import javax.annotation.PreDestroy; @@ -128,6 +129,20 @@ public class EndlessConsumer implements Runnable consumer.commitSync(); shutdown(); } + catch(RecordDeserializationException e) + { + TopicPartition tp = e.topicPartition(); + long offset = e.offset(); + log.error( + "{} - Could not deserialize message on topic {} with offset={}: {}", + id, + tp, + offset, + e.getCause().toString()); + + consumer.commitSync(); + shutdown(e); + } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString(), e); diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 6aaff52..17d8431 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -87,7 +87,7 @@ class ApplicationTests @Test @Order(2) - void commitsNoOffsetsOnError() + void commitsOffsetOfErrorForReprocessingOnError() { send100Messages(counter -> counter == 77 @@ -97,7 +97,7 @@ class ApplicationTests runEndlessConsumer((record) -> {}); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + compareToCommitedOffsets(newOffsets); }