X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=063a09ebcc3e9ced137fda80af7d962c6f849580;hb=808bd074aaae940e5f81bc9f09c42d48d1fd2670;hp=b3dd446d84ddbc7fddd44d516cbd921eff1ece7e;hpb=8c734b77dd71f6b35707e8085d59ac5b6c43720d;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index b3dd446..063a09e 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -10,6 +10,8 @@ import org.apache.kafka.common.serialization.StringDeserializer; import javax.annotation.PreDestroy; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -32,6 +34,9 @@ public class EndlessConsumer implements Runnable private KafkaConsumer consumer = null; private Future future = null; + private Map> seen; + + public EndlessConsumer( ExecutorService executor, String bootstrapServer, @@ -66,6 +71,8 @@ public class EndlessConsumer implements Runnable log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); + seen = new HashMap<>(); + while (true) { ConsumerRecords records = @@ -85,6 +92,21 @@ public class EndlessConsumer implements Runnable record.key(), record.value() ); + + Integer partition = record.partition(); + String key = record.key(); + + if (!seen.containsKey(partition)) + seen.put(partition, new HashMap<>()); + + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0); + + int seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } } } @@ -101,10 +123,30 @@ public class EndlessConsumer implements Runnable { log.info("{} - Closing the KafkaConsumer", id); consumer.close(); + + for (Integer partition : seen.keySet()) + { + Map byKey = seen.get(partition); + for (String key : byKey.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + byKey.get(key), + partition, + key); + } + } + seen = null; + log.info("{} - Consumer-Thread exiting", id); } } + public Map> getSeen() + { + return seen; + } public synchronized void start() {