X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FEndlessConsumer.java;h=a21dd86c925b028c8ac094f1e78713c2beb52405;hb=83a4bf324f5a7ec6010a7921118ec7d6e8f997cf;hp=8802df9326f23698713e25c5ca463278823a060f;hpb=28b729e55e2e0914b06c1dcb0a5defd9bcef4933;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 8802df9..a21dd86 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -22,6 +22,7 @@ import java.util.concurrent.locks.ReentrantLock; public class EndlessConsumer implements ConsumerRebalanceListener, Runnable { private final ExecutorService executor; + private final PartitionStatisticsRepository repository; private final String id; private final String topic; private final Consumer consumer; @@ -62,6 +63,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl partition, key); } + repository.save(new StatisticsDocument(partition, removed)); }); } @@ -74,7 +76,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl Long offset = consumer.position(tp); log.info("{} - adding partition: {}, offset={}", id, partition, offset); offsets.put(partition, offset); - seen.put(partition, new HashMap<>()); + seen.put( + partition, + repository + .findById(Integer.toString(tp.partition())) + .map(document -> document.statistics) + .orElse(new HashMap<>())); }); }