From 5f35354fd694f78599d66ee9e01fb4c0d89cc5bb Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 13 Aug 2022 16:53:54 +0200 Subject: [PATCH] WIP --- .../java/de/juplo/kafka/DriverController.java | 17 +++--- .../kafka/PartitionStatisticsRepository.java | 4 +- ...isticsDocument.java => StateDocument.java} | 22 ++++--- .../java/de/juplo/kafka/SumBusinessLogic.java | 5 ++ .../de/juplo/kafka/SumRebalanceListener.java | 12 ++-- .../java/de/juplo/kafka/SumRecordHandler.java | 57 +++++++++---------- .../java/de/juplo/kafka/ApplicationTests.java | 4 +- 7 files changed, 63 insertions(+), 58 deletions(-) rename src/main/java/de/juplo/kafka/{StatisticsDocument.java => StateDocument.java} (50%) diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 5a09c1b..fdae76f 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -5,6 +5,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -14,7 +15,7 @@ import java.util.concurrent.ExecutionException; public class DriverController { private final EndlessConsumer consumer; - private final SumRecordHandler wordcount; + private final SumRecordHandler sumRecordHandler; @PostMapping("start") @@ -31,19 +32,19 @@ public class DriverController @GetMapping("seen") - public Map>> seen() + public Map>> seen() { - return wordcount.getSeen(); + return sumRecordHandler.getSeen(); } @GetMapping("seen/{user}") - public ResponseEntity> seen(@PathVariable String user) + public ResponseEntity> seen(@PathVariable String user) { - for (Map> users : wordcount.getSeen().values()) + for (Map> users : sumRecordHandler.getSeen().values()) { - Map words = users.get(user); - if (words != null) - return ResponseEntity.ok(words); + List results = users.get(user); + if (results != null) + return ResponseEntity.ok(results); } return ResponseEntity.notFound().build(); diff --git a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java index 0ccf3cd..9e26410 100644 --- a/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java +++ b/src/main/java/de/juplo/kafka/PartitionStatisticsRepository.java @@ -5,7 +5,7 @@ import org.springframework.data.mongodb.repository.MongoRepository; import java.util.Optional; -public interface PartitionStatisticsRepository extends MongoRepository +public interface PartitionStatisticsRepository extends MongoRepository { - public Optional findById(String partition); + public Optional findById(String partition); } diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java similarity index 50% rename from src/main/java/de/juplo/kafka/StatisticsDocument.java rename to src/main/java/de/juplo/kafka/StateDocument.java index 137c9bb..52968cd 100644 --- a/src/main/java/de/juplo/kafka/StatisticsDocument.java +++ b/src/main/java/de/juplo/kafka/StateDocument.java @@ -5,32 +5,38 @@ import org.springframework.data.annotation.Id; import org.springframework.data.mongodb.core.mapping.Document; import java.util.HashMap; +import java.util.List; import java.util.Map; @Document(collection = "statistics") @ToString -public class StatisticsDocument +public class StateDocument { @Id public String id; public long offset = -1l; - public Map> statistics; + public Map state; + public Map> seen; - public StatisticsDocument() + public StateDocument() { } - public StatisticsDocument(Integer partition) + public StateDocument(Integer partition) { this.id = Integer.toString(partition); - this.statistics = new HashMap<>(); + this.state = new HashMap<>(); + this.seen = new HashMap<>(); } - public StatisticsDocument(Integer partition, Map> statistics, long offset) + public StateDocument( + Integer partition, + Map state, + Map> seen) { this.id = Integer.toString(partition); - this.statistics = statistics; - this.offset = offset; + this.state = state; + this.seen = seen; } } diff --git a/src/main/java/de/juplo/kafka/SumBusinessLogic.java b/src/main/java/de/juplo/kafka/SumBusinessLogic.java index 74696a4..27ddebb 100644 --- a/src/main/java/de/juplo/kafka/SumBusinessLogic.java +++ b/src/main/java/de/juplo/kafka/SumBusinessLogic.java @@ -52,4 +52,9 @@ public class SumBusinessLogic return state.get(user); } + + protected Map getState() + { + return state; + } } diff --git a/src/main/java/de/juplo/kafka/SumRebalanceListener.java b/src/main/java/de/juplo/kafka/SumRebalanceListener.java index 1cd738f..be752ae 100644 --- a/src/main/java/de/juplo/kafka/SumRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/SumRebalanceListener.java @@ -9,7 +9,6 @@ import java.time.Clock; import java.time.Duration; import java.time.Instant; import java.util.Collection; -import java.util.Map; @RequiredArgsConstructor @@ -34,17 +33,17 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL Integer partition = tp.partition(); Long offset = consumer.position(tp); log.info("{} - adding partition: {}, offset={}", id, partition, offset); - StatisticsDocument document = + StateDocument document = repository .findById(Integer.toString(partition)) - .orElse(new StatisticsDocument(partition)); + .orElse(new StateDocument(partition)); if (document.offset >= 0) { // Only seek, if a stored offset was found // Otherwise: Use initial offset, generated by Kafka consumer.seek(tp, document.offset); } - handler.addPartition(partition, document.statistics); + handler.addPartition(partition, document); }); } @@ -60,8 +59,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL id, partition, newOffset); - Map> removed = handler.removePartition(partition); - repository.save(new StatisticsDocument(partition, removed, consumer.position(tp))); + repository.save(handler.removePartition(partition)); }); } @@ -73,7 +71,7 @@ public class SumRebalanceListener implements PollIntervalAwareConsumerRebalanceL { log.debug("Storing data and offsets, last commit: {}", lastCommit); handler.getSeen().forEach((partiton, statistics) -> repository.save( - new StatisticsDocument( + new StateDocument( partiton, statistics, consumer.position(new TopicPartition(topic, partiton))))); diff --git a/src/main/java/de/juplo/kafka/SumRecordHandler.java b/src/main/java/de/juplo/kafka/SumRecordHandler.java index 82ada38..d4ec38f 100644 --- a/src/main/java/de/juplo/kafka/SumRecordHandler.java +++ b/src/main/java/de/juplo/kafka/SumRecordHandler.java @@ -4,17 +4,14 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.HashMap; +import java.util.List; import java.util.Map; -import java.util.regex.Pattern; - @Slf4j public class SumRecordHandler implements RecordHandler { - final static Pattern PATTERN = Pattern.compile("\\W+"); - - - private final Map>> seen = new HashMap<>(); + private final Map>> seen = new HashMap<>(); + private final Map state = new HashMap<>(); @Override @@ -22,42 +19,40 @@ public class SumRecordHandler implements RecordHandler { Integer partition = record.partition(); String user = record.key(); - Map> users = seen.get(partition); - - Map words = users.get(user); - if (words == null) - { - words = new HashMap<>(); - users.put(user, words); - } - - for (String word : PATTERN.split(record.value())) + String message = record.value(); + switch (message) { - Long num = words.get(word); - if (num == null) - { - num = 1l; - } - else - { - num++; - } - words.put(word, num); + case "START": + state.get(partition).startSum(user); + return; + + case "END": + Long result = state.get(partition).endSum(user); + log.info("New result for {}: {}", user, result); + return; + + default: + state.get(partition).addToSum(user, Integer.parseInt(message)); + return; } } - public void addPartition(Integer partition, Map> statistics) + protected void addPartition(Integer partition, StateDocument document) { - seen.put(partition, statistics); + this.seen.put(partition, document.seen); + this.state.put(partition, new SumBusinessLogic(document.state)); } - public Map> removePartition(Integer partition) + protected StateDocument removePartition(Integer partition) { - return seen.remove(partition); + return new StateDocument( + partition, + this.state.remove(partition).getState(), + this.seen.remove(partition)); } - public Map>> getSeen() + public Map>> getSeen() { return seen; } diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java index 09614b8..f19bfb1 100644 --- a/src/test/java/de/juplo/kafka/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/ApplicationTests.java @@ -156,10 +156,10 @@ class ApplicationTests Long offset = offsetConsumer.position(tp); log.info("New position for {}: {}", tp, offset); Integer partition = tp.partition(); - StatisticsDocument document = + StateDocument document = partitionStatisticsRepository .findById(partition.toString()) - .orElse(new StatisticsDocument(partition)); + .orElse(new StateDocument(partition)); document.offset = offset; partitionStatisticsRepository.save(document); }); -- 2.20.1