From: Kai Moritz Date: Sun, 9 Jun 2024 18:44:35 +0000 (+0200) Subject: query: 2.0.0 - (RED) Formulated expectations for JSON-values X-Git-Tag: query-with-kafkaproducer~8 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=57f47fca712981116b726e437f589ac727c5d0a7;p=demos%2Fkafka%2Fwordcount query: 2.0.0 - (RED) Formulated expectations for JSON-values --- diff --git a/pom.xml b/pom.xml index d7f3a30..60ea716 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount query - 1.0.6 + 2.0.0 Wordcount-Query Query stream-processor of the multi-user wordcount-example diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index c4ae4ae..ff7c150 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -88,7 +88,7 @@ public class QueryStreamProcessor return topology; } - ReadOnlyKeyValueStore getStore() + ReadOnlyKeyValueStore getStore() { return streams.store(storeParameters); } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java index e38871f..a9cca10 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryApplicationIT.java @@ -1,5 +1,6 @@ package de.juplo.kafka.wordcount.query; +import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -32,7 +33,7 @@ import static org.springframework.test.web.servlet.result.MockMvcResultMatchers. properties = { "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer", "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer", - "spring.kafka.producer.properties.spring.json.add.type.headers=false", + "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -52,6 +53,8 @@ public class QueryApplicationIT @Autowired MockMvc mockMvc; @Autowired + ObjectMapper objectMapper; + @Autowired QueryStreamProcessor streamProcessor; @@ -103,17 +106,19 @@ public class QueryApplicationIT .untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user))); } - private String requestUserRankingFor(String user) + private UserRanking requestUserRankingFor(String user) { try { - return mockMvc - .perform(MockMvcRequestBuilders.get("/{user}", user) - .contentType(MediaType.APPLICATION_JSON)) - .andExpect(status().isOk()) - .andReturn() - .getResponse() - .getContentAsString(StandardCharsets.UTF_8); + return objectMapper.readValue( + mockMvc + .perform(MockMvcRequestBuilders.get("/{user}", user) + .contentType(MediaType.APPLICATION_JSON)) + .andExpect(status().isOk()) + .andReturn() + .getResponse() + .getContentAsString(StandardCharsets.UTF_8), + UserRanking.class); } catch (Exception e) { diff --git a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java index 6bdd8fa..eef1eec 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/QueryStreamProcessorTopologyTest.java @@ -15,6 +15,8 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonSerializer; +import java.util.Map; + import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig; @@ -45,12 +47,12 @@ public class QueryStreamProcessorTopologyTest top10In = testDriver.createInputTopic( TOP10_IN, new StringSerializer(), - new JsonSerializer()); + jsonSerializer(TestRanking.class)); userIn = testDriver.createInputTopic( USERS_IN, new StringSerializer(), - new JsonSerializer()); + jsonSerializer(TestUserData.class)); } @@ -64,7 +66,7 @@ public class QueryStreamProcessorTopologyTest .getTop10Messages() .forEach(kv -> top10In.pipeInput(kv.key, kv.value)); - KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); + KeyValueStore store = testDriver.getKeyValueStore(STORE_NAME); TestData.assertExpectedState(user -> store.get(user)); } @@ -73,4 +75,16 @@ public class QueryStreamProcessorTopologyTest { testDriver.close(); } + + private JsonSerializer jsonSerializer(Class type) + { + JsonSerializer jsonSerializer = new JsonSerializer<>(); + jsonSerializer.configure( + Map.of( + JsonSerializer.TYPE_MAPPINGS, + "userdata:" + TestUserData.class.getName() + "," + + "ranking:" + TestRanking.class.getName()), + false); + return jsonSerializer; + } } diff --git a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java index 3fcd7c9..1fe34d9 100644 --- a/src/test/java/de/juplo/kafka/wordcount/query/TestData.java +++ b/src/test/java/de/juplo/kafka/wordcount/query/TestData.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.query; -import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.wordcount.top10.TestEntry; import de.juplo.kafka.wordcount.top10.TestRanking; import de.juplo.kafka.wordcount.users.TestUserData; @@ -15,7 +14,6 @@ import static org.assertj.core.api.Assertions.assertThat; class TestData { - static final ObjectMapper objectMapper = new ObjectMapper(); static final String PETER = "peter"; static final String KLAUS = "klaus"; @@ -29,32 +27,15 @@ class TestData return Stream.of(USERS_MESSAGES); } - static void assertExpectedState(Function function) + static void assertExpectedState(Function function) { assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER)); assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS)); } - private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson) + private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson) { - assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user)); - } - - private static UserRanking userRankingOf(String json) - { - if (json == null) - { - return null; - } - - try - { - return objectMapper.readValue(json, UserRanking.class); - } - catch (Exception e) - { - throw new RuntimeException(e); - } + assertThat(rankingJson).isEqualTo(getLastMessageFor(user)); } private static UserRanking getLastMessageFor(String user)