X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessorTopologyTest.java;h=eef1eecc39642a5692a860fabef9ea37013223b3;hb=57f47fca712981116b726e437f589ac727c5d0a7;hp=8439be17ae94878505f9cae2f38d18a8cd299195;hpb=213321a01c3b53f86eb2dd97c3f6241e9cad9045;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 8439be1..eef1eec 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -15,6 +15,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonSerializer; +import java.util.Map; + import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; @@ -45,12 +47,12 @@ public class QueryStreamProcessorTopologyTest top10In = testDriver.createInputTopic( TOP10_IN, new StringSerializer(), - new JsonSerializer()); + jsonSerializer(TestRanking.class)); userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - new JsonSerializer()); + jsonSerializer(TestUserData.class)); } @@ -64,8 +66,8 @@ public class QueryStreamProcessorTopologyTest .getTop10Messages() .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); - TestData.assertExpectedState(store); + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + TestData.assertExpectedState(user -> store.get(user)); } @AfterEach @@ -73,4 +75,16 @@ public class QueryStreamProcessorTopologyTest { testDriver.close(); } + + private JsonSerializer jsonSerializer(Class type) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "userdata:" + TestUserData.class.getName() + "," + + "ranking:" + TestRanking.class.getName()), + false); + return jsonSerializer; + } }