X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=063a09ebcc3e9ced137fda80af7d962c6f849580;hb=808bd074aaae940e5f81bc9f09c42d48d1fd2670;hp=da2f8f0c29653a2420c1769c0db75cbc6db872c3;hpb=31530b8f210b923f5e010104b8a514a2060e7665;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index da2f8f0..063a09e 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -10,6 +10,8 @@ import org.apache.kafka.common.serialization.StringDeserializer; import javax.annotation.PreDestroy; import java.time.Duration; import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -25,24 +27,30 @@ public class EndlessConsumer implements Runnable private final String groupId; private final String id; private final String topic; + private final String autoOffsetReset; private AtomicBoolean running = new AtomicBoolean(); private long consumed = 0; private KafkaConsumer consumer = null; private Future future = null; + private Map> seen; + + public EndlessConsumer( ExecutorService executor, String bootstrapServer, String groupId, String clientId, - String topic) + String topic, + String autoOffsetReset) { this.executor = executor; this.bootstrapServer = bootstrapServer; this.groupId = groupId; this.id = clientId; this.topic = topic; + this.autoOffsetReset = autoOffsetReset; } @Override @@ -54,7 +62,7 @@ public class EndlessConsumer implements Runnable props.put("bootstrap.servers", bootstrapServer); props.put("group.id", groupId); props.put("client.id", id); - props.put("auto.offset.reset", "earliest"); + props.put("auto.offset.reset", autoOffsetReset); props.put("key.deserializer", StringDeserializer.class.getName()); props.put("value.deserializer", StringDeserializer.class.getName()); @@ -63,6 +71,8 @@ public class EndlessConsumer implements Runnable log.info("{} - Subscribing to topic {}", id, topic); consumer.subscribe(Arrays.asList(topic)); + seen = new HashMap<>(); + while (true) { ConsumerRecords records = @@ -82,6 +92,21 @@ public class EndlessConsumer implements Runnable record.key(), record.value() ); + + Integer partition = record.partition(); + String key = record.key(); + + if (!seen.containsKey(partition)) + seen.put(partition, new HashMap<>()); + + Map byKey = seen.get(partition); + + if (!byKey.containsKey(key)) + byKey.put(key, 0); + + int seenByKey = byKey.get(key); + seenByKey++; + byKey.put(key, seenByKey); } } } @@ -98,10 +123,30 @@ public class EndlessConsumer implements Runnable { log.info("{} - Closing the KafkaConsumer", id); consumer.close(); + + for (Integer partition : seen.keySet()) + { + Map byKey = seen.get(partition); + for (String key : byKey.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + byKey.get(key), + partition, + key); + } + } + seen = null; + log.info("{} - Consumer-Thread exiting", id); } } + public Map> getSeen() + { + return seen; + } public synchronized void start() {