Version des `spring-consumer`, die einen Anwendungsfehler fängt und ignoriert consumer/spring-consumer--logic-error consumer/spring-consumer--logic-error--2025-03-18--19-42 consumer/spring-consumer--logic-error--2025-03-signal consumer/spring-consumer--logic-error--2025-04-signal
authorKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 14:49:10 +0000 (15:49 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 15 Mar 2025 18:22:13 +0000 (19:22 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 101abd1..6522e3b 100644 (file)
@@ -54,12 +54,24 @@ public class ExampleConsumer<K, V> implements Runnable
         log.info("{} - Received {} messages", id, records.count());
         for (ConsumerRecord<K, V> record : records)
         {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
+          try
+          {
+            handleRecord(
+              record.topic(),
+              record.partition(),
+              record.offset(),
+              record.key(),
+              record.value());
+          }
+          catch (NumberFormatException e)
+          {
+            log.error(
+              "{} - Ignoring invalid message for offset {} on partition {}: {}",
+              id,
+              record.offset(),
+              record.partition(),
+              record.value());
+          }
         }
       }
     }
@@ -87,8 +99,9 @@ public class ExampleConsumer<K, V> implements Runnable
     Integer partition,
     Long offset,
     K key,
-    V value)
+    V message)
   {
+    long value = Long.parseLong(message.toString());
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
   }