Springify: BatchListener konfiguriert - Hilft nicht wirklich...
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 87780b4..929bdbd 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.beans.factory.annotation.Value;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.stereotype.Component;
 
+import java.util.List;
 import java.util.function.Consumer;
 
 
@@ -22,19 +23,24 @@ public class EndlessConsumer<K, V>
   Consumer<ConsumerRecord<K, V>> handler;
 
 
-  @KafkaListener(topics = "${consumer.topic}")
-  public void receive(ConsumerRecord<K, V> record)
+  @KafkaListener(topics = "${consumer.topic}", containerFactory = "batchFactory")
+  public void receive(List<ConsumerRecord<K, V>> records)
   {
-    log.info(
-        "{} - {}: {}/{} - {}={}",
-        id,
-        record.offset(),
-        record.topic(),
-        record.partition(),
-        record.key(),
-        record.value()
-    );
+    // Do something with the data...
+    log.info("{} - Received {} messages", id, records.size());
+    for (ConsumerRecord<K, V> record : records)
+    {
+      log.info(
+          "{} - {}: {}/{} - {}={}",
+          id,
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          record.key(),
+          record.value()
+      );
 
-    handler.accept(record);
+      handler.accept(record);
+    }
   }
 }