X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fquery%2FQueryStreamProcessor.java;h=7dacd4b914e10717d539c69292300024e4b6c201;hb=0648885ec026d7434561060dc7edb703efea6853;hp=cc65fce7452acf68593e1a8a340d652c3018a520;hpb=278c7b8125c82120e1d80fa640bd16d375d4bf86;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java index cc65fce..7dacd4b 100644 --- a/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/query/QueryStreamProcessor.java @@ -5,7 +5,6 @@ import jakarta.annotation.PreDestroy; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.*; -import org.apache.kafka.streams.kstream.Consumed; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; @@ -53,7 +52,9 @@ public class QueryStreamProcessor { StreamsBuilder builder = new StreamsBuilder(); - KTable users = builder.table(usersInputTopic); + KTable users = builder + .stream(usersInputTopic) + .toTable(Materialized.with(null, new JsonSerde().copyWithType(User.class))); KStream rankings = builder.stream(rankingInputTopic); rankings