Version des `spring-consumer`, die einen Anwendungsfehler fängt und ignoriert consumer/spring-consumer--logic-error consumer/spring-consumer--logic-error--2025-04-signal-spickzettel consumer/spring-consumer--logic-error--2025-05-lvm consumer/spring-consumer--logic-error--2025-05-lvm--spickzettel consumer/spring-consumer--logic-error--2025-05-signal-spickzettel consumer/spring-consumer--logic-error--2025-07-05
authorKai Moritz <kai@juplo.de>
Sun, 10 Nov 2024 14:49:10 +0000 (15:49 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 12 Apr 2025 22:32:02 +0000 (00:32 +0200)
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 1f5a570..8eb4af4 100644 (file)
@@ -54,12 +54,24 @@ public class ExampleConsumer implements Runnable
         log.info("{} - Received {} messages", id, records.count());
         for (ConsumerRecord<String, String> record : records)
         {
-          handleRecord(
-            record.topic(),
-            record.partition(),
-            record.offset(),
-            record.key(),
-            record.value());
+          try
+          {
+            handleRecord(
+              record.topic(),
+              record.partition(),
+              record.offset(),
+              record.key(),
+              record.value());
+          }
+          catch (NumberFormatException e)
+          {
+            log.error(
+              "{} - Ignoring invalid message for offset {} on partition {}: {}",
+              id,
+              record.offset(),
+              record.partition(),
+              record.value());
+          }
         }
       }
     }
@@ -87,8 +99,9 @@ public class ExampleConsumer implements Runnable
     Integer partition,
     Long offset,
     String key,
-    String value)
+    String message)
   {
+    long value = Long.parseLong(message.toString());
     consumed++;
     log.info("{} - partition={}-{}, offset={}: {}={}", id, topic, partition, offset, key, value);
   }