]> juplo.de Git - demos/kafka/wordcount/commitdiff
query: 2.0.0 - (GREEN) Adjusted the implementation to the new expectations
authorKai Moritz <kai@juplo.de>
Wed, 12 Jun 2024 21:08:57 +0000 (23:08 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 15 Jun 2024 06:32:09 +0000 (08:32 +0200)
* Messages from the incomming topic, that is written by the `users` service
  can be serialized, although no type-information is conveyed via the
  headers.

src/main/java/de/juplo/kafka/wordcount/query/QueryApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java

index 2ece744facc6e7ee3c8db840d213488e22f6bc78..6c7844d823c88b549079639ea0d34d81d20fa703 100644 (file)
@@ -92,7 +92,6 @@ public class QueryApplicationConfiguration
                                JsonDeserializer.TYPE_MAPPINGS,
                                "user:" + Key.class.getName() + "," +
                                "ranking:" + Ranking.class.getName() + "," +
-                               "userdata:" + User.class.getName() + "," +
                                "userranking:" + UserRanking.class.getName());
 
                return props;
index 474926463c42d26ab4afe5d2349f8d9b4013f825..dcb123485fd17e35aaa77e1a2b4d8a8aa584fd44 100644 (file)
@@ -5,6 +5,7 @@ import jakarta.annotation.PreDestroy;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.streams.kstream.Materialized;
@@ -57,7 +58,9 @@ public class QueryStreamProcessor
                StreamsBuilder builder = new StreamsBuilder();
 
                KTable<String, User> users = builder
-                               .stream(usersInputTopic)
+                               .stream(
+                                               usersInputTopic,
+                                               Consumed.with(null, new JsonSerde().copyWithType(User.class)))
                                .toTable(
                                                Materialized
                                                                .<String, User>as(userStoreSupplier)