query: 1.0.6 - Added `QueryStreamProcessorTopologyTest`
authorKai Moritz <kai@juplo.de>
Sun, 9 Jun 2024 10:09:49 +0000 (12:09 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 9 Jun 2024 17:53:05 +0000 (19:53 +0200)
pom.xml
src/main/java/de/juplo/kafka/wordcount/query/Entry.java
src/main/java/de/juplo/kafka/wordcount/query/Key.java
src/main/java/de/juplo/kafka/wordcount/query/UserRanking.java
src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/query/TestData.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index b699a81..d74c826 100644 (file)
--- a/pom.xml
+++ b/pom.xml
                        <artifactId>spring-boot-starter-test</artifactId>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.springframework.kafka</groupId>
+                       <artifactId>spring-kafka-test</artifactId>
+                       <scope>test</scope>
+               </dependency>
+               <dependency>
+                       <groupId>org.assertj</groupId>
+                       <artifactId>assertj-core</artifactId>
+                       <scope>test</scope>
+               </dependency>
        </dependencies>
 
        <build>
index 4866e72..80b4daf 100644 (file)
@@ -1,11 +1,11 @@
 package de.juplo.kafka.wordcount.query;
 
-import lombok.Value;
+import lombok.Data;
 
 
-@Value(staticConstructor = "of")
+@Data
 public class Entry
 {
-  private final String word;
-  private final Long count;
+  private String word;
+  private Long count;
 }
index be34ba8..afeac4a 100644 (file)
@@ -1,11 +1,9 @@
 package de.juplo.kafka.wordcount.query;
 
-import lombok.Getter;
-import lombok.Setter;
+import lombok.Data;
 
 
-@Getter
-@Setter
+@Data
 public class Key
 {
   private String username;
index acffd5d..9ca765a 100644 (file)
@@ -1,13 +1,11 @@
 package de.juplo.kafka.wordcount.query;
 
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.Setter;
+import lombok.*;
 
 
-@Getter
-@Setter
+@Data
 @AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
 public class UserRanking
 {
   private String firstName;
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
new file mode 100644 (file)
index 0000000..f4164a6
--- /dev/null
@@ -0,0 +1,82 @@
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class QueryStreamProcessorTopologyTest
+{
+  public static final String TOP10_IN = "TOP10-IN";
+  public static final String USERS_IN = "USERS-IN";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<String, TestRanking> top10In;
+  TestInputTopic<String, TestUserData> userIn;
+
+
+  @BeforeEach
+  public void setUp()
+  {
+    Topology topology = QueryStreamProcessor.buildTopology(
+        USERS_IN,
+        TOP10_IN,
+        Stores.inMemoryKeyValueStore(STORE_NAME),
+        new ObjectMapper());
+
+    testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+    top10In = testDriver.createInputTopic(
+        TOP10_IN,
+        new StringSerializer(),
+        jsonSerializer(TestRanking.class,false));
+
+    userIn = testDriver.createInputTopic(
+        USERS_IN,
+        new StringSerializer(),
+        jsonSerializer(TestUserData.class,false));
+  }
+
+
+  @Test
+  public void test()
+  {
+    TestData
+        .getUsersMessages()
+        .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
+    TestData
+        .getTop10Messages()
+        .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
+
+    KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
+  {
+    JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+    return jsonSerializer;
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java
new file mode 100644 (file)
index 0000000..82f7217
--- /dev/null
@@ -0,0 +1,159 @@
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestEntry;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+       static final ObjectMapper objectMapper = new ObjectMapper();
+       static final String PETER = "peter";
+       static final String KLAUS = "klaus";
+
+       static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+       {
+               return Stream.of(TOP10_MESSAGES);
+       }
+
+       static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
+       {
+               return Stream.of(USERS_MESSAGES);
+       }
+
+       static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+       }
+
+       private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
+       {
+               assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
+       }
+
+       private static UserRanking userRankingOf(String json)
+       {
+               try
+               {
+                       return objectMapper.readValue(json, UserRanking.class);
+               }
+               catch (Exception e)
+               {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private static UserRanking getLastMessageFor(String user)
+       {
+               return getTop10Messages()
+                               .filter(kv -> kv.key.equals(user))
+                               .map(kv -> kv.value)
+                               .map(testRanking -> userRankingFor(user, testRanking))
+                               .reduce(null, (left, right) -> right);
+       }
+
+       private static UserRanking userRankingFor(String user, TestRanking testRanking)
+       {
+               TestUserData testUserData = getUsersMessages()
+                               .filter(kv -> kv.key.equals(user))
+                               .map(kv -> kv.value)
+                               .reduce(null, (left, right) -> right);
+
+               Entry[] entries = Arrays
+                               .stream(testRanking.getEntries())
+                               .map(testEntry -> entryOf(testEntry))
+                               .toArray(size -> new Entry[size]);
+
+               return UserRanking.of(
+                               testUserData.getFirstName(),
+                               testUserData.getLastName(),
+                               entries);
+       }
+
+       private static Entry entryOf(TestEntry testEntry)
+       {
+               Entry entry = new Entry();
+               entry.setWord(testEntry.getWord());
+               entry.setCount(testEntry.getCount());
+               return entry;
+       }
+
+       private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair( // 0
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 1
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 1l))),
+                       KeyValue.pair( // 2
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l))),
+                       KeyValue.pair( // 3
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l))),
+                       KeyValue.pair( // 4
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 1l))),
+                       KeyValue.pair( // 5
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
+                       KeyValue.pair( // 6
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
+                       KeyValue.pair( // 7
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Boäh", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 8
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 2l))),
+                       KeyValue.pair( // 9
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Boäh", 3l),
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 10
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("s", 3l),
+                                                       TestEntry.of("Müsch", 2l))),
+       };
+
+       private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair(
+                                       PETER,
+                                       TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+                       KeyValue.pair(
+                                       KLAUS,
+                                       TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+       };
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestEntry.java
new file mode 100644 (file)
index 0000000..215327f
--- /dev/null
@@ -0,0 +1,15 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestEntry
+{
+  String word;
+  long count;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java b/src/test/java/de/juplo/kafka/wordcount/top10/TestRanking.java
new file mode 100644 (file)
index 0000000..2b49590
--- /dev/null
@@ -0,0 +1,20 @@
+package de.juplo.kafka.wordcount.top10;
+
+import lombok.AccessLevel;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@NoArgsConstructor
+@Data
+public class TestRanking
+{
+  private TestEntry[] entries;
+
+  public static TestRanking of(TestEntry... entries)
+  {
+    return new TestRanking(entries);
+  }
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java b/src/test/java/de/juplo/kafka/wordcount/users/TestUserData.java
new file mode 100644 (file)
index 0000000..03bf041
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka.wordcount.users;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+@AllArgsConstructor(staticName = "of")
+@NoArgsConstructor
+@Data
+public class TestUserData
+{
+  public enum Sex { FEMALE, MALE, OTHER }
+
+  String username;
+  String firstName;
+  String lastName;
+  Sex sex;
+}