From 57f47fca712981116b726e437f589ac727c5d0a7 Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sun, 9 Jun 2024 20:44:35 +0200
Subject: [PATCH] query: 2.0.0 - (RED) Formulated expectations for JSON-values

---
 pom.xml                                       |  2 +-
 .../wordcount/query/QueryStreamProcessor.java |  2 +-
 .../wordcount/query/QueryApplicationIT.java   | 23 ++++++++++-------
 .../QueryStreamProcessorTopologyTest.java     | 20 ++++++++++++---
 .../juplo/kafka/wordcount/query/TestData.java | 25 +++----------------
 5 files changed, 36 insertions(+), 36 deletions(-)

diff --git a/pom.xml b/pom.xml
index d7f3a30..60ea716 100644
--- a/pom.xml
+++ b/pom.xml
@@ -10,7 +10,7 @@
 	</parent>
 	<groupId>de.juplo.kafka.wordcount</groupId>
 	<artifactId>query</artifactId>
-	<version>1.0.6</version>
+	<version>2.0.0</version>
 	<name>Wordcount-Query</name>
 	<description>Query stream-processor of the multi-user wordcount-example</description>
 	<properties>
diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
index c4ae4ae..ff7c150 100644
--- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
+++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java
@@ -88,7 +88,7 @@ public class QueryStreamProcessor
 		return topology;
 	}
 
-	ReadOnlyKeyValueStore<String, String> getStore()
+	ReadOnlyKeyValueStore<String, UserRanking> getStore()
 	{
 		return streams.store(storeParameters);
 	}
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
index e38871f..a9cca10 100644
--- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
+++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java
@@ -1,5 +1,6 @@
 package de.juplo.kafka.wordcount.query;
 
+import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
@@ -32,7 +33,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.
 		properties = {
 				"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
 				"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
-				"spring.kafka.producer.properties.spring.json.add.type.headers=false",
+				"spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
 				"logging.level.root=WARN",
 				"logging.level.de.juplo=DEBUG",
 				"logging.level.org.apache.kafka.clients=INFO",
@@ -52,6 +53,8 @@ public class QueryApplicationIT
 	@Autowired
 	MockMvc mockMvc;
 	@Autowired
+	ObjectMapper objectMapper;
+	@Autowired
 	QueryStreamProcessor streamProcessor;
 
 
@@ -103,17 +106,19 @@ public class QueryApplicationIT
 				.untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
 	}
 
-	private String requestUserRankingFor(String user)
+	private UserRanking requestUserRankingFor(String user)
 	{
 		try
 		{
-			return mockMvc
-					.perform(MockMvcRequestBuilders.get("/{user}", user)
-							.contentType(MediaType.APPLICATION_JSON))
-					.andExpect(status().isOk())
-					.andReturn()
-					.getResponse()
-					.getContentAsString(StandardCharsets.UTF_8);
+			return objectMapper.readValue(
+					mockMvc
+							.perform(MockMvcRequestBuilders.get("/{user}", user)
+									.contentType(MediaType.APPLICATION_JSON))
+							.andExpect(status().isOk())
+							.andReturn()
+							.getResponse()
+							.getContentAsString(StandardCharsets.UTF_8),
+					UserRanking.class);
 		}
 		catch (Exception e)
 		{
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
index 6bdd8fa..eef1eec 100644
--- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
+++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java
@@ -15,6 +15,8 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
+import java.util.Map;
+
 import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
 
 
@@ -45,12 +47,12 @@ public class QueryStreamProcessorTopologyTest
     top10In = testDriver.createInputTopic(
         TOP10_IN,
         new StringSerializer(),
-        new JsonSerializer());
+        jsonSerializer(TestRanking.class));
 
     userIn = testDriver.createInputTopic(
         USERS_IN,
         new StringSerializer(),
-        new JsonSerializer());
+        jsonSerializer(TestUserData.class));
   }
 
 
@@ -64,7 +66,7 @@ public class QueryStreamProcessorTopologyTest
         .getTop10Messages()
         .forEach(kv -> top10In.pipeInput(kv.key, kv.value));
 
-    KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
+    KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(STORE_NAME);
     TestData.assertExpectedState(user -> store.get(user));
   }
 
@@ -73,4 +75,16 @@ public class QueryStreamProcessorTopologyTest
   {
     testDriver.close();
   }
+
+  private <T> JsonSerializer<T> jsonSerializer(Class<T> type)
+  {
+    JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+    jsonSerializer.configure(
+        Map.of(
+            JsonSerializer.TYPE_MAPPINGS,
+            "userdata:" + TestUserData.class.getName() + "," +
+            "ranking:" + TestRanking.class.getName()),
+        false);
+    return jsonSerializer;
+  }
 }
diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java
index 3fcd7c9..1fe34d9 100644
--- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java
+++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.query;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.wordcount.top10.TestEntry;
 import de.juplo.kafka.wordcount.top10.TestRanking;
 import de.juplo.kafka.wordcount.users.TestUserData;
@@ -15,7 +14,6 @@ import static org.assertj.core.api.Assertions.assertThat;
 
 class TestData
 {
-	static final ObjectMapper objectMapper = new ObjectMapper();
 	static final String PETER = "peter";
 	static final String KLAUS = "klaus";
 
@@ -29,32 +27,15 @@ class TestData
 		return Stream.of(USERS_MESSAGES);
 	}
 
-	static void assertExpectedState(Function<String, String> function)
+	static void assertExpectedState(Function<String, UserRanking> function)
 	{
 		assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
 		assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
 	}
 
-	private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
+	private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
 	{
-		assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
-	}
-
-	private static UserRanking userRankingOf(String json)
-	{
-		if (json == null)
-		{
-			return null;
-		}
-
-		try
-		{
-			return objectMapper.readValue(json, UserRanking.class);
-		}
-		catch (Exception e)
-		{
-			throw new RuntimeException(e);
-		}
+		assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
 	}
 
 	private static UserRanking getLastMessageFor(String user)
-- 
2.20.1