From: Kai Moritz Date: Tue, 14 May 2024 21:30:39 +0000 (+0200) Subject: WIP X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=5ece428364118199f8cf89a0521f38aeefbcbf0a;p=demos%2Fkafka%2Fwordcount WIP --- diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java index d09dbcc..e718eac 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Key.java @@ -1,13 +1,15 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.*; -@Getter -@Setter +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +@Data +@JsonIgnoreProperties(ignoreUnknown = true) public class Key { - private String username; + private String user; private String word; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java index b748fe5..cc08f4b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Ranking.java @@ -1,15 +1,15 @@ package de.juplo.kafka.wordcount.top10; -import lombok.Getter; -import lombok.Setter; +import lombok.*; import java.util.Arrays; import java.util.LinkedList; import java.util.List; -@Getter -@Setter +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data public class Ranking { private Entry[] entries = new Entry[0]; diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index ea98aa2..b43d825 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -46,14 +46,12 @@ public class Top10ApplicationConfiguration @Bean(initMethod = "start", destroyMethod = "stop") public Top10StreamProcessor streamProcessor( Top10ApplicationProperties applicationProperties, - ObjectMapper objectMapper, Properties streamProcessorProperties, ConfigurableApplicationContext context) { Top10StreamProcessor streamProcessor = new Top10StreamProcessor( applicationProperties.getInputTopic(), applicationProperties.getOutputTopic(), - objectMapper, streamProcessorProperties); streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) -> diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index b669344..e6deee0 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; @@ -18,17 +17,16 @@ public class Top10StreamProcessor public Top10StreamProcessor( String inputTopic, String outputTopic, - ObjectMapper mapper, Properties props) { StreamsBuilder builder = new StreamsBuilder(); builder - .stream(inputTopic) - .map((word, counter) -> + .stream(inputTopic) + .map((key, counter) -> { - Entry entry = Entry.of(word.getWord(), counter.getCounter()); - return new KeyValue<>(word.getUser(), entry); + Entry entry = Entry.of(key.getWord(), counter.getCounter()); + return new KeyValue<>(key.getUser(), entry); }) .groupByKey() .aggregate( diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Word.java b/src/main/java/de/juplo/kafka/wordcount/top10/Word.java deleted file mode 100644 index 3ee1bb0..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Word.java +++ /dev/null @@ -1,17 +0,0 @@ -package de.juplo.kafka.wordcount.top10; - -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; -import lombok.AllArgsConstructor; -import lombok.Data; -import lombok.NoArgsConstructor; - - -@NoArgsConstructor -@AllArgsConstructor(staticName = "of") -@Data -@JsonIgnoreProperties(ignoreUnknown = true) -public class Word -{ - private String user; - private String word; -} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index f521869..fd18d26 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -15,44 +15,44 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { - static void writeInputData(BiConsumer consumer) + static void writeInputData(BiConsumer consumer) { consumer.accept( - Word.of("peter","Hallo"), + Key.of("peter","Hallo"), Counter.of("peter","Hallo",1)); consumer.accept( - Word.of("klaus","Müsch"), + Key.of("klaus","Müsch"), Counter.of("klaus","Müsch",1)); consumer.accept( - Word.of("peter","Welt"), + Key.of("peter","Welt"), Counter.of("peter","Welt",1)); consumer.accept( - Word.of("klaus","Müsch"), + Key.of("klaus","Müsch"), Counter.of("klaus","Müsch",2)); consumer.accept( - Word.of("klaus","s"), + Key.of("klaus","s"), Counter.of("klaus","s",1)); consumer.accept( - Word.of("peter","Boäh"), + Key.of("peter","Boäh"), Counter.of("peter","Boäh",1)); consumer.accept( - Word.of("peter","Welt"), + Key.of("peter","Welt"), Counter.of("peter","Welt",2)); consumer.accept( - Word.of("peter","Boäh"), + Key.of("peter","Boäh"), Counter.of("peter","Boäh",2)); consumer.accept( - Word.of("klaus","s"), + Key.of("klaus","s"), Counter.of("klaus","s",2)); consumer.accept( - Word.of("peter","Boäh"), + Key.of("peter","Boäh"), Counter.of("peter","Boäh",3)); consumer.accept( - Word.of("klaus","s"), + Key.of("klaus","s"), Counter.of("klaus","s",3)); } - static void assertExpectedResult(List> receivedMessages) + static void assertExpectedResult(List> receivedMessages) { assertThat(receivedMessages).hasSize(11); assertThat(receivedMessages).containsSubsequence( @@ -73,42 +73,12 @@ class TestData expectedMessages[9]); // Boäh } - static KeyValue[] expectedMessages = new KeyValue[] - { - KeyValue.pair( - Word.of("peter","Hallo"), - Counter.of("peter","Hallo",1)), - KeyValue.pair( - Word.of("klaus","Müsch"), - Counter.of("klaus","Müsch",1)), - KeyValue.pair( - Word.of("peter","Welt"), - Counter.of("peter","Welt",1)), - KeyValue.pair( - Word.of("klaus","Müsch"), - Counter.of("klaus","Müsch",2)), - KeyValue.pair( - Word.of("klaus","s"), - Counter.of("klaus","s",1)), - KeyValue.pair( - Word.of("peter","Boäh"), - Counter.of("peter","Boäh",1)), - KeyValue.pair( - Word.of("peter","Welt"), - Counter.of("peter","Welt",2)), - KeyValue.pair( - Word.of("peter","Boäh"), - Counter.of("peter","Boäh",2)), - KeyValue.pair( - Word.of("klaus","s"), - Counter.of("klaus","s",2)), - KeyValue.pair( - Word.of("peter","Boäh"), - Counter.of("peter","Boäh",3)), - KeyValue.pair( - Word.of("klaus","s"), - Counter.of("klaus","s",3)), - }; + static KeyValue[] expectedMessages = new KeyValue[] + { + KeyValue.pair( + "peter", + Ranking.of("peter","Hallo",1)), + }; static Map convertToMap(Properties properties) {