`EndlessConsumer` nimmt jetzt einzelne `ConsumerRecord`s entgegen
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationErrorHandler.java
index 6e15717..52c6a0c 100644 (file)
@@ -2,11 +2,15 @@ package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.kafka.listener.CommonErrorHandler;
 import org.springframework.kafka.listener.MessageListenerContainer;
-import org.springframework.util.Assert;
 
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 
 
@@ -16,6 +20,11 @@ public class ApplicationErrorHandler implements CommonErrorHandler
   private Exception exception;
   private boolean ack = true;
 
+  @Override
+  public boolean remainingRecords()
+  {
+    return true;
+  }
 
   @Override
   public void handleOtherException(
@@ -24,7 +33,22 @@ public class ApplicationErrorHandler implements CommonErrorHandler
     MessageListenerContainer container,
     boolean batchListener)
   {
-    Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners");
+    rememberExceptionAndStopContainer(thrownException, container);
+  }
+
+  @Override
+  public void handleRemaining(
+    Exception thrownException,
+    List<ConsumerRecord<?, ?>> records,
+    Consumer<?, ?> consumer,
+    MessageListenerContainer container)
+  {
+    Map<TopicPartition, Long> offsets = new HashMap<>();
+    records.forEach(record ->
+      offsets.computeIfAbsent(
+          new TopicPartition(record.topic(), record.partition()),
+          offset -> record.offset()));
+    offsets.forEach((tp, offset) -> consumer.seek(tp, offset));
     rememberExceptionAndStopContainer(thrownException, container);
   }