Verbesserte Tests und Korrekturen gemerged: stored-offsets -> stored-state
[demos/kafka/training] / src / main / java / de / juplo / kafka / KeyCountingRebalanceListener.java
diff --git a/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java b/src/main/java/de/juplo/kafka/KeyCountingRebalanceListener.java
deleted file mode 100644 (file)
index 636ff86..0000000
+++ /dev/null
@@ -1,76 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.common.TopicPartition;
-
-import java.time.Clock;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class KeyCountingRebalanceListener implements PollIntervalAwareConsumerRebalanceListener
-{
-  private final KeyCountingRecordHandler handler;
-  private final PartitionStatisticsRepository repository;
-  private final String id;
-  private final Clock clock;
-  private final Duration commitInterval;
-
-  private Instant lastCommit = Instant.EPOCH;
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - adding partition: {}", id, partition);
-      StatisticsDocument document =
-          repository
-              .findById(Integer.toString(partition))
-              .orElse(new StatisticsDocument(partition));
-      handler.addPartition(partition, document.statistics);
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - removing partition: {}", id, partition);
-      Map<String, Long> removed = handler.removePartition(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
-      repository.save(new StatisticsDocument(partition, removed));
-    });
-  }
-
-
-  @Override
-  public void beforeNextPoll()
-  {
-    if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
-    {
-      log.debug("Storing data, last commit: {}", lastCommit);
-      handler.getSeen().forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
-              partiton,
-              statistics)));
-      lastCommit = clock.instant();
-    }
-  }
-}