Log-Meldungen zu gespeichertem und wiederhergestelltem Zustand
[demos/kafka/training] / src / main / java / de / juplo / kafka / ApplicationRebalanceListener.java
index 32e14e8..a89c633 100644 (file)
@@ -38,6 +38,15 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
               .findById(Integer.toString(partition))
               .orElse(new StateDocument(partition));
       recordHandler.addPartition(partition, document.state);
+      for (String user : document.state.keySet())
+      {
+        log.info(
+            "{} - Restored state for partition={}|user={}: {}",
+            id,
+            partition,
+            user,
+            document.state.get(user));
+      }
       adderResults.addPartition(partition, document.results);
     });
   }
@@ -51,14 +60,14 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe
       log.info("{} - removing partition: {}", id, partition);
       this.partitions.remove(partition);
       Map<String, AdderResult> state = recordHandler.removePartition(partition);
-      for (String key : state.keySet())
+      for (String user : state.keySet())
       {
         log.info(
-            "{} - Seen {} messages for partition={}|key={}",
+            "{} - Saved state for partition={}|user={}: {}",
             id,
-            state.get(key),
             partition,
-            key);
+            user,
+            state.get(user));
       }
       Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
       stateRepository.save(new StateDocument(partition, state, results));