Springify: Die `@PreDestroy`-Methode wird nicht benötigt
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 87780b4..5e76865 100644 (file)
@@ -6,8 +6,10 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
 import java.util.function.Consumer;
 
 
@@ -16,25 +18,60 @@ import java.util.function.Consumer;
 @RequiredArgsConstructor
 public class EndlessConsumer<K, V>
 {
+  @Autowired
+  private KafkaListenerEndpointRegistry registry;
   @Value("${consumer.client-id}")
   String id;
   @Autowired
   Consumer<ConsumerRecord<K, V>> handler;
 
+  private long consumed = 0;
+
+  @KafkaListener(
+      id = "${consumer.client-id}",
+      idIsGroup = false,
+      topics = "${consumer.topic}",
+      containerFactory = "batchFactory",
+      autoStartup = "false")
+  public void receive(List<ConsumerRecord<K, V>> records)
+  {
+    // Do something with the data...
+    log.info("{} - Received {} messages", id, records.size());
+    for (ConsumerRecord<K, V> record : records)
+    {
+      log.info(
+          "{} - {}: {}/{} - {}={}",
+          id,
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          record.key(),
+          record.value()
+      );
+
+      handler.accept(record);
+
+      consumed++;
+    }
+  }
+
+
+  public synchronized void start()
+  {
+    if (registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
+    log.info("{} - Starting - consumed {} messages before", id, consumed);
+    registry.getListenerContainer(id).start();
+  }
 
-  @KafkaListener(topics = "${consumer.topic}")
-  public void receive(ConsumerRecord<K, V> record)
+  public synchronized void stop()
   {
-    log.info(
-        "{} - {}: {}/{} - {}={}",
-        id,
-        record.offset(),
-        record.topic(),
-        record.partition(),
-        record.key(),
-        record.value()
-    );
-
-    handler.accept(record);
+    if (!registry.getListenerContainer(id).isChildRunning())
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
+    log.info("{} - Stopping", id);
+    registry.getListenerContainer(id).stop();
+    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
   }
 }