From: Kai Moritz Date: Tue, 14 May 2024 22:07:33 +0000 (+0200) Subject: WIP X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=87f82fe35276666d298bc5100f0810b6aa6ce2d4;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Counter.java b/src/main/java/de/juplo/kafka/wordcount/top10/Counter.java deleted file mode 100644 index 3dac384..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Counter.java +++ /dev/null @@ -1,22 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.AccessLevel; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@NoArgsConstructor -@AllArgsConstructor( - staticName = "of", - access = AccessLevel.PACKAGE) -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class Counter -{ - String user; - String word; - long counter; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java index 9f4e8ce..b25fc07 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Entry.java @@ -1,13 +1,20 @@ package de.juplo.kafka.wordcount.top10; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AccessLevel; import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +@NoArgsConstructor +@AllArgsConstructor( + staticName = "of", + access = AccessLevel.PACKAGE) @Data -@AllArgsConstructor(staticName = "of") +@JsonIgnoreProperties(ignoreUnknown = true) public class Entry { private String word; - private Long count; + private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index 1d64a57..80e8742 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -14,13 +14,13 @@ public class Ranking { private Entry[] entries = new Entry[0]; - public void add(Entry newEntry) + public Ranking add(Entry newEntry) { if (entries.length == 0) { entries = new Entry[1]; entries[0] = newEntry; - return; + return this; } List list = new LinkedList<>(Arrays.asList(entries)); @@ -29,7 +29,7 @@ public class Ranking Entry entry; entry = list.get(i); - if (entry.getCount() <= newEntry.getCount()) + if (entry.getCounter() <= newEntry.getCounter()) { list.add(i, newEntry); for (int j = i+1; j < list.size(); j++) @@ -46,9 +46,11 @@ public class Ranking list = list.subList(0,10); } entries = list.toArray(num -> new Entry[num]); - return; + return this; } } + + return this; } public static Ranking of(Entry... entries) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 224258c..d6d9e76 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -36,10 +36,10 @@ public class Top10ApplicationConfiguration props.put( JsonDeserializer.TYPE_MAPPINGS, "word:" + Key.class.getName() + "," + - "counter:" + Counter.class.getName()); + "counter:" + Entry.class.getName()); props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE); props.put( - JsonDeserializer.TYPE_MAPPINGS, + JsonSerializer.TYPE_MAPPINGS, "ranking:" + Ranking.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 084e425..2b2cf93 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -34,21 +34,12 @@ public class Top10StreamProcessor StreamsBuilder builder = new StreamsBuilder(); builder - .stream(inputTopic) - .map((key, counter) -> - { - Entry entry = Entry.of(key.getWord(), counter.getCounter()); - return new KeyValue<>(key.getUser(), entry); - }) + .stream(inputTopic) + .map((key, entry) -> new KeyValue<>(key.getUser(), entry)) .groupByKey() .aggregate( () -> new Ranking(), - (user, entry, ranking) -> - { - ranking.add(entry); - return ranking; - } - ) + (user, entry, ranking) ->ranking.add(entry)) .toStream() .to(outputTopic); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Counter.java b/src/test/java/de/juplo/kafka/wordcount/top10/Counter.java new file mode 100644 index 0000000..c16b70b --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Counter.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.*; + + +@Value(staticConstructor = "of") +public class Counter +{ + String user; + String word; + long counter; +}