From a6a0a22a5fa34a01b0e8b2bc1e0e2b82d7b60f33 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 24 Jul 2022 21:34:43 +0200
Subject: [PATCH] Wordcount implementation with Kafka's built-in tools and
 MongoDB as storage
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Counts the words per user.
* Simple implementation based on maps.
* Reuses the MongoDB connection that is already used for storing the
  message counts and the offsets.
* Generic typing removed: key and value are always String.
* For convenience, reuses the seen endpoint of the counting implementation.
---
 docker-compose.yml                            |  8 +--
 pom.xml                                       |  5 +-
 .../juplo/kafka/ApplicationConfiguration.java | 17 +----
 .../kafka/ApplicationHealthIndicator.java     |  2 +-
 .../java/de/juplo/kafka/DriverController.java | 22 +++++--
 .../java/de/juplo/kafka/EndlessConsumer.java  | 62 ++++++++++---------
 .../de/juplo/kafka/StatisticsDocument.java    |  4 +-
 .../java/de/juplo/kafka/ApplicationTests.java |  7 +--
 8 files changed, 65 insertions(+), 62 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 0e420b6..df41cb5 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -56,7 +56,7 @@ services:
     command: sleep infinity
 
   producer:
-    image: juplo/endless-producer:1.0-SNAPSHOT
+    image: juplo/rest-producer:1.0-SNAPSHOT
     ports:
       - 8080:8080
     environment:
@@ -64,11 +64,9 @@ services:
       producer.bootstrap-server: kafka:9092
       producer.client-id: producer
       producer.topic: test
-      producer.throttle-ms: 500
-
 
   peter:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/wordcount:1.0-SNAPSHOT
     ports:
       - 8081:8080
     environment:
@@ -80,7 +78,7 @@ services:
       spring.data.mongodb.database: juplo
 
   beate:
-    image: juplo/endless-consumer:1.0-SNAPSHOT
+    image: juplo/wordcount:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
diff --git a/pom.xml b/pom.xml
index 701704d..fe06959 100644
--- a/pom.xml
+++ b/pom.xml
@@ -12,9 +12,10 @@
   <groupId>de.juplo.kafka</groupId>
-  <artifactId>endless-consumer</artifactId>
+  <artifactId>wordcount</artifactId>
   <version>1.0-SNAPSHOT</version>
-  <name>Endless Consumer: a Simple Consumer-Group that reads and prints the topic and counts the received messages for each key by topic</name>
+  <name>Wordcount</name>
+  <description>Splits the incoming sentences into words and counts the words per user.</description>
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 08c3955..2cf263e 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -19,32 +19,21 @@ import java.util.function.Consumer;
 public class ApplicationConfiguration
 {
   @Bean
-  public Consumer<ConsumerRecord<String, String>> consumer()
-  {
-    return (record) ->
-    {
-      // Handle record
-    };
-  }
-
-  @Bean
-  public EndlessConsumer<String, String> endlessConsumer(
+  public EndlessConsumer endlessConsumer(
       KafkaConsumer<String, String> kafkaConsumer,
       ExecutorService executor,
-      Consumer<ConsumerRecord<String, String>> handler,
       PartitionStatisticsRepository repository,
       ApplicationProperties properties)
   {
     return
-        new EndlessConsumer<>(
+        new EndlessConsumer(
            executor,
            repository,
            properties.getClientId(),
            properties.getTopic(),
            Clock.systemDefaultZone(),
            properties.getCommitInterval(),
-            kafkaConsumer,
-            handler);
+            kafkaConsumer);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
index df4e653..ab9782c 100644
--- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
+++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<?, ?> consumer;
+  private final EndlessConsumer consumer;
 
 
   @Override
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index ed38080..e64d6b8 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -2,11 +2,8 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import org.springframework.http.HttpStatus;
-import org.springframework.web.bind.annotation.ExceptionHandler;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.ResponseStatus;
-import org.springframework.web.bind.annotation.RestController;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
 
 import java.util.Map;
 import java.util.concurrent.ExecutionException;
@@ -33,11 +30,24 @@ public class DriverController
 
 
   @GetMapping("seen")
-  public Map<Integer, Map<String, Long>> seen()
+  public Map<Integer, Map<String, Map<String, Long>>> seen()
   {
     return consumer.getSeen();
   }
 
+  @GetMapping("seen/{user}")
+  public ResponseEntity<Map<String, Long>> seen(@PathVariable String user)
+  {
+    for (Map<String, Map<String, Long>> users : consumer.getSeen().values())
+    {
+      Map<String, Long> words = users.get(user);
+      if (words != null)
+        return ResponseEntity.ok(words);
+    }
+
+    return ResponseEntity.notFound().build();
+  }
+
   @ExceptionHandler
   @ResponseStatus(HttpStatus.BAD_REQUEST)
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index f9a9629..01f9057 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -17,20 +17,23 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
+import java.util.regex.Pattern;
 
 
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements ConsumerRebalanceListener, Runnable
+public class EndlessConsumer implements ConsumerRebalanceListener, Runnable
 {
+  final static Pattern PATTERN = Pattern.compile("\\W+");
+
+
   private final ExecutorService executor;
   private final PartitionStatisticsRepository repository;
   private final String id;
   private final String topic;
   private final Clock clock;
   private final Duration commitInterval;
-  private final Consumer<K, V> consumer;
-  private final java.util.function.Consumer<ConsumerRecord<K, V>> handler;
+  private final Consumer<String, String> consumer;
 
   private final Lock lock = new ReentrantLock();
   private final Condition condition = lock.newCondition();
@@ -38,7 +41,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
   private Exception exception;
   private long consumed = 0;
 
-  private final Map<Integer, Map<String, Long>> seen = new HashMap<>();
+  private final Map<Integer, Map<String, Map<String, Long>>> seen = new HashMap<>();
 
 
   @Override
@@ -53,16 +56,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
           id,
           partition,
           newOffset);
-      Map<String, Long> removed = seen.remove(partition);
-      for (String key : removed.keySet())
-      {
-        log.info(
-            "{} - Seen {} messages for partition={}|key={}",
-            id,
-            removed.get(key),
-            partition,
-            key);
-      }
+      Map<String, Map<String, Long>> removed = seen.remove(partition);
       repository.save(new StatisticsDocument(partition, removed, consumer.position(tp)));
     });
   }
