- Properties props = new Properties();
- props.put(StreamsConfig.APPLICATION_ID_CONFIG, properties.getApplicationId());
- props.put(StreamsConfig.APPLICATION_SERVER_CONFIG, properties.getApplicationServer());
- props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
- props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ JsonSerde valueSerde = new JsonSerde();
+ valueSerde.configure(Map.of(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "user:" + Key.class.getName() + "," +
+ "ranking:" + Ranking.class.getName() + "," +
+ "userdata:" + User.class.getName() + "," +
+ "userranking:" + UserRanking.class.getName()
+ ), false);
+ KTable<String, User> users = builder.table(
+ usersInputTopic,
+ Consumed.with(null, valueSerde.copyWithType(User.class))
+ );
+ KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
+
+ rankings
+ .join(users, (ranking, user) -> UserRanking.of(
+ user.getFirstName(),
+ user.getLastName(),
+ ranking.getEntries()))
+ .toTable(
+ Materialized
+ .<String, UserRanking>as(storeSupplier)
+ .withValueSerde(new JsonSerde().copyWithType(UserRanking.class)));
+
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
+ }