X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=e214a14b763fb070b92d5da41d5b1ca6d3f3a1f0;hb=dd9103fdedc432ba861abeaf5cfb4acb66749f15;hp=0bfee676e7140ad6bb4c4aad87deca36a2369436;hpb=0c89ddec8156e2f44fe28c9d05fe06f548e9168e;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 0bfee67..e214a14 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -26,22 +26,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener { Integer partition = tp.partition(); log.info("{} - adding partition: {}", id, partition); - this.partitions.add(partition); - StateDocument document = - stateRepository - .findById(Integer.toString(partition)) - .orElse(new StateDocument(partition)); - recordHandler.addPartition(partition, document.state); - for (String user : document.state.keySet()) - { - log.info( - "{} - Restored state for partition={}|user={}: {}", - id, - partition, - user, - document.state.get(user)); - } - adderResults.addPartition(partition, document.results); }); } @@ -52,8 +36,7 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener { Integer partition = tp.partition(); log.info("{} - removing partition: {}", id, partition); - this.partitions.remove(partition); - Map state = recordHandler.removePartition(partition); + Map state = recordHandler.getState(partition).getState(); for (String user : state.keySet()) { log.info( @@ -63,8 +46,6 @@ public class ApplicationRebalanceListener implements ConsumerRebalanceListener user, state.get(user)); } - Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results)); }); } }