</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>query</artifactId>
- <version>1.0.6</version>
+ <version>2.0.0</version>
<name>Wordcount-Query</name>
<description>Query stream-processor of the multi-user wordcount-example</description>
<properties>
return topology;
}
- ReadOnlyKeyValueStore<String, String> getStore()
+ ReadOnlyKeyValueStore<String, UserRanking> getStore()
{
return streams.store(storeParameters);
}
package de.juplo.kafka.wordcount.query;
+import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
properties = {
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.add.type.headers=false",
+ "spring.kafka.producer.properties.spring.json.type.mapping=userdata:de.juplo.kafka.wordcount.users.TestUserData,ranking:de.juplo.kafka.wordcount.top10.TestRanking",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
@Autowired
MockMvc mockMvc;
@Autowired
+ ObjectMapper objectMapper;
+ @Autowired
QueryStreamProcessor streamProcessor;
.untilAsserted(() -> TestData.assertExpectedState(user -> requestUserRankingFor(user)));
}
- private String requestUserRankingFor(String user)
+ private UserRanking requestUserRankingFor(String user)
{
try
{
- return mockMvc
- .perform(MockMvcRequestBuilders.get("/{user}", user)
- .contentType(MediaType.APPLICATION_JSON))
- .andExpect(status().isOk())
- .andReturn()
- .getResponse()
- .getContentAsString(StandardCharsets.UTF_8);
+ return objectMapper.readValue(
+ mockMvc
+ .perform(MockMvcRequestBuilders.get("/{user}", user)
+ .contentType(MediaType.APPLICATION_JSON))
+ .andExpect(status().isOk())
+ .andReturn()
+ .getResponse()
+ .getContentAsString(StandardCharsets.UTF_8),
+ UserRanking.class);
}
catch (Exception e)
{
import org.junit.jupiter.api.Test;
import org.springframework.kafka.support.serializer.JsonSerializer;
+import java.util.Map;
+
import static de.juplo.kafka.wordcount.query.QueryApplicationConfiguration.serializationConfig;
top10In = testDriver.createInputTopic(
TOP10_IN,
new StringSerializer(),
- new JsonSerializer());
+ jsonSerializer(TestRanking.class));
userIn = testDriver.createInputTopic(
USERS_IN,
new StringSerializer(),
- new JsonSerializer());
+ jsonSerializer(TestUserData.class));
}
.getTop10Messages()
.forEach(kv -> top10In.pipeInput(kv.key, kv.value));
- KeyValueStore<String, String> store = testDriver.getKeyValueStore(STORE_NAME);
+ KeyValueStore<String, UserRanking> store = testDriver.getKeyValueStore(STORE_NAME);
TestData.assertExpectedState(user -> store.get(user));
}
{
testDriver.close();
}
+
+ private <T> JsonSerializer<T> jsonSerializer(Class<T> type)
+ {
+ JsonSerializer<T> jsonSerializer = new JsonSerializer<>();
+ jsonSerializer.configure(
+ Map.of(
+ JsonSerializer.TYPE_MAPPINGS,
+ "userdata:" + TestUserData.class.getName() + "," +
+ "ranking:" + TestRanking.class.getName()),
+ false);
+ return jsonSerializer;
+ }
}
package de.juplo.kafka.wordcount.query;
-import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.wordcount.top10.TestEntry;
import de.juplo.kafka.wordcount.top10.TestRanking;
import de.juplo.kafka.wordcount.users.TestUserData;
class TestData
{
- static final ObjectMapper objectMapper = new ObjectMapper();
static final String PETER = "peter";
static final String KLAUS = "klaus";
return Stream.of(USERS_MESSAGES);
}
- static void assertExpectedState(Function<String, String> function)
+ static void assertExpectedState(Function<String, UserRanking> function)
{
assertRankingEqualsRankingFromLastMessage(PETER, function.apply(PETER));
assertRankingEqualsRankingFromLastMessage(KLAUS, function.apply(KLAUS));
}
- private static void assertRankingEqualsRankingFromLastMessage(String user, String userRankingJson)
+ private static void assertRankingEqualsRankingFromLastMessage(String user, UserRanking rankingJson)
{
- assertThat(userRankingOf(userRankingJson)).isEqualTo(getLastMessageFor(user));
- }
-
- private static UserRanking userRankingOf(String json)
- {
- if (json == null)
- {
- return null;
- }
-
- try
- {
- return objectMapper.readValue(json, UserRanking.class);
- }
- catch (Exception e)
- {
- throw new RuntimeException(e);
- }
+ assertThat(rankingJson).isEqualTo(getLastMessageFor(user));
}
private static UserRanking getLastMessageFor(String user)