Springify: Gemeinsame DLQ für Poison Pills und Fachlogik-Fehler konfiguriert
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 09d08da..04a0a3a 100644 (file)
@@ -3,20 +3,24 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
 import org.springframework.stereotype.Component;
 
-import java.util.Optional;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.Consumer;
 
 
 @Component
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V>
+public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
 {
   @Autowired
   private KafkaListenerEndpointRegistry registry;
@@ -24,11 +28,59 @@ public class EndlessConsumer<K, V>
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
-  @Autowired
-  ApplicationErrorHandler errorHandler;
 
   private long consumed = 0;
 
+  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+  private final Map<Integer, Long> offsets = new HashMap<>();
+
+
+  @Override
+  public void onPartitionsRevokedBeforeCommit(
+      org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+      Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long newOffset = consumer.position(tp);
+      Long oldOffset = offsets.remove(partition);
+      log.info(
+          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+          id,
+          partition,
+          newOffset - oldOffset,
+          oldOffset,
+          newOffset);
+      Map<String, Long> removed = seen.remove(partition);
+      for (String key : removed.keySet())
+      {
+        log.info(
+            "{} - Seen {} messages for partition={}|key={}",
+            id,
+            removed.get(key),
+            partition,
+            key);
+      }
+    });
+  }
+
+  @Override
+  public void onPartitionsAssigned(
+      org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+      Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+      offsets.put(partition, offset);
+      seen.put(partition, new HashMap<>());
+    });
+  }
+
+
   @KafkaListener(
       id = "${spring.kafka.consumer.client-id}",
       idIsGroup = false,
@@ -58,7 +110,6 @@ public class EndlessConsumer<K, V>
       throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
     log.info("{} - Starting - consumed {} messages before", id, consumed);
-    errorHandler.clearException();
     registry.getListenerContainer(id).start();
   }
 
@@ -72,11 +123,8 @@ public class EndlessConsumer<K, V>
     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
 
-  public synchronized Optional<Exception> exitStatus()
+  public synchronized boolean isRunning()
   {
-    if (registry.getListenerContainer(id).isChildRunning())
-      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
-    return errorHandler.getException();
+    return registry.getListenerContainer(id).isChildRunning();
   }
 }