</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
<artifactId>query</artifactId>
- <version>1.0.0</version>
+ <version>1.0.1</version>
<name>Wordcount-Query</name>
<description>Query stream-processor of the multi-user wordcount-example</description>
<properties>
private final QueryStreamProcessor processor;
@GetMapping("{username}")
- ResponseEntity<Ranking> queryFor(@PathVariable String username)
+ ResponseEntity<UserRanking> queryFor(@PathVariable String username)
{
Optional<URI> redirect = processor.getRedirect(username);
if (redirect.isPresent())
.build();
}
- return ResponseEntity.of(processor.getRanking(username));
+ return ResponseEntity.of(processor.getUserRanking(username));
}
}
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.HostInfo;
import org.apache.kafka.streams.state.QueryableStoreTypes;
{
StreamsBuilder builder = new StreamsBuilder();
- builder.table(properties.getRankingInputTopic(), Materialized.as(storeName));
+ KTable<String, String> users = builder.table(properties.getUsersInputTopic());
+ KStream<String, String> rankings = builder.stream(properties.getRankingInputTopic());
+
+ rankings
+ .join(users, (rankingJson, userJson) ->
+ {
+ try
+ {
+ Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
+ User user = mapper.readValue(userJson, User.class);
+
+ return mapper.writeValueAsString(
+ UserRanking.of(
+ user.getFirstName(),
+ user.getLastName(),
+ ranking.getEntries()));
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .toTable(Materialized.as(storeName));
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
return Optional.of(location);
}
- public Optional<Ranking> getRanking(String username)
+ public Optional<UserRanking> getUserRanking(String username)
{
return
Optional
{
try
{
- return mapper.readValue(json, Ranking.class);
+ return mapper.readValue(json, UserRanking.class);
}
catch (JsonProcessingException e)
{
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode(of = "username")
+public class User
+{
+ public enum Sex { FEMALE, MALE, OTHER }
+
+ private String username;
+
+ private String firstName;
+ private String lastName;
+ private Sex sex;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.query;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+
+@Getter
+@Setter
+@AllArgsConstructor(staticName = "of")
+public class UserRanking
+{
+ private String firstName;
+ private String lastName;
+ private Entry[] top10;
+}