From 4b94d31fbd663cb277276def106be9873ec4a246 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 4 Sep 2021 17:20:44 +0200 Subject: [PATCH] top10:1.0.0 - ranks first 10 mostly used words, keyed by user --- pom.xml | 6 +-- .../counter/CounterApplicationProperties.java | 20 ------- .../de/juplo/kafka/wordcount/counter/Key.java | 11 ---- .../de/juplo/kafka/wordcount/top10/Entry.java | 11 ++++ .../de/juplo/kafka/wordcount/top10/Key.java | 13 +++++ .../juplo/kafka/wordcount/top10/Ranking.java | 53 +++++++++++++++++++ .../Top10Application.java} | 8 +-- .../top10/Top10ApplicationProperties.java | 20 +++++++ .../Top10StreamProcessor.java} | 42 ++++++++++----- 9 files changed, 132 insertions(+), 52 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/counter/Key.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Entry.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Key.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java rename src/main/java/de/juplo/kafka/wordcount/{counter/CounterApplication.java => top10/Top10Application.java} (58%) create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java rename src/main/java/de/juplo/kafka/wordcount/{counter/CounterStreamProcessor.java => top10/Top10StreamProcessor.java} (71%) diff --git a/pom.xml b/pom.xml index 71115c0..9bda638 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - counter + top10 1.0.0 - Wordcount-Counter - Word-counting stream-processor of the multi-user wordcount-example + Wordcount-Top-10 + Top-10 stream-processor of the multi-user wordcount-example 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java deleted file mode 100644 index d670ba2..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplicationProperties.java +++ /dev/null @@ -1,20 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.counter") -@Getter -@Setter -@ToString -public class CounterApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String applicationId = "counter"; - private String inputTopic = "recordings"; - private String outputTopic = "countings"; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java deleted file mode 100644 index 1e00dca..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.counter; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Key -{ - private final String username; - private final String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java new file mode 100644 index 0000000..67f45f2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.Value; + + +@Value(staticConstructor = "of") +public class Entry +{ + private final String word; + private final Long count; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java new file mode 100644 index 0000000..d09dbcc --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.Getter; +import lombok.Setter; + + +@Getter +@Setter +public class Key +{ + private String username; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java new file mode 100644 index 0000000..b748fe5 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -0,0 +1,53 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.Getter; +import lombok.Setter; + +import java.util.Arrays; +import java.util.LinkedList; +import java.util.List; + + +@Getter +@Setter +public class Ranking +{ + private Entry[] entries = new Entry[0]; + + public void add(Entry newEntry) + { + if (entries.length == 0) + { + entries = new Entry[1]; + entries[0] = newEntry; + return; + } + + List list = new LinkedList<>(Arrays.asList(entries)); + for (int i = 0; i < list.size(); i++) + { + Entry entry; + + entry = list.get(i); + if (entry.getCount() <= newEntry.getCount()) + { + list.add(i, newEntry); + for (int j = i+1; j < list.size(); j++) + { + entry = list.get(j); + if(entry.getWord().equals(newEntry.getWord())) + { + list.remove(j); + break; + } + } + if (list.size() > 10) + { + list = list.subList(0,10); + } + entries = list.toArray(num -> new Entry[num]); + return; + } + } + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java similarity index 58% rename from src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java rename to src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java index 1f73d32..27dca95 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10Application.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.top10; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -6,11 +6,11 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties @SpringBootApplication -@EnableConfigurationProperties(CounterApplicationProperties.class) -public class CounterApplication +@EnableConfigurationProperties(Top10ApplicationProperties.class) +public class Top10Application { public static void main(String[] args) { - SpringApplication.run(CounterApplication.class, args); + SpringApplication.run(Top10Application.class, args); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java new file mode 100644 index 0000000..93b78ec --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationProperties.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.wordcount.top10; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.top10") +@Getter +@Setter +@ToString +public class Top10ApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String applicationId = "top10"; + private String inputTopic = "countings"; + private String outputTopic = "top10"; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java similarity index 71% rename from src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java rename to src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index e8d7c11..862913a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.counter; +package de.juplo.kafka.wordcount.top10; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -9,14 +9,12 @@ import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStream; import org.springframework.boot.SpringApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; -import java.util.Arrays; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; @@ -26,29 +24,31 @@ import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.St @Slf4j @Component -public class CounterStreamProcessor +public class Top10StreamProcessor { final static Pattern PATTERN = Pattern.compile("\\W+"); public final KafkaStreams streams; - public CounterStreamProcessor( - CounterApplicationProperties properties, + public Top10StreamProcessor( + Top10ApplicationProperties properties, ObjectMapper mapper, ConfigurableApplicationContext context) { StreamsBuilder builder = new StreamsBuilder(); - KStream source = builder.stream(properties.getInputTopic()); - source - .flatMapValues(sentence -> Arrays.asList(PATTERN.split(sentence))) - .map((username, word) -> + builder + .stream(properties.getInputTopic()) + .map((keyJson, countStr) -> { try { - String key = mapper.writeValueAsString(Key.of(username, word)); - return new KeyValue<>(key, word); + Key key = mapper.readValue(keyJson, Key.class); + Long count = Long.parseLong(countStr); + Entry entry = Entry.of(key.getWord(), count); + String entryJson = mapper.writeValueAsString(entry); + return new KeyValue<>(key.getUsername(), entryJson); } catch (JsonProcessingException e) { @@ -56,8 +56,22 @@ public class CounterStreamProcessor } }) .groupByKey() - .count() - .mapValues(value->Long.toString(value)) + .aggregate( + () -> "{\"entries\" : []}", + (username, entryJson, rankingJson) -> + { + try + { + Ranking ranking = mapper.readValue(rankingJson, Ranking.class); + ranking.add(mapper.readValue(entryJson, Entry.class)); + return mapper.writeValueAsString(ranking); + } + catch (JsonProcessingException e) + { + throw new RuntimeException(e); + } + } + ) .toStream() .to(properties.getOutputTopic()); -- 2.20.1