X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10StreamProcessor.java;h=2bc1da7ddd544864909a9fc882b8b84daaeef9d7;hb=42bb92cf8fca73ddf940f2257c7fa5f693bc2ca4;hp=811cf987147855fd443298e30a731a6c8210b48a;hpb=37e0703a25b489b0258f6ef019c57e82863394e4;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 811cf98..2bc1da7 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -21,7 +21,9 @@ import org.springframework.stereotype.Component; import javax.annotation.PostConstruct; import javax.annotation.PreDestroy; +import java.util.Arrays; import java.util.LinkedList; +import java.util.List; import java.util.Properties; import java.util.concurrent.CompletableFuture; import java.util.regex.Pattern; @@ -53,8 +55,39 @@ public class Top10StreamProcessor .groupByKey(Grouped.keySerde(Serdes.String())) .aggregate( () -> Ranking.newBuilder().setEntries(new LinkedList()).build(), - (username, entry, ranking) -> { - ranking.getEntries().add(entry); + (username, newEntry, ranking) -> { + List entries = new LinkedList<>(ranking.getEntries()); + + if (entries.isEmpty()) + { + entries.add(newEntry); + } + else + { + for (int i = 0; i < entries.size(); i++) + { + Entry entry = entries.get(i); + if (entry.getCount() <= newEntry.getCount()) + { + entries.add(i, newEntry); + for (int j = i + 1; j < entries.size(); j++) + { + entry = entries.get(j); + if (entry.getWord().equals(newEntry.getWord())) + { + entries.remove(j); + break; + } + } + if (entries.size() > 10) + { + entries = entries.subList(0, 10); + } + } + } + } + + ranking.setEntries(entries); return ranking; }) .toStream()