From d69d1f5b0f49d3014fc1a99885dfd031e1413307 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 10 Nov 2024 15:49:10 +0100 Subject: [PATCH] =?utf8?q?Version=20des=20`spring-consumer`,=20die=20einen?= =?utf8?q?=20Anwendungsfehler=20f=C3=A4ngt=20und=20ignoriert?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit --- .../java/de/juplo/kafka/ExampleConsumer.java | 27 ++++++++++++++----- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index f832b45..9a1984f 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -57,12 +57,24 @@ public class ExampleConsumer implements Runnable log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { - handleRecord( - record.topic(), - record.partition(), - record.offset(), - record.key(), - record.value()); + try + { + handleRecord( + record.topic(), + record.partition(), + record.offset(), + record.key(), + record.value()); + } + catch (NumberFormatException e) + { + log.error( + "{} - Ignoring invalid message for offset {} on partition {}: {}", + id, + record.offset(), + record.partition(), + record.value()); + } } } } @@ -93,7 +105,8 @@ public class ExampleConsumer implements Runnable String value) { consumed++; - log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value); + long message = Long.parseLong(value); + log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, message); } -- 2.20.1