WIP
authorKai Moritz <kai@juplo.de>
Tue, 14 May 2024 21:30:39 +0000 (23:30 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 21:30:39 +0000 (23:30 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Key.java
src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/top10/Word.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/top10/TestData.java

index d09dbcc..e718eac 100644 (file)
@@ -1,13 +1,15 @@
 package de.juplo.kafka.wordcount.top10;
 
-import lombok.Getter;
-import lombok.Setter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.*;
 
 
-@Getter
-@Setter
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
 public class Key
 {
-  private String username;
+  private String user;
   private String word;
 }
index b748fe5..cc08f4b 100644 (file)
@@ -1,15 +1,15 @@
 package de.juplo.kafka.wordcount.top10;
 
-import lombok.Getter;
-import lombok.Setter;
+import lombok.*;
 
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 
 
-@Getter
-@Setter
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
 public class Ranking
 {
   private Entry[] entries = new Entry[0];
index ea98aa2..b43d825 100644 (file)
@@ -46,14 +46,12 @@ public class Top10ApplicationConfiguration
        @Bean(initMethod = "start", destroyMethod = "stop")
        public Top10StreamProcessor streamProcessor(
                        Top10ApplicationProperties applicationProperties,
-                       ObjectMapper objectMapper,
                        Properties streamProcessorProperties,
                        ConfigurableApplicationContext context)
        {
                Top10StreamProcessor streamProcessor = new Top10StreamProcessor(
                                applicationProperties.getInputTopic(),
                                applicationProperties.getOutputTopic(),
-                               objectMapper,
                                streamProcessorProperties);
 
                streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
index b669344..e6deee0 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.top10;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
@@ -18,17 +17,16 @@ public class Top10StreamProcessor
        public Top10StreamProcessor(
                        String inputTopic,
                        String outputTopic,
-                       ObjectMapper mapper,
                        Properties props)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
-                               .<Word, Counter>stream(inputTopic)
-                               .map((word, counter) ->
+                               .<Key, Counter>stream(inputTopic)
+                               .map((key, counter) ->
                                {
-                                       Entry entry = Entry.of(word.getWord(), counter.getCounter());
-                                       return new KeyValue<>(word.getUser(), entry);
+                                       Entry entry = Entry.of(key.getWord(), counter.getCounter());
+                                       return new KeyValue<>(key.getUser(), entry);
                                })
                                .groupByKey()
                                .aggregate(
diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Word.java b/src/main/java/de/juplo/kafka/wordcount/top10/Word.java
deleted file mode 100644 (file)
index 3ee1bb0..0000000
+++ /dev/null
@@ -1,17 +0,0 @@
-package de.juplo.kafka.wordcount.top10;
-
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
-import lombok.AllArgsConstructor;
-import lombok.Data;
-import lombok.NoArgsConstructor;
-
-
-@NoArgsConstructor
-@AllArgsConstructor(staticName = "of")
-@Data
-@JsonIgnoreProperties(ignoreUnknown = true)
-public class Word
-{
-  private String user;
-  private String word;
-}
index f521869..fd18d26 100644 (file)
@@ -15,44 +15,44 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
-       static void writeInputData(BiConsumer<Word, Counter> consumer)
+       static void writeInputData(BiConsumer<Key, Counter> consumer)
        {
                consumer.accept(
-                               Word.of("peter","Hallo"),
+                               Key.of("peter","Hallo"),
                                Counter.of("peter","Hallo",1));
                consumer.accept(
-                               Word.of("klaus","Müsch"),
+                               Key.of("klaus","Müsch"),
                                Counter.of("klaus","Müsch",1));
                consumer.accept(
-                               Word.of("peter","Welt"),
+                               Key.of("peter","Welt"),
                                Counter.of("peter","Welt",1));
                consumer.accept(
-                               Word.of("klaus","Müsch"),
+                               Key.of("klaus","Müsch"),
                                Counter.of("klaus","Müsch",2));
                consumer.accept(
-                               Word.of("klaus","s"),
+                               Key.of("klaus","s"),
                                Counter.of("klaus","s",1));
                consumer.accept(
-                               Word.of("peter","Boäh"),
+                               Key.of("peter","Boäh"),
                                Counter.of("peter","Boäh",1));
                consumer.accept(
-                               Word.of("peter","Welt"),
+                               Key.of("peter","Welt"),
                                Counter.of("peter","Welt",2));
                consumer.accept(
-                               Word.of("peter","Boäh"),
+                               Key.of("peter","Boäh"),
                                Counter.of("peter","Boäh",2));
                consumer.accept(
-                               Word.of("klaus","s"),
+                               Key.of("klaus","s"),
                                Counter.of("klaus","s",2));
                consumer.accept(
-                               Word.of("peter","Boäh"),
+                               Key.of("peter","Boäh"),
                                Counter.of("peter","Boäh",3));
                consumer.accept(
-                               Word.of("klaus","s"),
+                               Key.of("klaus","s"),
                                Counter.of("klaus","s",3));
        }
 
-       static void assertExpectedResult(List<KeyValue<Word, Counter>> receivedMessages)
+       static void assertExpectedResult(List<KeyValue<String, Ranking>> receivedMessages)
        {
                assertThat(receivedMessages).hasSize(11);
                assertThat(receivedMessages).containsSubsequence(
@@ -73,42 +73,12 @@ class TestData
                                expectedMessages[9]); // Boäh
        }
 
-       static KeyValue<Word, Counter>[] expectedMessages = new KeyValue[]
-                       {
-                                       KeyValue.pair(
-                                                       Word.of("peter","Hallo"),
-                                                       Counter.of("peter","Hallo",1)),
-                                       KeyValue.pair(
-                                                       Word.of("klaus","Müsch"),
-                                                       Counter.of("klaus","Müsch",1)),
-                                       KeyValue.pair(
-                                                       Word.of("peter","Welt"),
-                                                       Counter.of("peter","Welt",1)),
-                                       KeyValue.pair(
-                                                       Word.of("klaus","Müsch"),
-                                                       Counter.of("klaus","Müsch",2)),
-                                       KeyValue.pair(
-                                                       Word.of("klaus","s"),
-                                                       Counter.of("klaus","s",1)),
-                                       KeyValue.pair(
-                                                       Word.of("peter","Boäh"),
-                                                       Counter.of("peter","Boäh",1)),
-                                       KeyValue.pair(
-                                                       Word.of("peter","Welt"),
-                                                       Counter.of("peter","Welt",2)),
-                                       KeyValue.pair(
-                                                       Word.of("peter","Boäh"),
-                                                       Counter.of("peter","Boäh",2)),
-                                       KeyValue.pair(
-                                                       Word.of("klaus","s"),
-                                                       Counter.of("klaus","s",2)),
-                                       KeyValue.pair(
-                                                       Word.of("peter","Boäh"),
-                                                       Counter.of("peter","Boäh",3)),
-                                       KeyValue.pair(
-                                                       Word.of("klaus","s"),
-                                                       Counter.of("klaus","s",3)),
-                       };
+       static KeyValue<String, Ranking>[] expectedMessages = new KeyValue[]
+       {
+                       KeyValue.pair(
+                                       "peter",
+                                       Ranking.of("peter","Hallo",1)),
+       };
 
        static Map<String, Object> convertToMap(Properties properties)
        {