From: Kai Moritz Date: Sun, 24 Jul 2022 13:35:14 +0000 (+0200) Subject: Merge der Refaktorisierung des EndlessConsumer (Branch 'stored-state') X-Git-Tag: wip-DEPRECATED~11 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=007ec5a0cb23a52abe56776d73648b0f283a24a6;p=demos%2Fkafka%2Ftraining Merge der Refaktorisierung des EndlessConsumer (Branch 'stored-state') --- 007ec5a0cb23a52abe56776d73648b0f283a24a6 diff --cc src/main/java/de/juplo/kafka/EndlessConsumer.java index c79b1e0,a21dd86..366a3c2 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@@ -35,30 -33,59 +33,58 @@@ public class EndlessConsumer impl private boolean running = false; private Exception exception; private long consumed = 0; - private KafkaConsumer consumer = null; + private final Map> seen = new HashMap<>(); + private final Map offsets = new HashMap<>(); - private final Map> seen = new HashMap<>(); + @Override + public void onPartitionsRevoked(Collection partitions) + { + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long newOffset = consumer.position(tp); + Long oldOffset = offsets.remove(partition); + log.info( + "{} - removing partition: {}, consumed {} records (offset {} -> {})", + id, + partition, + newOffset - oldOffset, + oldOffset, + newOffset); + Map removed = seen.remove(partition); + for (String key : removed.keySet()) + { + log.info( + "{} - Seen {} messages for partition={}|key={}", + id, + removed.get(key), + partition, + key); + } - repository.save(new StatisticsDocument(partition, removed)); ++ repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + }); + } - public EndlessConsumer( - ExecutorService executor, - PartitionStatisticsRepository repository, - String bootstrapServer, - String groupId, - String clientId, - String topic, - String autoOffsetReset) + @Override + public void onPartitionsAssigned(Collection partitions) { - this.executor = executor; - this.repository = repository; - this.bootstrapServer = bootstrapServer; - this.groupId = groupId; - this.id = clientId; - this.topic = topic; - this.autoOffsetReset = autoOffsetReset; + partitions.forEach(tp -> + { + Integer partition = tp.partition(); + Long offset = consumer.position(tp); + log.info("{} - adding partition: {}, offset={}", id, partition, offset); - offsets.put(partition, offset); - seen.put( - partition, ++ StatisticsDocument document = + repository - .findById(Integer.toString(tp.partition())) - .map(document -> document.statistics) - .orElse(new HashMap<>())); ++ .findById(Integer.toString(partition)) ++ .orElse(new StatisticsDocument(partition)); ++ consumer.seek(tp, document.offset); ++ seen.put(partition, document.statistics); + }); } + @Override public void run() { diff --cc src/main/java/de/juplo/kafka/StatisticsDocument.java index 96ebfb1,2416253..28264ec --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java @@@ -14,20 -14,13 +14,20 @@@ public class StatisticsDocumen { @Id public String id; + public long offset; - public Map statistics; + public Map statistics; public StatisticsDocument() { } - public StatisticsDocument(Integer partition, Map statistics) + public StatisticsDocument(Integer partition) + { + this.id = Integer.toString(partition); + this.statistics = new HashMap<>(); + } + - public StatisticsDocument(Integer partition, Map statistics, long offset) ++ public StatisticsDocument(Integer partition, Map statistics, long offset) { this.id = Integer.toString(partition); this.statistics = statistics;