--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class QueryStreamProcessorTopologyTest
+{
+ public static final String TOP10_IN = "TOP10-IN";
+ public static final String USERS_IN = "USERS-IN";
+ public static final String STORE_NAME = "TOPOLOGY-TEST";
+
+
+ TopologyTestDriver testDriver;
+ TestInputTopic<String, TestRanking> top10In;
+ TestInputTopic<String, TestUserData> userIn;
+
+
+ @BeforeEach
+ public void setUp()
+ {
+ Topology topology = QueryStreamProcessor.buildTopology(
+ USERS_IN,
+ TOP10_IN,
+ Stores.inMemoryKeyValueStore(STORE_NAME),
+ new ObjectMapper());
+
+ testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+ top10In = testDriver.createInputTopic(
+ TOP10_IN,
+ new StringSerializer(),
+ new JsonSerializer());
+
+ userIn = testDriver.createInputTopic(
+ USERS_IN,
+ new StringSerializer(),
+ new JsonSerializer());
+ }
+
+
+ @Test
+ public void test()
+ {
+ TestData
+ .getUsersMessages()
+ .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
+ TestData
+ .getTop10Messages()
+ .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
+
+ KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
+ TestData.assertExpectedState(store);
+ }
+
+ @AfterEach
+ public void tearDown()
+ {
+ testDriver.close();
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestEntry;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+ static final ObjectMapper objectMapper = new ObjectMapper();
+ static final String PETER = "peter";
+ static final String KLAUS = "klaus";
+
+ static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+ {
+ return Stream.of(TOP10_MESSAGES);
+ }
+
+ static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
+ {
+ return Stream.of(USERS_MESSAGES);
+ }
+
+ static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
+ {
+ assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
+ assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+ }
+
+ private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
+ {
+ assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
+ }
+
+ private static UserRanking userRankingOf(String json)
+ {
+ try
+ {
+ return objectMapper.readValue(json, UserRanking.class);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private static UserRanking getLastMessageFor(String user)
+ {
+ return getTop10Messages()
+ .filter(kv -> kv.key.equals(user))
+ .map(kv -> kv.value)
+ .map(testRanking -> userRankingFor(user, testRanking))
+ .reduce(null, (left, right) -> right);
+ }
+
+ private static UserRanking userRankingFor(String user, TestRanking testRanking)
+ {
+ TestUserData testUserData = getUsersMessages()
+ .filter(kv -> kv.key.equals(user))
+ .map(kv -> kv.value)
+ .reduce(null, (left, right) -> right);
+
+ Entry[] entries = Arrays
+ .stream(testRanking.getEntries())
+ .map(testEntry -> entryOf(testEntry))
+ .toArray(size -> new Entry[size]);
+
+ return UserRanking.of(
+ testUserData.getFirstName(),
+ testUserData.getLastName(),
+ entries);
+ }
+
+ private static Entry entryOf(TestEntry testEntry)
+ {
+ Entry entry = new Entry();
+ entry.setWord(testEntry.getWord());
+ entry.setCount(testEntry.getCount());
+ return entry;
+ }
+
+ private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair( // 0
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 1
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 1l))),
+ KeyValue.pair( // 2
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l))),
+ KeyValue.pair( // 3
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l))),
+ KeyValue.pair( // 4
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 1l))),
+ KeyValue.pair( // 5
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Welt", 1l),
+ TestEntry.of("Boäh", 1l))),
+ KeyValue.pair( // 6
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l),
+ TestEntry.of("Boäh", 1l))),
+ KeyValue.pair( // 7
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Boäh", 2l),
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 8
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("Müsch", 2l),
+ TestEntry.of("s", 2l))),
+ KeyValue.pair( // 9
+ PETER,
+ TestRanking.of(
+ TestEntry.of("Boäh", 3l),
+ TestEntry.of("Welt", 2l),
+ TestEntry.of("Hallo", 1l))),
+ KeyValue.pair( // 10
+ KLAUS,
+ TestRanking.of(
+ TestEntry.of("s", 3l),
+ TestEntry.of("Müsch", 2l))),
+ };
+
+ private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
+ {
+ KeyValue.pair(
+ PETER,
+ TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+ KeyValue.pair(
+ KLAUS,
+ TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+ };
+}