Code an die anderen Implementierungen angepasst
authorKai Moritz <kai@juplo.de>
Sat, 3 Sep 2022 06:29:53 +0000 (08:29 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 4 Sep 2022 05:50:20 +0000 (07:50 +0200)
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java

index 8e8464f..f3e6c8a 100644 (file)
@@ -74,15 +74,15 @@ public class ApplicationRebalanceListener implements RebalanceListener
       Integer partition = tp.partition();
       log.info("{} - removing partition: {}", id, partition);
       this.partitions.remove(partition);
-      Long offset = consumer.position(tp);
       if (commitsEnabled)
       {
+        Map<String, AdderResult> state = recordHandler.removePartition(partition);
+        Long offset = consumer.position(tp);
         log.info(
-            "{} - Storing {} as offset of next message for partition {}",
+            "{} - offset of next unseen message for partition {} is {}",
             id,
-            offset,
-            partition);
-        Map<String, AdderResult> state = recordHandler.removePartition(partition);
+            partition,
+            offset);
         for (String user : state.keySet())
         {
           log.info(
@@ -114,13 +114,31 @@ public class ApplicationRebalanceListener implements RebalanceListener
 
     if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
     {
-      log.debug("{} - Storing data and offsets, last commit: {}", id, lastCommit);
-      partitions.forEach(partition -> stateRepository.save(
-          new StateDocument(
+      partitions
+        .stream()
+        .forEach(partition ->
+        {
+          log.info("{} - persisting state & offset for partition: {}", id, partition);
+          Map<String, AdderResult> state = recordHandler.getState(partition).getState();
+          Long offset = consumer.position(new TopicPartition(topic, partition));
+          log.info(
+            "{} - offset of next unseen message for partition {} is {}",
+            id,
+            partition,
+            offset);
+          for (String user : state.keySet())
+          {
+            log.info(
+              "{} - Saved state for partition={}|user={}: {}",
+              id,
               partition,
-              recordHandler.getState(partition).getState(),
-              adderResults.getState(partition),
-              consumer.position(new TopicPartition(topic, partition)))));
+              user,
+              state.get(user));
+          }
+          Map<String, List<AdderResult>> results = adderResults.getState(partition);
+          stateRepository.save(new StateDocument(partition, state, results, offset));
+        });
+
       lastCommit = clock.instant();
     }
   }