- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ KTable<String, User> users = builder
+ .stream(
+ usersInputTopic,
+ Consumed.with(Serdes.String(), new JsonSerde().copyWithType(User.class)))
+ .toTable(
+ Materialized
+ .<String, User>as(userStoreSupplier)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(new JsonSerde().copyWithType(User.class)));
+ KStream<String, Ranking> rankings = builder
+ .<Key, Ranking>stream(rankingInputTopic)
+ .map((key, value) -> new KeyValue<>(key.getUser(), value));
+
+ rankings
+ .join(users, (ranking, user) -> UserRanking.of(
+ user.getFirstName(),
+ user.getLastName(),
+ ranking.getEntries()),
+ Joined.keySerde(Serdes.String()))
+ .toTable(
+ Materialized
+ .<String, UserRanking>as(rankingStoreSupplier)
+ .withKeySerde(Serdes.String())
+ .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
+
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
+ }