--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.TestInputTopic;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.TopologyTestDriver;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+
+import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
+
+
+@Slf4j
+public class QueryStreamProcessorTopologyTest
+{
+  public static final String TOP10_IN = "TOP10-IN";
+  public static final String USERS_IN = "USERS-IN";
+  public static final String STORE_NAME = "TOPOLOGY-TEST";
+
+
+  TopologyTestDriver testDriver;
+  TestInputTopic<String, TestRanking> top10In;
+  TestInputTopic<String, TestUserData> userIn;
+
+
+  @BeforeEach
+  public void setUp()
+  {
+    Topology topology = QueryStreamProcessor.buildTopology(
+        USERS_IN,
+        TOP10_IN,
+        Stores.inMemoryKeyValueStore(STORE_NAME),
+        new ObjectMapper());
+
+    testDriver = new TopologyTestDriver(topology, serializationConfig());
+
+    top10In = testDriver.createInputTopic(
+        TOP10_IN,
+        new StringSerializer(),
+        jsonSerializer(TestRanking.class,false));
+
+    userIn = testDriver.createInputTopic(
+        USERS_IN,
+        new StringSerializer(),
+        jsonSerializer(TestUserData.class,false));
+  }
+
+
+  @Test
+  public void test()
+  {
+    TestData
+        .getUsersMessages()
+        .forEach(kv -> userIn.pipeInput(kv.key, kv.value));
+    TestData
+        .getTop10Messages()
+        .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
+
+    KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
+    TestData.assertExpectedState(store);
+  }
+
+  @AfterEach
+  public void tearDown()
+  {
+    testDriver.close();
+  }
+
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type, boolean isKey)
+  {
+    JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+    return jsonSerializer;
+  }
+}
 
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.top10.TestEntry;
+import de.juplo.kafka.wordcount.top10.TestRanking;
+import de.juplo.kafka.wordcount.users.TestUserData;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+class TestData
+{
+       static final ObjectMapper objectMapper = new ObjectMapper();
+       static final String PETER = "peter";
+       static final String KLAUS = "klaus";
+
+       static final Stream<KeyValue<String, TestRanking>> getTop10Messages()
+       {
+               return Stream.of(TOP10_MESSAGES);
+       }
+
+       static final Stream<KeyValue<String, TestUserData>> getUsersMessages()
+       {
+               return Stream.of(USERS_MESSAGES);
+       }
+
+       static void assertExpectedState(ReadOnlyKeyValueStore<String, String> store)
+       {
+               assertRankingEqualsRankingFromLastMessage(PETER, store.get(PETER));
+               assertRankingEqualsRankingFromLastMessage(KLAUS, store.get(KLAUS));
+       }
+
+       private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
+       {
+               assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
+       }
+
+       private static UserRanking userRankingOf(String json)
+       {
+               try
+               {
+                       return objectMapper.readValue(json, UserRanking.class);
+               }
+               catch (Exception e)
+               {
+                       throw new RuntimeException(e);
+               }
+       }
+
+       private static UserRanking getLastMessageFor(String user)
+       {
+               return getTop10Messages()
+                               .filter(kv -> kv.key.equals(user))
+                               .map(kv -> kv.value)
+                               .map(testRanking -> userRankingFor(user, testRanking))
+                               .reduce(null, (left, right) -> right);
+       }
+
+       private static UserRanking userRankingFor(String user, TestRanking testRanking)
+       {
+               TestUserData testUserData = getUsersMessages()
+                               .filter(kv -> kv.key.equals(user))
+                               .map(kv -> kv.value)
+                               .reduce(null, (left, right) -> right);
+
+               Entry[] entries = Arrays
+                               .stream(testRanking.getEntries())
+                               .map(testEntry -> entryOf(testEntry))
+                               .toArray(size -> new Entry[size]);
+
+               return UserRanking.of(
+                               testUserData.getFirstName(),
+                               testUserData.getLastName(),
+                               entries);
+       }
+
+       private static Entry entryOf(TestEntry testEntry)
+       {
+               Entry entry = new Entry();
+               entry.setWord(testEntry.getWord());
+               entry.setCount(testEntry.getCount());
+               return entry;
+       }
+
+       private static KeyValue<String, TestRanking>[] TOP10_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair( // 0
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 1
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 1l))),
+                       KeyValue.pair( // 2
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l))),
+                       KeyValue.pair( // 3
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l))),
+                       KeyValue.pair( // 4
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 1l))),
+                       KeyValue.pair( // 5
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Welt", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
+                       KeyValue.pair( // 6
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l),
+                                                       TestEntry.of("Boäh", 1l))),
+                       KeyValue.pair( // 7
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Boäh", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 8
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("Müsch", 2l),
+                                                       TestEntry.of("s", 2l))),
+                       KeyValue.pair( // 9
+                                       PETER,
+                                       TestRanking.of(
+                                                       TestEntry.of("Boäh", 3l),
+                                                       TestEntry.of("Welt", 2l),
+                                                       TestEntry.of("Hallo", 1l))),
+                       KeyValue.pair( // 10
+                                       KLAUS,
+                                       TestRanking.of(
+                                                       TestEntry.of("s", 3l),
+                                                       TestEntry.of("Müsch", 2l))),
+       };
+
+       private static KeyValue<String, TestUserData>[] USERS_MESSAGES = new KeyValue[]
+       {
+                       KeyValue.pair(
+                                       PETER,
+                                       TestUserData.of(PETER, "Peter", "Pan", TestUserData.Sex.MALE)),
+                       KeyValue.pair(
+                                       KLAUS,
+                                       TestUserData.of(KLAUS, "Klaus", "Klüse", TestUserData.Sex.OTHER)),
+       };
+}