From: Kai Moritz Date: Sun, 10 Apr 2022 12:50:50 +0000 (+0200) Subject: Tests: Umbau für einen Commit im Fehlerfall und Anpassung des Tests X-Git-Tag: sumup-requests---lvm-2-tage~5^2^2^2~2^2~11 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=fb966920f18413076a28876ffcd125c84f9a6f06;p=demos%2Fkafka%2Ftraining Tests: Umbau für einen Commit im Fehlerfall und Anpassung des Tests --- diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0a95b2c..b173b12 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -4,6 +4,7 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.*; import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.RecordDeserializationException; import org.apache.kafka.common.errors.WakeupException; import javax.annotation.PreDestroy; @@ -128,6 +129,20 @@ public class EndlessConsumer implements Runnable consumer.commitSync(); shutdown(); } + catch(RecordDeserializationException e) + { + TopicPartition tp = e.topicPartition(); + long offset = e.offset(); + log.error( + "{} - Could not deserialize message on topic {} with offset={}: {}", + id, + tp, + offset, + e.getCause().toString()); + + consumer.commitSync(); + shutdown(e); + } catch(Exception e) { log.error("{} - Unexpected error: {}", id, e.toString(), e); diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index e35b223..994d657 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -97,7 +97,7 @@ class ApplicationTests @Test @Order(2) - void commitsNoOffsetsOnError() + void commitsOffsetOfErrorForReprocessingOnError() { send100Messages(counter -> counter == 77 @@ -109,7 +109,7 @@ class ApplicationTests .until(() -> !endlessConsumer.running()); checkSeenOffsetsForProgress(); - compareToCommitedOffsets(oldOffsets); + compareToCommitedOffsets(newOffsets); }