From 42bb92cf8fca73ddf940f2257c7fa5f693bc2ca4 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Thu, 14 Oct 2021 20:39:22 +0200 Subject: [PATCH] WIP: Dirty in-place implementation --- .../de/juplo/kafka/wordcount/top10/Entry.java | 11 ---- .../de/juplo/kafka/wordcount/top10/Key.java | 13 ----- .../juplo/kafka/wordcount/top10/Ranking.java | 53 ------------------- .../wordcount/top10/Top10StreamProcessor.java | 37 ++++++++++++- 4 files changed, 35 insertions(+), 79 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Entry.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Key.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java deleted file mode 100644 index 67f45f2..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ /dev/null @@ -1,11 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.Value; - - -@Value(staticConstructor = "of") -public class Entry -{ - private final String word; - private final Long count; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java deleted file mode 100644 index d09dbcc..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.Getter; -import lombok.Setter; - - -@Getter -@Setter -public class Key -{ - private String username; - private String word; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java deleted file mode 100644 index b748fe5..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ /dev/null @@ -1,53 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import lombok.Getter; -import lombok.Setter; - -import java.util.Arrays; -import java.util.LinkedList; -import java.util.List; - - -@Getter -@Setter -public class Ranking -{ - private Entry[] entries = new Entry[0]; - - public void add(Entry newEntry) - { - if (entries.length == 0) - { - entries = new Entry[1]; - entries[0] = newEntry; - return; - } - - List list = new LinkedList<>(Arrays.asList(entries)); - for (int i = 0; i < list.size(); i++) - { - Entry entry; - - entry = list.get(i); - if (entry.getCount() <= newEntry.getCount()) - { - list.add(i, newEntry); - for (int j = i+1; j < list.size(); j++) - { - entry = list.get(j); - if(entry.getWord().equals(newEntry.getWord())) - { - list.remove(j); - break; - } - } - if (list.size() > 10) - { - list = list.subList(0,10); - } - entries = list.toArray(num -> new Entry[num]); - return; - } - } - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 811cf98..2bc1da7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -21,7 +21,9 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.Arrays; import java.util.LinkedList; +import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; @@ -53,8 +55,39 @@ public class Top10StreamProcessor .groupByKey(Grouped.keySerde(Serdes.String())) .aggregate( () -> Ranking.newBuilder().setEntries(new LinkedList()).build(), - (username, entry, ranking) -> { - ranking.getEntries().add(entry); + (username, newEntry, ranking) -> { + List entries = new LinkedList<>(ranking.getEntries()); + + if (entries.isEmpty()) + { + entries.add(newEntry); + } + else + { + for (int i = 0; i < entries.size(); i++) + { + Entry entry = entries.get(i); + if (entry.getCount() <= newEntry.getCount()) + { + entries.add(i, newEntry); + for (int j = i + 1; j < entries.size(); j++) + { + entry = entries.get(j); + if (entry.getWord().equals(newEntry.getWord())) + { + entries.remove(j); + break; + } + } + if (entries.size() > 10) + { + entries = entries.subList(0, 10); + } + } + } + } + + ranking.setEntries(entries); return ranking; }) .toStream() -- 2.20.1