Springify: Gemeinsame DLQ für Poison Pills und Fachlogik-Fehler konfiguriert
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 888805f..04a0a3a 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -3,31 +3,86 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
 import org.springframework.stereotype.Component;
 
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.Consumer;
 
 
 @Component
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V>
+public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
 {
   @Autowired
   private KafkaListenerEndpointRegistry registry;
-  @Value("${consumer.client-id}")
+  @Value("${spring.kafka.consumer.client-id}")
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
 
   private long consumed = 0;
 
+  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+  private final Map<Integer, Long> offsets = new HashMap<>();
+
+
+  @Override
+  public void onPartitionsRevokedBeforeCommit(
+      org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+      Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long newOffset = consumer.position(tp);
+      Long oldOffset = offsets.remove(partition);
+      log.info(
+          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+          id,
+          partition,
+          newOffset - oldOffset,
+          oldOffset,
+          newOffset);
+      Map<String, Long> removed = seen.remove(partition);
+      for (String key : removed.keySet())
+      {
+        log.info(
+            "{} - Seen {} messages for partition={}|key={}",
+            id,
+            removed.get(key),
+            partition,
+            key);
+      }
+    });
+  }
+
+  @Override
+  public void onPartitionsAssigned(
+      org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+      Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+      offsets.put(partition, offset);
+      seen.put(partition, new HashMap<>());
+    });
+  }
+
+
   @KafkaListener(
-      id = "${consumer.client-id}",
+      id = "${spring.kafka.consumer.client-id}",
       idIsGroup = false,
       topics = "${consumer.topic}",
       autoStartup = "false")
@@ -67,4 +122,9 @@ public class EndlessConsumer<K, V>
     registry.getListenerContainer(id).stop();
     log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
+
+  public synchronized boolean isRunning()
+  {
+    return registry.getListenerContainer(id).isChildRunning();
+  }
 }