import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
{
StreamsBuilder builder = new StreamsBuilder();
- builder.table(properties.getRankingInputTopic(), Materialized.as(storeName));
+ KTable<String, String> users = builder.table(properties.getUsersInputTopic());
+ KStream<String, String> rankings = builder.stream(properties.getRankingInputTopic());
+
+ rankings
+ .join(users, (rankingJson, userJson) ->
+ {
+ try
+ {
+ Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
+ User user = mapper.readValue(userJson, User.class);
+
+ return mapper.writeValueAsString(
+ UserRanking.of(
+ user.getFirstName(),
+ user.getLastName(),
+ ranking.getEntries()));
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .toTable(Materialized.as(storeName));
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
return Optional.of(location);
}
- public Optional<Ranking> getRanking(String username)
+ public Optional<UserRanking> getUserRanking(String username)
{
return
Optional
{
try
{
- return mapper.readValue(json, Ranking.class);
+ return mapper.readValue(json, UserRanking.class);
}
catch (JsonProcessingException e)
{