From d835b70fc4d49a50f42da9c21b3de1dbcd18cbaf Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Jun 2024 19:05:45 +0200 Subject: [PATCH] query: 2.1.0 - Refined input JSON (adapted to general format for stats) * Adapted the app to the new type-mapping `stats` for the incomming keys. * Refined the class `Key`, that defines the JSON for the incomming key. ** Renamed attribute `user` to `channel`. ** Added attribute `type` of type `String`. * Refined the class `Entry`, that defines the JSON of an entry in the ranking, that is defined in the class `Ranking`. ** Renamed attribute `word` to `key`. * The `QueryStreamProcessor` filters the incomming messages by the field `type` of the `Key`: all messages are dropped, that are not of type `COUNTER`. --- pom.xml | 2 +- .../de/juplo/kafka/wordcount/query/Entry.java | 2 +- .../de/juplo/kafka/wordcount/query/Key.java | 3 ++- .../query/QueryApplicationConfiguration.java | 2 +- .../wordcount/query/QueryStreamProcessor.java | 4 +++- .../wordcount/query/QueryApplicationIT.java | 2 +- .../QueryStreamProcessorTopologyTest.java | 2 +- .../juplo/kafka/wordcount/query/TestData.java | 21 ++++++++++--------- .../kafka/wordcount/top10/TestEntry.java | 2 +- .../juplo/kafka/wordcount/top10/TestUser.java | 3 ++- 10 files changed, 24 insertions(+), 19 deletions(-) diff --git a/pom.xml b/pom.xml index 707d82d..ff775f7 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount query - 2.0.1 + 2.1.0 Wordcount-Query Query stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java index 4be314c..383b1a6 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java @@ -6,6 +6,6 @@ import lombok.Data; @Data public class Entry { - private String word; + private String key; private Long counter; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java index 57d095a..a2d85a1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Key.java @@ -6,5 +6,6 @@ import lombok.Data; @Data public class Key { - private String user; + private String type; + private String channel; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java index 440d5c4..0f9cad1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java @@ -89,7 +89,7 @@ public class QueryApplicationConfiguration props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, - "user:" + Key.class.getName() + "," + + "stats:" + Key.class.getName() + "," + "ranking:" + Ranking.class.getName() + "," + "userranking:" + UserRanking.class.getName()); diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index e075eb7..5543a91 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -20,6 +20,7 @@ import java.util.Properties; @Slf4j public class QueryStreamProcessor { + public static final String STATS_TYPE = "COUNTER"; public static final String USER_STORE_NAME = "users"; public static final String RANKING_STORE_NAME = "rankings"; @@ -65,7 +66,8 @@ public class QueryStreamProcessor .withValueSerde(new JsonSerde().copyWithType(User.class))); KStream rankings = builder .stream(rankingInputTopic) - .map((key, value) -> new KeyValue<>(key.getUser(), value)); + .filter((key, value) -> STATS_TYPE.equals(key.getType())) + .map((key, value) -> new KeyValue<>(key.getChannel(), value)); rankings .join(users, (ranking, user) -> UserRanking.of( diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index 1315eae..fb12aee 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -153,7 +153,7 @@ public class QueryApplicationIT Map properties = Map.of( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName(), - JsonSerializer.TYPE_MAPPINGS, "user:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); + JsonSerializer.TYPE_MAPPINGS, "stats:" + TestUser.class.getName() + ",ranking:" + TestRanking.class.getName()); return new KafkaTemplate(producerFactory, properties); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 203c813..fbeb19b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -83,7 +83,7 @@ public class QueryStreamProcessorTopologyTest jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, - "user:" + TestUser.class.getName() + "," + + "stats:" + TestUser.class.getName() + "," + "ranking:" + TestRanking.class.getName()), isKey); return jsonSerializer; diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index f5b8a00..44162a0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -10,13 +10,14 @@ import java.util.Arrays; import java.util.function.Function; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.query.QueryStreamProcessor.STATS_TYPE; import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final TestUser PETER = TestUser.of("peter"); - static final TestUser KLAUS = TestUser.of("klaus"); + static final TestUser PETER = TestUser.of(STATS_TYPE, "peter"); + static final TestUser KLAUS = TestUser.of(STATS_TYPE, "klaus"); static final Stream> getTop10Messages() { @@ -30,8 +31,8 @@ class TestData static void assertExpectedState(Function function) { - assertRankingEqualsRankingFromLastMessage(PETER.getUser(), function.apply(PETER.getUser())); - assertRankingEqualsRankingFromLastMessage(KLAUS.getUser(), function.apply(KLAUS.getUser())); + assertRankingEqualsRankingFromLastMessage(PETER.getChannel(), function.apply(PETER.getChannel())); + assertRankingEqualsRankingFromLastMessage(KLAUS.getChannel(), function.apply(KLAUS.getChannel())); } private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) @@ -42,7 +43,7 @@ class TestData private static UserRanking getLastMessageFor(String user) { return getTop10Messages() - .filter(kv -> kv.key.getUser().equals(user)) + .filter(kv -> kv.key.getChannel().equals(user)) .map(kv -> kv.value) .map(testRanking -> userRankingFor(user, testRanking)) .reduce(null, (left, right) -> right); @@ -69,7 +70,7 @@ class TestData private static Entry entryOf(TestEntry testEntry) { Entry entry = new Entry(); - entry.setWord(testEntry.getWord()); + entry.setKey(testEntry.getKey()); entry.setCounter(testEntry.getCounter()); return entry; } @@ -136,10 +137,10 @@ class TestData private static KeyValue[] USERS_MESSAGES = new KeyValue[] { KeyValue.pair( - PETER.getUser(), - TestUserData.of(PETER.getUser(), "Peter", "Pan", TestUserData.Sex.MALE)), + PETER.getChannel(), + TestUserData.of(PETER.getChannel(), "Peter", "Pan", TestUserData.Sex.MALE)), KeyValue.pair( - KLAUS.getUser(), - TestUserData.of(KLAUS.getUser(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), + KLAUS.getChannel(), + TestUserData.of(KLAUS.getChannel(), "Klaus", "Klüse", TestUserData.Sex.OTHER)), }; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java index ceafa82..15a8aa4 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java @@ -10,6 +10,6 @@ import lombok.NoArgsConstructor; @Data public class TestEntry { - String word; + String key; long counter; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java index cc63c34..e58786a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestUser.java @@ -10,5 +10,6 @@ import lombok.NoArgsConstructor; @Data public class TestUser { - String user; + String type; + String channel; } -- 2.20.1