From 5684e2e2819181aed5949d8b1f1e8385b3c02576 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 9 Jun 2024 12:09:49 +0200 Subject: [PATCH] query: 1.0.6 - Added `QueryStreamProcessorTopologyTest` --- pom.xml | 15 ++ .../de/juplo/kafka/wordcount/query/Entry.java | 8 +- .../de/juplo/kafka/wordcount/query/Key.java | 6 +- .../kafka/wordcount/query/UserRanking.java | 8 +- .../QueryStreamProcessorTopologyTest.java | 82 +++++++++ .../juplo/kafka/wordcount/query/TestData.java | 159 ++++++++++++++++++ .../kafka/wordcount/top10/TestEntry.java | 15 ++ .../kafka/wordcount/top10/TestRanking.java | 20 +++ .../kafka/wordcount/users/TestUserData.java | 19 +++ 9 files changed, 319 insertions(+), 13 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/query/TestData.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java create mode 100644 src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java diff --git a/pom.xml b/pom.xml index b699a81..d74c826 100644 --- a/pom.xml +++ b/pom.xml @@ -51,6 +51,21 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.assertj + assertj-core + test + diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java index 4866e72..80b4daf 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Entry.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Entry.java @@ -1,11 +1,11 @@ package de.juplo.kafka.wordcount.query; -import lombok.Value; +import lombok.Data; -@Value(staticConstructor = "of") +@Data public class Entry { - private final String word; - private final Long count; + private String word; + private Long count; } diff --git a/src/main/java/de/juplo/kafka/wordcount/query/Key.java b/src/main/java/de/juplo/kafka/wordcount/query/Key.java index be34ba8..afeac4a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/Key.java @@ -1,11 +1,9 @@ package de.juplo.kafka.wordcount.query; -import lombok.Getter; -import lombok.Setter; +import lombok.Data; -@Getter -@Setter +@Data public class Key { private String username; diff --git a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java index acffd5d..9ca765a 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java @@ -1,13 +1,11 @@ package de.juplo.kafka.wordcount.query; -import lombok.AllArgsConstructor; -import lombok.Getter; -import lombok.Setter; +import lombok.*; -@Getter -@Setter +@Data @AllArgsConstructor(staticName = "of") +@NoArgsConstructor public class UserRanking { private String firstName; diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java new file mode 100644 index 0000000..f4164a6 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -0,0 +1,82 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.users.TestUserData; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.TestInputTopic; +import org.apache.kafka.streams.Topology; +import org.apache.kafka.streams.TopologyTestDriver; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonSerializer; + +import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; + + +@Slf4j +public class QueryStreamProcessorTopologyTest +{ + public static final String TOP10_IN = "TOP10-IN"; + public static final String USERS_IN = "USERS-IN"; + public static final String STORE_NAME = "TOPOLOGY-TEST"; + + + TopologyTestDriver testDriver; + TestInputTopic top10In; + TestInputTopic userIn; + + + @BeforeEach + public void setUp() + { + Topology topology = QueryStreamProcessor.buildTopology( + USERS_IN, + TOP10_IN, + Stores.inMemoryKeyValueStore(STORE_NAME), + new ObjectMapper()); + + testDriver = new TopologyTestDriver(topology, serializationConfig()); + + top10In = testDriver.createInputTopic( + TOP10_IN, + new StringSerializer(), + jsonSerializer(TestRanking.class,false)); + + userIn = testDriver.createInputTopic( + USERS_IN, + new StringSerializer(), + jsonSerializer(TestUserData.class,false)); + } + + + @Test + public void test() + { + TestData + .getUsersMessages() + .forEach(kv -> userIn.pipeInput(kv.key, kv.value)); + TestData + .getTop10Messages() + .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); + + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(store); + } + + @AfterEach + public void tearDown() + { + testDriver.close(); + } + + private JsonSerializer jsonSerializer(Class type, boolean isKey) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + return jsonSerializer; + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java new file mode 100644 index 0000000..82f7217 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -0,0 +1,159 @@ +package de.juplo.kafka.wordcount.query; + +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.wordcount.top10.TestEntry; +import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.users.TestUserData; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.state.ReadOnlyKeyValueStore; + +import java.util.Arrays; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + + +class TestData +{ + static final ObjectMapper objectMapper = new ObjectMapper(); + static final String PETER = "peter"; + static final String KLAUS = "klaus"; + + static final Stream> getTop10Messages() + { + return Stream.of(TOP10_MESSAGES); + } + + static final Stream> getUsersMessages() + { + return Stream.of(USERS_MESSAGES); + } + + static void assertExpectedState(ReadOnlyKeyValueStore store) + { + assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER)); + assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS)); + } + + private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson) + { + assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user)); + } + + private static UserRanking userRankingOf(String json) + { + try + { + return objectMapper.readValue(json, UserRanking.class); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + private static UserRanking getLastMessageFor(String user) + { + return getTop10Messages() + .filter(kv -> kv.key.equals(user)) + .map(kv -> kv.value) + .map(testRanking -> userRankingFor(user, testRanking)) + .reduce(null, (left, right) -> right); + } + + private static UserRanking userRankingFor(String user, TestRanking testRanking) + { + TestUserData testUserData = getUsersMessages() + .filter(kv -> kv.key.equals(user)) + .map(kv -> kv.value) + .reduce(null, (left, right) -> right); + + Entry[] entries = Arrays + .stream(testRanking.getEntries()) + .map(testEntry -> entryOf(testEntry)) + .toArray(size -> new Entry[size]); + + return UserRanking.of( + testUserData.getFirstName(), + testUserData.getLastName(), + entries); + } + + private static Entry entryOf(TestEntry testEntry) + { + Entry entry = new Entry(); + entry.setWord(testEntry.getWord()); + entry.setCount(testEntry.getCount()); + return entry; + } + + private static KeyValue[] TOP10_MESSAGES = new KeyValue[] + { + KeyValue.pair( // 0 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 1 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 1l))), + KeyValue.pair( // 2 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l))), + KeyValue.pair( // 3 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l))), + KeyValue.pair( // 4 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 1l))), + KeyValue.pair( // 5 + PETER, + TestRanking.of( + TestEntry.of("Hallo", 1l), + TestEntry.of("Welt", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 6 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l), + TestEntry.of("Boäh", 1l))), + KeyValue.pair( // 7 + PETER, + TestRanking.of( + TestEntry.of("Welt", 2l), + TestEntry.of("Boäh", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 8 + KLAUS, + TestRanking.of( + TestEntry.of("Müsch", 2l), + TestEntry.of("s", 2l))), + KeyValue.pair( // 9 + PETER, + TestRanking.of( + TestEntry.of("Boäh", 3l), + TestEntry.of("Welt", 2l), + TestEntry.of("Hallo", 1l))), + KeyValue.pair( // 10 + KLAUS, + TestRanking.of( + TestEntry.of("s", 3l), + TestEntry.of("Müsch", 2l))), + }; + + private static KeyValue[] USERS_MESSAGES = new KeyValue[] + { + KeyValue.pair( + PETER, + TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)), + KeyValue.pair( + KLAUS, + TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)), + }; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java new file mode 100644 index 0000000..215327f --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestEntry +{ + String word; + long count; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java new file mode 100644 index 0000000..2b49590 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.wordcount.top10; + +import lombok.AccessLevel; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(access = AccessLevel.PRIVATE) +@NoArgsConstructor +@Data +public class TestRanking +{ + private TestEntry[] entries; + + public static TestRanking of(TestEntry... entries) + { + return new TestRanking(entries); + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java b/src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java new file mode 100644 index 0000000..03bf041 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.wordcount.users; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor +@Data +public class TestUserData +{ + public enum Sex { FEMALE, MALE, OTHER } + + String username; + String firstName; + String lastName; + Sex sex; +} -- 2.20.1