X-Git-Url: http://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2FApplicationRebalanceListener.java;h=0000000000000000000000000000000000000000;hb=1bf30f5890d9ab0a1c7550fe472dec44f486a473;hp=ba1522730280e813ca48416eb7996cf4315abda0;hpb=25c2044064722af20f64651a32e94fb392710bbc;p=demos%2Fkafka%2Ftraining diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java deleted file mode 100644 index ba15227..0000000 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ /dev/null @@ -1,71 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.common.TopicPartition; -import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; - -import java.util.*; - - -@RequiredArgsConstructor -@Slf4j -public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListener -{ - private final ApplicationRecordHandler recordHandler; - private final AdderResults adderResults; - private final StateRepository stateRepository; - private final String id; - - private final Set partitions = new HashSet<>(); - - @Override - public void onPartitionsAssigned(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - adding partition: {}", id, partition); - this.partitions.add(partition); - StateDocument document = - stateRepository - .findById(Integer.toString(partition)) - .orElse(new StateDocument(partition)); - recordHandler.addPartition(partition, document.state); - for (String user : document.state.keySet()) - { - log.info( - "{} - Restored state for partition={}|user={}: {}", - id, - partition, - user, - document.state.get(user)); - } - adderResults.addPartition(partition, document.results); - }); - } - - @Override - public void onPartitionsRevoked(Collection partitions) - { - partitions.forEach(tp -> - { - Integer partition = tp.partition(); - log.info("{} - removing partition: {}", id, partition); - this.partitions.remove(partition); - Map state = recordHandler.removePartition(partition); - for (String user : state.keySet()) - { - log.info( - "{} - Saved state for partition={}|user={}: {}", - id, - partition, - user, - state.get(user)); - } - Map> results = adderResults.removePartition(partition); - stateRepository.save(new StateDocument(partition, state, results)); - }); - } -}