X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessorTopologyTest.java;h=fda74087e2b6927e0a479a0a2837dd6dbb4f8fba;hb=a07e20011b1a22d120920e84c36683a9d42c3ac5;hp=845792cfe0da43b216b3cddd6a4ff2b90facf986;hpb=278c7b8125c82120e1d80fa640bd16d375d4bf86;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 845792c..fda7408 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -24,7 +24,8 @@ public class QueryStreamProcessorTopologyTest { public static final String TOP10_IN = "TOP10-IN"; public static final String USERS_IN = "USERS-IN"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; + public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; + public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; TopologyTestDriver testDriver; @@ -38,7 +39,8 @@ public class QueryStreamProcessorTopologyTest Topology topology = QueryStreamProcessor.buildTopology( USERS_IN, TOP10_IN, - Stores.inMemoryKeyValueStore(STORE_NAME)); + Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); @@ -50,7 +52,7 @@ public class QueryStreamProcessorTopologyTest userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - jsonSerializer(TestUserData.class)); + jsonSerializer(TestUserData.class).noTypeInfo()); } @@ -64,7 +66,7 @@ public class QueryStreamProcessorTopologyTest .getTop10Messages() .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); TestData.assertExpectedState(user -> store.get(user)); } @@ -80,7 +82,6 @@ public class QueryStreamProcessorTopologyTest jsonSerializer.configure( Map.of( JsonSerializer.TYPE_MAPPINGS, - "userdata:" + TestUserData.class.getName() + "," + "ranking:" + TestRanking.class.getName()), false); return jsonSerializer;