WIP
authorKai Moritz <kai@juplo.de>
Tue, 14 May 2024 22:07:33 +0000 (00:07 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 22:07:33 +0000 (00:07 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Counter.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/top10/Entry.java
src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/top10/Counter.java [new file with mode: 0644]

diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Counter.java b/src/main/java/de/juplo/kafka/wordcount/top10/Counter.java
deleted file mode 100644 (file)
index 3dac384..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AccessLevel;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@NoArgsConstructor
-@AllArgsConstructor(
-    staticName = "of",
-    access = AccessLevel.PACKAGE)
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Counter
-{
-  String user;
-  String word;
-  long counter;
-}
index 9f4e8ce..b25fc07 100644 (file)
@@ -1,13 +1,20 @@
 package de.juplo.kafka.wordcount.top10;
 
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 
+@NoArgsConstructor
+@AllArgsConstructor(
+    staticName = "of",
+    access = AccessLevel.PACKAGE)
 @Data
-@AllArgsConstructor(staticName = "of")
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class Entry
 {
   private String word;
-  private Long count;
+  private Long counter;
 }
index 1d64a57..80e8742 100644 (file)
@@ -14,13 +14,13 @@ public class Ranking
 {
   private Entry[] entries = new Entry[0];
 
-  public void add(Entry newEntry)
+  public Ranking add(Entry newEntry)
   {
     if (entries.length == 0)
     {
       entries = new Entry[1];
       entries[0] = newEntry;
-      return;
+      return this;
     }
 
     List<Entry> list = new LinkedList<>(Arrays.asList(entries));
@@ -29,7 +29,7 @@ public class Ranking
       Entry entry;
 
       entry = list.get(i);
-      if (entry.getCount() <= newEntry.getCount())
+      if (entry.getCounter() <= newEntry.getCounter())
       {
         list.add(i, newEntry);
         for (int j = i+1; j < list.size(); j++)
@@ -46,9 +46,11 @@ public class Ranking
           list = list.subList(0,10);
         }
         entries = list.toArray(num -> new Entry[num]);
-        return;
+        return this;
       }
     }
+
+    return this;
   }
 
   public static Ranking of(Entry... entries)
index 224258c..d6d9e76 100644 (file)
@@ -36,10 +36,10 @@ public class Top10ApplicationConfiguration
                props.put(
                                JsonDeserializer.TYPE_MAPPINGS,
                                "word:" + Key.class.getName() + "," +
-                               "counter:" + Counter.class.getName());
+                               "counter:" + Entry.class.getName());
                props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE);
                props.put(
-                               JsonDeserializer.TYPE_MAPPINGS,
+                               JsonSerializer.TYPE_MAPPINGS,
                                "ranking:" + Ranking.class.getName());
                props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
 
index 084e425..2b2cf93 100644 (file)
@@ -34,21 +34,12 @@ public class Top10StreamProcessor
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
-                               .<Key, Counter>stream(inputTopic)
-                               .map((key, counter) ->
-                               {
-                                       Entry entry = Entry.of(key.getWord(), counter.getCounter());
-                                       return new KeyValue<>(key.getUser(), entry);
-                               })
+                               .<Key, Entry>stream(inputTopic)
+                               .map((key, entry) -> new KeyValue<>(key.getUser(), entry))
                                .groupByKey()
                                .aggregate(
                                                () -> new Ranking(),
-                                               (user, entry, ranking) ->
-                                               {
-                                                       ranking.add(entry);
-                                                       return ranking;
-                                               }
-                               )
+                                               (user, entry, ranking) ->ranking.add(entry))
                                .toStream()
                                .to(outputTopic);
 
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Counter.java b/src/test/java/de/juplo/kafka/wordcount/top10/Counter.java
new file mode 100644 (file)
index 0000000..c16b70b
--- /dev/null
@@ -0,0 +1,14 @@
+package de.juplo.kafka.wordcount.top10;
+
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.*;
+
+
+@Value(staticConstructor = "of")
+public class Counter
+{
+  String user;
+  String word;
+  long counter;
+}