From: Kai Moritz Date: Sat, 8 Jun 2024 13:19:23 +0000 (+0200) Subject: top10: 1.2.1 - `TestData` uses faked foreign classes for input-/output data X-Git-Tag: top10-1.2.1~5 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=6738dd374575d4d86a966972d2e25661c2ad1523;p=demos%2Fkafka%2Fwordcount top10: 1.2.1 - `TestData` uses faked foreign classes for input-/output data --- diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java new file mode 100644 index 0000000..a5152e6 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestEntry.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.query; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestEntry +{ + String word; + long counter; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java index e7f8053..efad48b 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestRanking.java @@ -1,11 +1,21 @@ package de.juplo.kafka.wordcount.query; import de.juplo.kafka.wordcount.top10.Entry; +import lombok.AccessLevel; +import lombok.AllArgsConstructor; import lombok.Data; +import lombok.NoArgsConstructor; +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor @Data public class TestRanking { - private Entry[] entries; + private TestEntry[] entries; + + public static TestRanking of(TestEntry... entries) + { + return new TestRanking(entries); + } } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java index e0c53df..84c81f5 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestData.java @@ -2,11 +2,15 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestEntry; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; import org.springframework.util.LinkedMultiValueMap; import org.springframework.util.MultiValueMap; +import java.util.Arrays; import java.util.stream.Stream; import static org.assertj.core.api.Assertions.assertThat; @@ -14,8 +18,8 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final User PETER = User.of("peter"); - static final User KLAUS = User.of("klaus"); + static final TestUser PETER = TestUser.of("peter"); + static final TestUser KLAUS = TestUser.of("klaus"); static final KeyValue[] INPUT_MESSAGES = new KeyValue[] { @@ -54,7 +58,7 @@ class TestData TestCounter.of(KLAUS.getUser(),"s",3)), }; - static void assertExpectedMessages(MultiValueMap receivedMessages) + static void assertExpectedMessages(MultiValueMap receivedMessages) { expectedMessages().forEach( (user, rankings) -> @@ -64,39 +68,62 @@ class TestData static void assertExpectedState(ReadOnlyKeyValueStore store) { - assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER)); - assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS)); + assertRankingEqualsRankingFromLastMessage(PETER, store.get(userOf(PETER))); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(userOf(KLAUS))); } - static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) + static User userOf(TestUser user) + { + return User.of(user.getUser()); + } + + static void assertExpectedNumberOfMessagesForUsers(MultiValueMap receivedMessages) { assertThat(countMessagesForUser(PETER, receivedMessages)); assertThat(countMessagesForUser(KLAUS, receivedMessages)); } - static int countMessagesForUser(User user, MultiValueMap messagesForUsers) + static int countMessagesForUser(TestUser user, MultiValueMap messagesForUsers) { return messagesForUsers.get(user).size(); } - static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) + static void assertExpectedLastMessagesForUsers(MultiValueMap receivedMessages) { assertRankingEqualsRankingFromLastMessage(PETER, getLastMessageFor(PETER, receivedMessages)); assertRankingEqualsRankingFromLastMessage(KLAUS, getLastMessageFor(KLAUS, receivedMessages)); } - static void assertRankingEqualsRankingFromLastMessage(User user, Ranking ranking) + static void assertRankingEqualsRankingFromLastMessage(TestUser user, Ranking ranking) + { + TestRanking testRanking = TestRanking.of(testEntriesOf(ranking.getEntries())); + assertRankingEqualsRankingFromLastMessage(user, testRanking); + } + + static TestEntry[] testEntriesOf(Entry... entries) + { + return Arrays + .stream(entries) + .map(entry -> TestEntry.of( + entry.getWord(), + entry.getCounter() == null + ? -1l + : entry.getCounter())) + .toArray(size -> new TestEntry[size]); + } + + static void assertRankingEqualsRankingFromLastMessage(TestUser user, TestRanking ranking) { assertThat(ranking).isEqualTo(getLastMessageFor(user)); } - static Ranking getLastMessageFor(User user) + static TestRanking getLastMessageFor(TestUser user) { return getLastMessageFor(user, expectedMessages()); } - static Ranking getLastMessageFor(User user, MultiValueMap messagesForUsers) + static TestRanking getLastMessageFor(TestUser user, MultiValueMap messagesForUsers) { return messagesForUsers .get(user) @@ -104,69 +131,69 @@ class TestData .reduce(null, (left, right) -> right); } - static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] + static KeyValue[] EXPECTED_MESSAGES = new KeyValue[] { KeyValue.pair( // 0 PETER, - Ranking.of( - Entry.of("Hallo", 1l))), + TestRanking.of( + TestEntry.of("Hallo", 1l))), KeyValue.pair( // 1 KLAUS, - Ranking.of( - Entry.of("Müsch", 1l))), + TestRanking.of( + TestEntry.of("Müsch", 1l))), KeyValue.pair( // 2 PETER, - Ranking.of( - Entry.of("Hallo", 1l), - Entry.of("Welt", 1l))), + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l))), KeyValue.pair( // 3 KLAUS, - Ranking.of( - Entry.of("Müsch", 2l))), + TestRanking.of( + TestEntry.of("Müsch", 2l))), KeyValue.pair( // 4 KLAUS, - Ranking.of( - Entry.of("Müsch", 2l), - Entry.of("s", 1l))), + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 1l))), KeyValue.pair( // 5 PETER, - Ranking.of( - Entry.of("Hallo", 1l), - Entry.of("Welt", 1l), - Entry.of("Boäh", 1l))), + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l), + TestEntry.of("Boäh", 1l))), KeyValue.pair( // 6 PETER, - Ranking.of( - Entry.of("Welt", 2l), - Entry.of("Hallo", 1l), - Entry.of("Boäh", 1l))), + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l), + TestEntry.of("Boäh", 1l))), KeyValue.pair( // 7 PETER, - Ranking.of( - Entry.of("Welt", 2l), - Entry.of("Boäh", 2l), - Entry.of("Hallo", 1l))), + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Boäh", 2l), + TestEntry.of("Hallo", 1l))), KeyValue.pair( // 8 KLAUS, - Ranking.of( - Entry.of("Müsch", 2l), - Entry.of("s", 2l))), + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), KeyValue.pair( // 9 PETER, - Ranking.of( - Entry.of("Boäh", 3l), - Entry.of("Welt", 2l), - Entry.of("Hallo", 1l))), + TestRanking.of( + TestEntry.of("Boäh", 3l), + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l))), KeyValue.pair( // 10 KLAUS, - Ranking.of( - Entry.of("s", 3l), - Entry.of("Müsch", 2l))), + TestRanking.of( + TestEntry.of("s", 3l), + TestEntry.of("Müsch", 2l))), }; - static MultiValueMap expectedMessages() + static MultiValueMap expectedMessages() { - MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); + MultiValueMap expectedMessages = new LinkedMultiValueMap<>(); Stream .of(EXPECTED_MESSAGES) .forEach(keyValue -> expectedMessages.add(keyValue.key, keyValue.value)); diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 1bec92d..5e1f45c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -128,7 +128,7 @@ public class Top10ApplicationIT static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( @@ -136,10 +136,10 @@ public class Top10ApplicationIT @Payload TestRanking ranking) { log.debug("Received message: {} -> {}", user, ranking); - received.add(User.of(user.getUser()), Ranking.of(ranking.getEntries())); + received.add(user, ranking); } - synchronized MultiValueMap getReceivedMessages() + synchronized MultiValueMap getReceivedMessages() { return received; } diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 80fc0df..cd09c06 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -2,6 +2,8 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.counter.TestWord; +import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.TestInputTopic; import org.apache.kafka.streams.TestOutputTopic; @@ -32,8 +34,8 @@ public class Top10StreamProcessorTopologyTest TopologyTestDriver testDriver; - TestInputTopic in; - TestOutputTopic out; + TestInputTopic in; + TestOutputTopic out; @BeforeEach @@ -48,16 +50,16 @@ public class Top10StreamProcessorTopologyTest in = testDriver.createInputTopic( IN, - jsonSerializer(Key.class, true), - jsonSerializer(Entry.class,false)); + jsonSerializer(TestWord.class, true), + jsonSerializer(TestCounter.class,false)); out = testDriver.createOutputTopic( OUT, new JsonDeserializer() - .copyWithType(User.class) + .copyWithType(TestUser.class) .ignoreTypeHeaders(), new JsonDeserializer() - .copyWithType(Ranking.class) + .copyWithType(TestRanking.class) .ignoreTypeHeaders()); } @@ -68,11 +70,9 @@ public class Top10StreamProcessorTopologyTest { Stream .of(TestData.INPUT_MESSAGES) - .forEach(kv -> in.pipeInput( - Key.of(kv.key.getUser(), kv.key.getWord()), - Entry.of(kv.value.getWord(), kv.value.getCounter()))); + .forEach(kv -> in.pipeInput(kv.key, kv.value)); - MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); + MultiValueMap receivedMessages = new LinkedMultiValueMap<>(); out .readRecordsToList() .forEach(record -> receivedMessages.add(record.key(), record.value()));