package de.juplo.kafka.wordcount.query;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
properties = {
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.add.type.headers=false",
+ "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
@Autowired
MockMvc mockMvc;
@Autowired
+ ObjectMapper objectMapper;
+ @Autowired
QueryStreamProcessor streamProcessor;
.untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
}
- private String requestUserRankingFor(String user)
+ private UserRanking requestUserRankingFor(String user)
{
try
{
- return mockMvc
- .perform(MockMvcRequestBuilders.get("/{user}", user)
- .contentType(MediaType.APPLICATION_JSON))
- .andExpect(status().isOk())
- .andReturn()
- .getResponse()
- .getContentAsString(StandardCharsets.UTF_8);
+ return objectMapper.readValue(
+ mockMvc
+ .perform(MockMvcRequestBuilders.get("/{user}", user)
+ .contentType(MediaType.APPLICATION_JSON))
+ .andExpect(status().isOk())
+ .andReturn()
+ .getResponse()
+ .getContentAsString(StandardCharsets.UTF_8),
+ UserRanking.class);
}
catch (Exception e)
{