@@ -102,12 +96,12 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
       while (true)
       {
-        ConsumerRecords<K, V> records =
+        ConsumerRecords<String, String> records =
             consumer.poll(Duration.ofSeconds(1));
 
         // Do something with the data...
         log.info("{} - Received {} messages", id, records.count());
-        for (ConsumerRecord<K, V> record : records)
+        for (ConsumerRecord<String, String> record : records)
         {
           log.info(
               "{} - {}: {}/{} - {}={}",
               id,
@@ -119,20 +113,32 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
               record.value()
           );
 
-          handler.accept(record);
-
           consumed++;
 
           Integer partition = record.partition();
-          String key = record.key() == null ? "NULL" : record.key().toString();
-          Map<String, Long> byKey = seen.get(partition);
-
-          if (!byKey.containsKey(key))
-            byKey.put(key, 0l);
-
-          long seenByKey = byKey.get(key);
-          seenByKey++;
-          byKey.put(key, seenByKey);
+          String user = record.key();
+          Map<String, Map<String, Long>> users = seen.get(partition);
+
+          Map<String, Long> words = users.get(user);
+          if (words == null)
+          {
+            words = new HashMap<>();
+            users.put(user, words);
+          }
+
+          for (String word : PATTERN.split(record.value()))
+          {
+            Long num = words.get(word);
+            if (num == null)
+            {
+              num = 1l;
+            }
+            else
+            {
+              num++;
+            }
+            words.put(word, num);
+          }
         }
 
         if (lastCommit.plus(commitInterval).isBefore(clock.instant()))
@@ -212,7 +218,7 @@ public class EndlessConsumer implements ConsumerRebalanceListener, Runnabl
     }
   }
 
-  public Map<Integer, Map<String, Long>> getSeen()
+  public Map<Integer, Map<String, Map<String, Long>>> getSeen()
   {
     return seen;
   }
diff --git a/src/main/java/de/juplo/kafka/StatisticsDocument.java b/src/main/java/de/juplo/kafka/StatisticsDocument.java
index 1244f45..137c9bb 100644
--- a/src/main/java/de/juplo/kafka/StatisticsDocument.java
+++ b/src/main/java/de/juplo/kafka/StatisticsDocument.java
@@ -15,7 +15,7 @@ public class StatisticsDocument
   @Id
   public String id;
   public long offset = -1l;
-  public Map<String, Long> statistics;
+  public Map<String, Map<String, Long>> statistics;
 
   public StatisticsDocument()
   {
@@ -27,7 +27,7 @@ public class StatisticsDocument
     this.statistics = new HashMap<>();
   }
 
-  public StatisticsDocument(Integer partition, Map<String, Long> statistics, long offset)
+  public StatisticsDocument(Integer partition, Map<String, Map<String, Long>> statistics, long offset)
   {
     this.id = Integer.toString(partition);
     this.statistics = statistics;
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
index ca72e3c..aa6dd4d 100644
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ b/src/test/java/de/juplo/kafka/ApplicationTests.java
@@ -73,7 +73,7 @@ class ApplicationTests
   PartitionStatisticsRepository repository;
 
   Consumer<ConsumerRecord<String, String>> testHandler;
-  EndlessConsumer<String, String> endlessConsumer;
+  EndlessConsumer endlessConsumer;
   Map<TopicPartition, Long> oldOffsets;
   Map<TopicPartition, Long> newOffsets;
   Set<ConsumerRecord<String, String>> receivedRecords;
@@ -228,15 +228,14 @@ class ApplicationTests
         };
 
     endlessConsumer =
-        new EndlessConsumer<>(
+        new EndlessConsumer(
            executor,
            repository,
            properties.getClientId(),
            properties.getTopic(),
            Clock.systemDefaultZone(),
            properties.getCommitInterval(),
-            kafkaConsumer,
-            captureOffsetAndExecuteTestHandler);
+            kafkaConsumer);
 
     endlessConsumer.start();
   }
-- 
2.20.1