X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=a89c633c760c55e9ab36708b90b483cb2d96e718;hb=bfddb34a846a27a477d97eaa4db9221afbd6dbba;hp=32e14e8159774f1421cb65a045939ac27e7e0e28;hpb=1c6d263c619010d23bf502c14dda45db11a2baf6;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 32e14e8..a89c633 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -38,6 +38,15 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe .findById(Integer.toString(partition)) .orElse(new StateDocument(partition)); recordHandler.addPartition(partition, document.state); + for (String user : document.state.keySet()) + { + log.info( + "{} - Restored state for partition={}|user={}: {}", + id, + partition, + user, + document.state.get(user)); + } adderResults.addPartition(partition, document.results); }); } @@ -51,14 +60,14 @@ public class ApplicationRebalanceListener implements PollIntervalAwareConsumerRe log.info("{} - removing partition: {}", id, partition); this.partitions.remove(partition); Map state = recordHandler.removePartition(partition); - for (String key : state.keySet()) + for (String user : state.keySet()) { log.info( - "{} - Seen {} messages for partition={}|key={}", + "{} - Saved state for partition={}|user={}: {}", id, - state.get(key), partition, - key); + user, + state.get(user)); } Map> results = adderResults.removePartition(partition); stateRepository.save(new StateDocument(partition, state, results));