X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessorTopologyTest.java;h=203c813875d6131c747af31960878870c4b6f995;hb=700f80444d14b201f7b696fb5b7bcab0d767f007;hp=6bdd8fa402460759f0dc9e45061a26b7c7244f20;hpb=cfd58858b9861e8d0e4c1d30896505a50f63255b;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 6bdd8fa..203c813 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -1,7 +1,7 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestRanking; +import de.juplo.kafka.wordcount.top10.TestUser; import de.juplo.kafka.wordcount.users.TestUserData; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.StringSerializer; @@ -15,6 +15,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonSerializer; +import java.util.Map; + import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; @@ -23,11 +25,12 @@ public class QueryStreamProcessorTopologyTest { public static final String TOP10_IN = "TOP10-IN"; public static final String USERS_IN = "USERS-IN"; - public static final String STORE_NAME = "TOPOLOGY-TEST"; + public static final String RANKING_STORE_NAME = "TOPOLOGY-TEST-RANKINGS"; + public static final String USERS_STORE_NAME = "TOPOLOGY-TEST-USERS"; TopologyTestDriver testDriver; - TestInputTopic top10In; + TestInputTopic top10In; TestInputTopic userIn; @@ -37,20 +40,20 @@ public class QueryStreamProcessorTopologyTest Topology topology = QueryStreamProcessor.buildTopology( USERS_IN, TOP10_IN, - Stores.inMemoryKeyValueStore(STORE_NAME), - new ObjectMapper()); + Stores.inMemoryKeyValueStore(USERS_STORE_NAME), + Stores.inMemoryKeyValueStore(RANKING_STORE_NAME)); testDriver = new TopologyTestDriver(topology, serializationConfig()); top10In = testDriver.createInputTopic( TOP10_IN, - new StringSerializer(), - new JsonSerializer()); + jsonSerializer(TestUser.class, true), + jsonSerializer(TestRanking.class,false)); userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - new JsonSerializer()); + jsonSerializer(TestUserData.class, false).noTypeInfo()); } @@ -64,7 +67,7 @@ public class QueryStreamProcessorTopologyTest .getTop10Messages() .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(RANKING_STORE_NAME); TestData.assertExpectedState(user -> store.get(user)); } @@ -73,4 +76,16 @@ public class QueryStreamProcessorTopologyTest { testDriver.close(); } + + private JsonSerializer jsonSerializer(Class type, boolean isKey) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "user:" + TestUser.class.getName() + "," + + "ranking:" + TestRanking.class.getName()), + isKey); + return jsonSerializer; + } }