From f350c2cafd9f0b290a021443cc7f5818974438e9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 28 May 2024 22:59:38 +0200 Subject: [PATCH] top10: 1.2.0 - Switched the message-key from `String` to `User` --- pom.xml | 2 +- .../top10/Top10ApplicationConfiguration.java | 3 +- .../wordcount/top10/Top10StreamProcessor.java | 2 +- .../de/juplo/kafka/wordcount/top10/User.java | 14 +++++++++ .../juplo/kafka/wordcount/top10/TestData.java | 30 +++++++++---------- .../Top10StreamProcessorTopologyTest.java | 6 ++-- 6 files changed, 36 insertions(+), 21 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/top10/User.java diff --git a/pom.xml b/pom.xml index a8abbc8..fd71ccd 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.1.3 + 1.2.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 7749917..6f18339 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -33,7 +33,7 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(JsonDeserializer.TRUSTED_PACKAGES, Top10Application.class.getPackageName()); - props.put(JsonDeserializer.KEY_DEFAULT_TYPE, String.class.getName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, @@ -42,6 +42,7 @@ public class Top10ApplicationConfiguration props.put(JsonDeserializer.REMOVE_TYPE_INFO_HEADERS, Boolean.FALSE); props.put( JsonSerializer.TYPE_MAPPINGS, + "user:" + User.class.getName() + "," + "ranking:" + Ranking.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index a3900bf..d3846d8 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -35,7 +35,7 @@ public class Top10StreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(key.getUser(), entry)) + .map((key, entry) -> new KeyValue<>(User.of(key.getUser()), entry)) .groupByKey() .aggregate( () -> new Ranking(), diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/User.java b/src/main/java/de/juplo/kafka/wordcount/top10/User.java new file mode 100644 index 0000000..53c258d --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/top10/User.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class User +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 73a405e..3bb6b54 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -55,7 +55,7 @@ class TestData TestCounter.of("klaus","s",3)), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( (user, rankings) -> @@ -63,69 +63,69 @@ class TestData .containsExactlyElementsOf(rankings)); } - static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 - "peter", + User.of("peter"), Ranking.of( Entry.of("Hallo", 1l))), KeyValue.pair( // 1 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("Müsch", 1l))), KeyValue.pair( // 2 - "peter", + User.of("peter"), Ranking.of( Entry.of("Hallo", 1l), Entry.of("Welt", 1l))), KeyValue.pair( // 3 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("Müsch", 2l))), KeyValue.pair( // 4 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("Müsch", 2l), Entry.of("s", 1l))), KeyValue.pair( // 5 - "peter", + User.of("peter"), Ranking.of( Entry.of("Hallo", 1l), Entry.of("Welt", 1l), Entry.of("Boäh", 1l))), KeyValue.pair( // 6 - "peter", + User.of("peter"), Ranking.of( Entry.of("Welt", 2l), Entry.of("Hallo", 1l), Entry.of("Boäh", 1l))), KeyValue.pair( // 7 - "peter", + User.of("peter"), Ranking.of( Entry.of("Welt", 2l), Entry.of("Boäh", 2l), Entry.of("Hallo", 1l))), KeyValue.pair( // 8 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("Müsch", 2l), Entry.of("s", 2l))), KeyValue.pair( // 9 - "peter", + User.of("peter"), Ranking.of( Entry.of("Boäh", 3l), Entry.of("Welt", 2l), Entry.of("Hallo", 1l))), KeyValue.pair( // 10 - "klaus", + User.of("klaus"), Ranking.of( Entry.of("s", 3l), Entry.of("Müsch", 2l))), }; - static MultiValueMap expectedMessages() + static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 86314e5..01c1cf6 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -33,7 +33,7 @@ public class Top10StreamProcessorTopologyTest TopologyTestDriver testDriver; TestInputTopic in; - TestOutputTopic out; + TestOutputTopic out; @BeforeEach @@ -61,7 +61,7 @@ public class Top10StreamProcessorTopologyTest out = testDriver.createOutputTopic( OUT, - (JsonDeserializer)keySerde.deserializer(), + (JsonDeserializer)keySerde.deserializer(), (JsonDeserializer)valueSerde.deserializer()); } @@ -76,7 +76,7 @@ public class Top10StreamProcessorTopologyTest Key.of(kv.key.getUser(), kv.key.getWord()), Entry.of(kv.value.getWord(), kv.value.getCounter()))); - MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); out .readRecordsToList() .forEach(record -> -- 2.20.1