query: 2.0.0 - (GREEN) Adjusted the implementation to the new expectations
authorKai Moritz <kai@juplo.de>
Wed, 12 Jun 2024 21:08:57 +0000 (23:08 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 06:32:09 +0000 (08:32 +0200)
* Messages from the incomming topic, that is written by the `users` service
  can be serialized, although no type-information is conveyed via the
  headers.

src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java

index 2ece744..6c7844d 100644 (file)
@@ -92,7 +92,6 @@ public class QueryApplicationConfiguration
                                JsonDeserializer.TYPE_MAPPINGS,
                                "user:" + Key.class.getName() + "," +
                                "ranking:" + Ranking.class.getName() + "," +
-                               "userdata:" + User.class.getName() + "," +
                                "userranking:" + UserRanking.class.getName());
 
                return props;
index 4749264..dcb1234 100644 (file)
@@ -5,6 +5,7 @@ import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -57,7 +58,9 @@ public class QueryStreamProcessor
                StreamsBuilder builder = new StreamsBuilder();
 
                KTable<String, User> users = builder
-                               .stream(usersInputTopic)
+                               .stream(
+                                               usersInputTopic,
+                                               Consumed.with(null, new JsonSerde().copyWithType(User.class)))
                                .toTable(
                                                Materialized
                                                                .<String, User>as(userStoreSupplier)