X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=38dd36090eb7384eeb296b32027cf47b30bd6aad;hb=9259b912d4294b7b21b49536ed1cc72175e1b2eb;hp=0bf59256fe4aec064d66c379f9da02c3d5fd9767;hpb=5a2c467b5b299f975f22d6c0e761686067634adc;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 0bf5925..38dd360 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -24,6 +24,7 @@ public class EndlessConsumer implements Runnable private final String id; private final String topic; private final Consumer consumer; + private final java.util.function.Consumer> handler; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); @@ -94,7 +95,6 @@ public class EndlessConsumer implements Runnable log.info("{} - Received {} messages", id, records.count()); for (ConsumerRecord record : records) { - consumed++; log.info( "{} - {}: {}/{} - {}={}", id, @@ -105,6 +105,10 @@ public class EndlessConsumer implements Runnable record.value() ); + handler.accept(record); + + consumed++; + Integer partition = record.partition(); String key = record.key() == null ? "NULL" : record.key(); Map byKey = seen.get(partition);