WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / SumRebalanceListener.java
index 1cd738f..be752ae 100644 (file)
@@ -9,7 +9,6 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.util.Collection;
-import java.util.Map;
 
 
 @RequiredArgsConstructor
@@ -34,17 +33,17 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
       Integer partition = tp.partition();
       Long offset = consumer.position(tp);
       log.info("{} - adding partition: {}, offset={}", id, partition, offset);
-      StatisticsDocument document =
+      StateDocument document =
           repository
               .findById(Integer.toString(partition))
-              .orElse(new StatisticsDocument(partition));
+              .orElse(new StateDocument(partition));
       if (document.offset >= 0)
       {
         // Only seek, if a stored offset was found
         // Otherwise: Use initial offset, generated by Kafka
         consumer.seek(tp, document.offset);
       }
-      handler.addPartition(partition, document.statistics);
+      handler.addPartition(partition, document);
     });
   }
 
@@ -60,8 +59,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
           id,
           partition,
           newOffset);
-      Map<String, Map<String, Long>> removed = handler.removePartition(partition);
-      repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
+      repository.save(handler.removePartition(partition));
     });
   }
 
@@ -73,7 +71,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
       handler.getSeen().forEach((partiton, statistics) -> repository.save(
-          new StatisticsDocument(
+          new StateDocument(
               partiton,
               statistics,
               consumer.position(new TopicPartition(topic, partiton)))));