Springify: Gemeinsame DLQ für Poison Pills und Fachlogik-Fehler konfiguriert
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 472e07d..04a0a3a 100644 (file)
@@ -3,49 +3,128 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
 import org.springframework.stereotype.Component;
 
-import java.util.List;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.function.Consumer;
 
 
 @Component
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V>
+public class EndlessConsumer<K, V> implements ConsumerAwareRebalanceListener
 {
-  @Value("${consumer.client-id}")
+  @Autowired
+  private KafkaListenerEndpointRegistry registry;
+  @Value("${spring.kafka.consumer.client-id}")
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
 
+  private long consumed = 0;
+
+  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+  private final Map<Integer, Long> offsets = new HashMap<>();
+
+
+  @Override
+  public void onPartitionsRevokedBeforeCommit(
+      org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+      Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long newOffset = consumer.position(tp);
+      Long oldOffset = offsets.remove(partition);
+      log.info(
+          "{} - removing partition: {}, consumed {} records (offset {} -> {})",
+          id,
+          partition,
+          newOffset - oldOffset,
+          oldOffset,
+          newOffset);
+      Map<String, Long> removed = seen.remove(partition);
+      for (String key : removed.keySet())
+      {
+        log.info(
+            "{} - Seen {} messages for partition={}|key={}",
+            id,
+            removed.get(key),
+            partition,
+            key);
+      }
+    });
+  }
+
+  @Override
+  public void onPartitionsAssigned(
+      org.apache.kafka.clients.consumer.Consumer<?, ?> consumer,
+      Collection<TopicPartition> partitions)
+  {
+    partitions.forEach(tp ->
+    {
+      Integer partition = tp.partition();
+      Long offset = consumer.position(tp);
+      log.info("{} - adding partition: {}, offset={}", id, partition, offset);
+      offsets.put(partition, offset);
+      seen.put(partition, new HashMap<>());
+    });
+  }
+
 
   @KafkaListener(
-      id = "${consumer.client-id}",
+      id = "${spring.kafka.consumer.client-id}",
       idIsGroup = false,
       topics = "${consumer.topic}",
-      containerFactory = "batchFactory",
       autoStartup = "false")
-  public void receive(List<ConsumerRecord<K, V>> records)
+  public void receive(ConsumerRecord<K, V> record)
   {
-    // Do something with the data...
-    log.info("{} - Received {} messages", id, records.size());
-    for (ConsumerRecord<K, V> record : records)
-    {
-      log.info(
-          "{} - {}: {}/{} - {}={}",
-          id,
-          record.offset(),
-          record.topic(),
-          record.partition(),
-          record.key(),
-          record.value()
-      );
-
-      handler.accept(record);
-    }
+    log.info(
+        "{} - {}: {}/{} - {}={}",
+        id,
+        record.offset(),
+        record.topic(),
+        record.partition(),
+        record.key(),
+        record.value()
+    );
+
+    handler.accept(record);
+
+    consumed++;
+  }
+
+
+  public synchronized void start()
+  {
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
+    log.info("{} - Starting - consumed {} messages before", id, consumed);
+    registry.getListenerContainer(id).start();
+  }
+
+  public synchronized void stop()
+  {
+    if (!registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
+    log.info("{} - Stopping", id);
+    registry.getListenerContainer(id).stop();
+    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
+  }
+
+  public synchronized boolean isRunning()
+  {
+    return registry.getListenerContainer(id).isChildRunning();
   }
 }