From 238491ed4d33495202e79879954802e5d0836006 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 22 Jun 2024 17:07:15 +0200 Subject: [PATCH] top10: 1.4.0 - Refined output JSON -- ALIGN --- pom.xml | 2 +- .../de/juplo/kafka/wordcount/top10/Stats.java | 5 +- .../top10/Top10ApplicationConfiguration.java | 4 +- .../wordcount/top10/Top10StreamProcessor.java | 8 +- .../kafka/wordcount/query/TestStats.java | 5 +- .../juplo/kafka/wordcount/top10/TestData.java | 84 ++++++++++--------- .../wordcount/top10/Top10ApplicationIT.java | 18 ++-- .../Top10StreamProcessorTopologyTest.java | 18 ++-- 8 files changed, 75 insertions(+), 69 deletions(-) diff --git a/pom.xml b/pom.xml index e5cd268..0dce2d1 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount top10 - 1.3.0 + 1.4.0 Wordcount-Top-10 Top-10 stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java b/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java index 53c258d..05c2a91 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Stats.java @@ -8,7 +8,8 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") @NoArgsConstructor @Data -public class User +public class Stats { - String user; + StatsType type; + String channel; } diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index 57e5a47..aecd260 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -51,13 +51,13 @@ public class Top10ApplicationConfiguration props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName()); - props.put(JsonDeserializer.KEY_DEFAULT_TYPE, User.class.getName()); + props.put(JsonDeserializer.KEY_DEFAULT_TYPE, Stats.class.getName()); props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, Ranking.class.getName()); props.put( JsonDeserializer.TYPE_MAPPINGS, "key:" + Key.class.getName() + "," + "counter:" + Entry.class.getName() + "," + - "user:" + User.class.getName() + "," + + "stats:" + Stats.class.getName() + "," + "ranking:" + Ranking.class.getName()); return props; diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index 907c7ff..1235132 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -41,11 +41,13 @@ public class Top10StreamProcessor builder .stream(inputTopic) - .map((key, entry) -> new KeyValue<>(User.of(key.getChannel()), entry)) + .map((key, entry) -> new KeyValue<>( + Stats.of(key.getType(), key.getChannel()), + entry)) .groupByKey() .aggregate( () -> new Ranking(), - (user, entry, ranking) -> ranking.add(entry), + (stats, entry, ranking) -> ranking.add(entry), Materialized.as(storeSupplier)) .toStream() .to(outputTopic); @@ -56,7 +58,7 @@ public class Top10StreamProcessor return topology; } - ReadOnlyKeyValueStore getStore() + ReadOnlyKeyValueStore getStore() { return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore())); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java b/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java index 53a5992..6a47193 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestStats.java @@ -8,7 +8,8 @@ import lombok.NoArgsConstructor; @AllArgsConstructor(staticName = "of") @NoArgsConstructor @Data -public class TestUser +public class TestStats { - String user; + String type; + String channel; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index 4fb229b..069f49a 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -4,7 +4,7 @@ import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.query.TestEntry; import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.query.TestStats; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; @@ -20,8 +20,8 @@ class TestData { static final String TYPE_COUNTER = "COUNTER"; - static final TestUser PETER = TestUser.of("peter"); - static final TestUser KLAUS = TestUser.of("klaus"); + static final TestStats PETER = TestStats.of(StatsType.COUNTER.name(), "peter"); + static final TestStats KLAUS = TestStats.of(StatsType.COUNTER.name(), "klaus"); static final Stream> getInputMessages() { @@ -31,83 +31,85 @@ class TestData private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Hallo"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Hallo"), TestCounter.of("Hallo",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"), TestCounter.of("Müsch",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"), TestCounter.of("Welt",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"Müsch"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"Müsch"), TestCounter.of("Müsch",2)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), TestCounter.of("s",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), TestCounter.of("Boäh",1)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Welt"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Welt"), TestCounter.of("Welt",2)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), TestCounter.of("Boäh",2)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), TestCounter.of("s",2)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, PETER.getUser(),"Boäh"), + TestWord.of(TYPE_COUNTER, PETER.getChannel(),"Boäh"), TestCounter.of("Boäh",3)), new KeyValue<>( - TestWord.of(TYPE_COUNTER, KLAUS.getUser(),"s"), + TestWord.of(TYPE_COUNTER, KLAUS.getChannel(),"s"), TestCounter.of("s",3)), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( - (user, rankings) -> - assertThat(receivedMessages.get(user)) + (stats, rankings) -> + assertThat(receivedMessages.get(stats)) .containsExactlyElementsOf(rankings)); } - static void assertExpectedState(ReadOnlyKeyValueStore store) + static void assertExpectedState(ReadOnlyKeyValueStore store) { - assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER))); - assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS))); + assertRankingEqualsRankingFromLastMessage(PETER, store.get(statsOf(PETER))); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(statsOf(KLAUS))); } - private static User userOf(TestUser user) + private static Stats statsOf(TestStats stats) { - return User.of(user.getUser()); + return Stats.of( + StatsType.valueOf(stats.getType()), + stats.getChannel()); } - static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) + static void assertExpectedNumberOfMessages(MultiValueMap receivedMessages) { - assertThat(countMessagesForUser(PETER, receivedMessages)); - assertThat(countMessagesForUser(KLAUS, receivedMessages)); + assertThat(countMessages(PETER, receivedMessages)); + assertThat(countMessages(KLAUS, receivedMessages)); } - private static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) + private static int countMessages(TestStats stats, MultiValueMap messagesFor) { - return messagesForUsers.get(user) == null + return messagesFor.get(stats) == null ? 0 - : messagesForUsers.get(user).size(); + : messagesFor.get(stats).size(); } - static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) + static void assertExpectedLastMessages(MultiValueMap receivedMessages) { assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages)); assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); } - private static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking) + private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, Ranking ranking) { TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries())); - assertRankingEqualsRankingFromLastMessage(user, testRanking); + assertRankingEqualsRankingFromLastMessage(stats, testRanking); } private static TestEntry[] testEntriesOf(Entry... entries) @@ -122,25 +124,25 @@ class TestData .toArray(size -> new TestEntry[size]); } - private static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking) + private static void assertRankingEqualsRankingFromLastMessage(TestStats stats, TestRanking ranking) { - assertThat(ranking).isEqualTo(getLastMessageFor(user)); + assertThat(ranking).isEqualTo(getLastMessageFor(stats)); } - private static TestRanking getLastMessageFor(TestUser user) + private static TestRanking getLastMessageFor(TestStats stats) { - return getLastMessageFor(user, expectedMessages()); + return getLastMessageFor(stats, expectedMessages()); } - private static TestRanking getLastMessageFor(TestUser user, MultiValueMap messagesForUsers) + private static TestRanking getLastMessageFor(TestStats stats, MultiValueMap messagesFor) { - return messagesForUsers - .get(user) + return messagesFor + .get(stats) .stream() .reduce(null, (left, right) -> right); } - private static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + private static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, @@ -200,9 +202,9 @@ class TestData TestEntry.of("Müsch", 2l))), }; - private static MultiValueMap expectedMessages() + private static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 5f0e817..51e424e 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -3,7 +3,7 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.query.TestStats; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -40,7 +40,7 @@ import static org.awaitility.Awaitility.await; "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking", + "spring.kafka.consumer.properties.spring.json.type.mapping=stats:de.juplo.kafka.wordcount.query.TestStats,ranking:de.juplo.kafka.wordcount.query.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -110,32 +110,32 @@ public class Top10ApplicationIT @DisplayName("Await the expected number of messages") @Test - public void testAwaitExpectedNumberOfMessagesForUsers() + public void testAwaitExpectedNumberOfMessages() { await("Expected number of messages") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages))); + receivedMessages -> TestData.assertExpectedNumberOfMessages(receivedMessages))); } @DisplayName("Await the expected final output messages") @Test - public void testAwaitExpectedLastMessagesForUsers() + public void testAwaitExpectedLastMessages() { await("Expected final output messages") .atMost(Duration.ofSeconds(5)) .untilAsserted(() -> consumer.enforceAssertion( - receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages))); + receivedMessages -> TestData.assertExpectedLastMessages(receivedMessages))); } static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, + @Header(KafkaHeaders.RECEIVED_KEY) TestStats user, @Payload TestRanking ranking) { log.debug("Received message: {} -> {}", user, ranking); @@ -143,7 +143,7 @@ public class Top10ApplicationIT } synchronized void enforceAssertion( - java.util.function.Consumer> assertion) + java.util.function.Consumer> assertion) { assertion.accept(received); } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index a8fc859..559d742 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -3,7 +3,7 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.query.TestRanking; -import de.juplo.kafka.wordcount.query.TestUser; +import de.juplo.kafka.wordcount.query.TestStats; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; @@ -33,7 +33,7 @@ public class Top10StreamProcessorTopologyTest public static final String OUT = "TEST-OUT"; static TopologyTestDriver testDriver; - static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + static MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); @BeforeAll @@ -51,10 +51,10 @@ public class Top10StreamProcessorTopologyTest jsonSerializer(TestWord.class, true), jsonSerializer(TestCounter.class,false)); - TestOutputTopic out = testDriver.createOutputTopic( + TestOutputTopic out = testDriver.createOutputTopic( OUT, new JsonDeserializer() - .copyWithType(TestUser.class) + .copyWithType(TestStats.class) .ignoreTypeHeaders(), new JsonDeserializer() .copyWithType(TestRanking.class) @@ -79,23 +79,23 @@ public class Top10StreamProcessorTopologyTest @DisplayName("Assert the expected number of messages") @Test - public void testExpectedNumberOfMessagesForUsers() + public void testExpectedNumberOfMessages() { - TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages); + TestData.assertExpectedNumberOfMessages(receivedMessages); } @DisplayName("Assert the expected final output messages") @Test - public void testExpectedLastMessagesForUSers() + public void testExpectedLastMessages() { - TestData.assertExpectedLastMessagesForUsers(receivedMessages); + TestData.assertExpectedLastMessages(receivedMessages); } @DisplayName("Assert the expected state in the state-store") @Test public void testExpectedState() { - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); TestData.assertExpectedState(store); } -- 2.20.1