X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=2310ccd7fe7306f4b0e7f5f5081fdd16c5846380;hb=f9c0ba7779552d8fcfc9cb29c8b689e20c314904;hp=adebff1045a649b9306198ff852c279c2d85939a;hpb=af66d390cd0a7f26e653cc129a89a63d11d8a7b6;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index adebff1..2310ccd 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -10,6 +10,8 @@ import org.apache.kafka.common.serialization.StringDeserializer; import javax.annotation.PreDestroy; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Optional; import java.util.Properties; import java.util.concurrent.ExecutionException; @@ -37,6 +39,9 @@ public class EndlessConsumer implements Runnable private KafkaConsumer consumer = null; + private Map> seen; + + public EndlessConsumer( ExecutorService executor, String bootstrapServer, @@ -63,6 +68,7 @@ public class EndlessConsumer implements Runnable props.put("group.id", groupId); props.put("client.id", id); props.put("auto.offset.reset", autoOffsetReset); + props.put("metadata.max.age.ms", "1000"); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); @@ -71,6 +77,8 @@ public class EndlessConsumer implements Runnable log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); + seen = new HashMap<>(); + while (true) { ConsumerRecords records = @@ -90,6 +98,21 @@ public class EndlessConsumer implements Runnable record.key(), record.value() ); + + Integer partition = record.partition(); + String key = record.key() == null ? "NULL" : record.key(); + + if (!seen.containsKey(partition)) + seen.put(partition, new HashMap<>()); + + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0); + + int seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } } } @@ -107,10 +130,31 @@ public class EndlessConsumer implements Runnable { log.info("{} - Closing the KafkaConsumer", id); consumer.close(); + + for (Integer partition : seen.keySet()) + { + Map byKey = seen.get(partition); + for (String key : byKey.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + byKey.get(key), + partition, + key); + } + } + seen = null; + log.info("{} - Consumer-Thread exiting", id); } } + public Map> getSeen() + { + return seen; + } + private void shutdown() { shutdown(null);