Refaktorisierung für Tests - Record-Handler als Bean konfigurierbar
[demos/kafka/training] / src / main / java / de / juplo / kafka / EndlessConsumer.java
index 0bf5925..38dd360 100644 (file)
@@ -24,6 +24,7 @@ public class EndlessConsumer implements Runnable
   private final String id;
   private final String topic;
   private final Consumer<String, String> consumer;
+  private final java.util.function.Consumer<ConsumerRecord<String, String>> handler;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -94,7 +95,6 @@ public class EndlessConsumer implements Runnable
         log.info("{} - Received {} messages", id, records.count());
         for (ConsumerRecord<String, String> record : records)
         {
-          consumed++;
           log.info(
               "{} - {}: {}/{} - {}={}",
               id,
@@ -105,6 +105,10 @@ public class EndlessConsumer implements Runnable
               record.value()
           );
 
+          handler.accept(record);
+
+          consumed++;
+
           Integer partition = record.partition();
           String key = record.key() == null ? "NULL" : record.key();
           Map<String, Long> byKey = seen.get(partition);