- .join(users, (rankingJson, userJson) ->
- {
- try
- {
- Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
- User user = mapper.readValue(userJson, User.class);
-
- return mapper.writeValueAsString(
- UserRanking.of(
- user.getFirstName(),
- user.getLastName(),
- ranking.getEntries()));
- }
- catch (JsonProcessingException e)
- {
- throw new RuntimeException(e);
- }
- })
- .toTable(Materialized.as(storeName));
-
- streams = new KafkaStreams(builder.build(), props);
- hostInfo = applicationServer;
- storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());;
- this.mapper = mapper;
+ .join(users, (ranking, user) -> UserRanking.of(
+ user.getFirstName(),
+ user.getLastName(),
+ ranking.getEntries()))
+ .toTable(
+ Materialized
+ .<String, UserRanking>as(storeSupplier)
+ .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
+
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
+ }
+
+ ReadOnlyKeyValueStore<String, UserRanking> getStore()
+ {
+ return streams.store(storeParameters);