From: Kai Moritz Date: Sun, 10 Nov 2024 14:49:10 +0000 (+0100) Subject: Version des `spring-consumer`, die einen Anwendungsfehler fängt und ignoriert X-Git-Tag: consumer/spring-consumer--logic-error--COMMITS--2025-02 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=8f53f51c691c53dd11ea0a321214c3e045436563;p=demos%2Fkafka%2Ftraining Version des `spring-consumer`, die einen Anwendungsfehler fängt und ignoriert --- diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 79382ef9..c4b9b7d0 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -57,12 +57,24 @@ public class ExampleConsumer implements Runnable log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + try + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + catch (NumberFormatException e) + { + log.error( + "{} - Ignoring invalid message for offset {} on partition {}: {}", + id, + record.offset(), + record.partition(), + record.value()); + } } } } @@ -90,8 +102,9 @@ public class ExampleConsumer implements Runnable Integer partition, Long offset, String key, - String value) + String message) { + long value = Long.parseLong(message); consumed++; log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value); }