- JsonSerde valueSerde = new JsonSerde();
- valueSerde.configure(Map.of(
- JsonDeserializer.TYPE_MAPPINGS,
- "user:" + Key.class.getName() + "," +
- "ranking:" + Ranking.class.getName() + "," +
- "userdata:" + User.class.getName() + "," +
- "userranking:" + UserRanking.class.getName()
- ), false);
- KTable<String, User> users = builder.table(
- usersInputTopic,
- Consumed.with(null, valueSerde.copyWithType(User.class))
- );
- KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
+ KTable<String, User> users = builder
+ .stream(
+ usersInputTopic,
+ Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
+ .toTable(
+ Materialized
+ .<String, User>as(userStoreSupplier)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(new JsonSerde().copyWithType(User.class)));
+ KStream<String, Ranking> rankings = builder
+ .<Key, Ranking>stream(rankingInputTopic)
+ .map((key, value) -> new KeyValue<>(key.getUsername(), value));