From: Kai Moritz Date: Sat, 22 Jun 2024 12:22:35 +0000 (+0200) Subject: counter: 1.4.0 - Refined output JSON to match the new general stats-format X-Git-Tag: counter-1.4.0 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=123d1d3df7e3665dbe511a2e7d7398e0aa235a63;p=demos%2Fkafka%2Fwordcount counter: 1.4.0 - Refined output JSON to match the new general stats-format * Changed the type-mapping for `Word` from `word` to `key`. * Refined the class `Word`, that defines the JSON for the output key. ** Added attribute `type` with fixed value `POPULAR`. ** Renamed attribute `user` to `channel`. ** Renamed attribute `word` to `key`. * Refined the class `WordCounter`, that defines the JSON for the output value. ** Renamed attribute `word` to `key`. * Adapted test-classes and -cases accordingly. --- diff --git a/pom.xml b/pom.xml index 03a7b40..722c663 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount counter - 1.3.1 + 1.4.0 Wordcount-Counter Word-counting stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 2304e55..455d895 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -19,6 +19,7 @@ import java.util.stream.Collectors; @Slf4j public class CounterStreamProcessor { + public static final String TYPE = "COUNTER"; public static final String STORE_NAME = "counter"; @@ -48,6 +49,7 @@ public class CounterStreamProcessor builder .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde())) + .mapValues(word -> Word.of(word.getUser(), word.getWord())) .map((key, word) -> new KeyValue<>(word, word)) .groupByKey() .count( @@ -88,9 +90,9 @@ public class CounterStreamProcessor return new JsonSerde<>(User.class); } - public static JsonSerde inValueSerde() + public static JsonSerde inValueSerde() { - return new JsonSerde<>(Word.class); + return new JsonSerde<>(UserWord.class); } public static JsonSerde outKeySerde() @@ -117,11 +119,11 @@ public class CounterStreamProcessor return typeMappingsConfig(Word.class, WordCounter.class); } - public static String typeMappingsConfig(Class wordClass, Class wordCounterClass) + public static String typeMappingsConfig(Class keyClass, Class counterClass) { return Map.of( - "word", wordClass, - "counter", wordCounterClass) + "key", keyClass, + "counter", counterClass) .entrySet() .stream() .map(entry -> entry.getKey() + ":" + entry.getValue().getName()) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java b/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java new file mode 100644 index 0000000..db1ccb2 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/counter/UserWord.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Data; + + +@Data +@JsonIgnoreProperties(ignoreUnknown = true) +public class UserWord +{ + private String user; + private String word; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java index 77287d5..a058ff8 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Word.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/Word.java @@ -1,13 +1,16 @@ package de.juplo.kafka.wordcount.counter; -import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; @Data -@JsonIgnoreProperties(ignoreUnknown = true) +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") public class Word { - private String user; - private String word; + private final String type = CounterStreamProcessor.TYPE; + private String channel; + private String key; } diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java index f1fce71..211fa4c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/WordCounter.java @@ -12,11 +12,11 @@ import lombok.NoArgsConstructor; public class WordCounter { String user; - String word; + String key; long counter; public static WordCounter of(Word word, long counter) { - return new WordCounter(word.getUser(), word.getWord(), counter); + return new WordCounter(word.getChannel(), word.getKey(), counter); } } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 0faa2de..ab395fd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -41,7 +41,7 @@ import static org.awaitility.Awaitility.await; "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", + "spring.kafka.consumer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.top10.TestOutputWord,counter:de.juplo.kafka.wordcount.top10.TestOutputWordCounter", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}", diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java index 862eb2b..9b38dbc 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/TestData.java @@ -11,6 +11,7 @@ import org.springframework.util.MultiValueMap; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.TYPE; import static org.assertj.core.api.Assertions.assertThat; @@ -25,11 +26,11 @@ class TestData static final String WORD_S = "s"; static final String WORD_BOÄH = "Boäh"; - static final TestOutputWord PETER_HALLO = TestOutputWord.of(PETER, WORD_HALLO); - static final TestOutputWord PETER_WELT = TestOutputWord.of(PETER, WORD_WELT); - static final TestOutputWord PETER_BOÄH = TestOutputWord.of(PETER, WORD_BOÄH); - static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(KLAUS, WORD_MÜSCH); - static final TestOutputWord KLAUS_S = TestOutputWord.of(KLAUS, WORD_S); + static final TestOutputWord PETER_HALLO = TestOutputWord.of(TYPE, PETER, WORD_HALLO); + static final TestOutputWord PETER_WELT = TestOutputWord.of(TYPE, PETER, WORD_WELT); + static final TestOutputWord PETER_BOÄH = TestOutputWord.of(TYPE, PETER, WORD_BOÄH); + static final TestOutputWord KLAUS_MÜSCH = TestOutputWord.of(TYPE, KLAUS, WORD_MÜSCH); + static final TestOutputWord KLAUS_S = TestOutputWord.of(TYPE, KLAUS, WORD_S); private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { @@ -108,12 +109,9 @@ class TestData private static Word wordOf(TestOutputWord testOutputWord) { - Word word = new Word(); - - word.setUser(testOutputWord.getUser()); - word.setWord(testOutputWord.getWord()); - - return word; + return Word.of( + testOutputWord.getChannel(), + testOutputWord.getKey()); } static void assertExpectedLastMessagesForWord(MultiValueMap receivedMessages) @@ -130,8 +128,8 @@ class TestData Long counter) { TestOutputWordCounter testOutputWordCounter = TestOutputWordCounter.of( - word.getUser(), - word.getWord(), + word.getChannel(), + word.getKey(), counter); assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter); } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java index cfc2cae..132f6ba 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWord.java @@ -10,6 +10,7 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") public class TestOutputWord { - String user; - String word; + String type; + String channel; + String key; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java index 1b59387..a5f5d43 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestOutputWordCounter.java @@ -11,6 +11,6 @@ import lombok.NoArgsConstructor; public class TestOutputWordCounter { String user; - String word; + String key; long counter; }