Version des `spring-consumer`, die einen Anwendungsfehler fängt und ignoriert consumer/spring-consumer--logic-error--generics4all consumer/spring-consumer--logic-error--generics4some
authorKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 14:49:10 +0000 (15:49 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 21 Feb 2025 14:44:07 +0000 (15:44 +0100)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index a6691c3..48fa746 100644 (file)
@@ -56,12 +56,24 @@ public class ExampleConsumer<K, V> implements Runnable
         log.info("{} - Received {} messages", id, records.count());
         for (ConsumerRecord<K, V> record : records)
         {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
+          try
+          {
+            handleRecord(
+              record.topic(),
+              record.partition(),
+              record.offset(),
+              record.key(),
+              record.value());
+          }
+          catch (NumberFormatException e)
+          {
+            log.error(
+              "{} - Ignoring invalid message for offset {} on partition {}: {}",
+              id,
+              record.offset(),
+              record.partition(),
+              record.value());
+          }
         }
       }
     }
@@ -89,8 +101,9 @@ public class ExampleConsumer<K, V> implements Runnable
     Integer partition,
     Long offset,
     K key,
-    V value)
+    V message)
   {
+    long value = Long.parseLong(message.toString());
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
   }