Ignoring poision pills with illeagal state-changes
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index 63fbef5..18f5383 100644 (file)
@@ -105,6 +105,16 @@ public class TransferConsumer implements Runnable
           record.partition(),
           record.value());
     }
+    catch (IllegalArgumentException e)
+    {
+      log.error(
+          "ignoring invalid message #{} on {}/{}: {}, message={}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          e.getMessage(),
+          record.value());
+    }
   }
 
   @EventListener