WIP
[demos/kafka/training] / src / main / java / de / juplo / kafka / SumRebalanceListener.java
index be752ae..83cb759 100644 (file)
@@ -43,7 +43,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
         // Otherwise: Use initial offset, generated by Kafka
         consumer.seek(tp, document.offset);
       }
-      handler.addPartition(partition, document);
+      handler.addPartition(partition, document.state);
     });
   }
 
@@ -59,7 +59,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
           id,
           partition,
           newOffset);
-      repository.save(handler.removePartition(partition));
+      repository.save(new StateDocument(partition, handler.removePartition(partition), newOffset));
     });
   }
 
@@ -70,10 +70,10 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
       log.debug("Storing data and offsets, last commit: {}", lastCommit);
-      handler.getSeen().forEach((partiton, statistics) -> repository.save(
+      handler.getState().forEach((partiton, sumBusinessLogic) -> repository.save(
           new StateDocument(
               partiton,
-              statistics,
+              sumBusinessLogic.getState(),
               consumer.position(new TopicPartition(topic, partiton)))));
       lastCommit = clock.instant();
     }