- streams = new KafkaStreams(builder.build(), props);
- streams.setUncaughtExceptionHandler((Throwable e) ->
- {
- log.error("Unexpected error!", e);
- CompletableFuture.runAsync(() ->
- {
- log.info("Stopping application...");
- SpringApplication.exit(context, () -> 1);
- });
- return SHUTDOWN_CLIENT;
- });
-
- hostInfo = HostInfo.buildFromEndpoint(properties.getApplicationServer());
- storeParameters = StoreQueryParameters.fromNameAndType(storeName, QueryableStoreTypes.keyValueStore());;
- this.mapper = mapper;
+ KTable<String, String> users = builder.table(usersInputTopic);
+ KStream<String, String> rankings = builder.stream(rankingInputTopic);
+
+ rankings
+ .join(users, (rankingJson, userJson) ->
+ {
+ try
+ {
+ Ranking ranking = mapper.readValue(rankingJson, Ranking.class);
+ User user = mapper.readValue(userJson, User.class);
+
+ return mapper.writeValueAsString(
+ UserRanking.of(
+ user.getFirstName(),
+ user.getLastName(),
+ ranking.getEntries()));
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException(e);
+ }
+ })
+ .toTable(Materialized.as(storeSupplier));
+
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;