JsonDeserializer.TYPE_MAPPINGS,
"user:" + Key.class.getName() + "," +
"ranking:" + Ranking.class.getName() + "," +
- "userdata:" + User.class.getName() + "," +
"userranking:" + UserRanking.class.getName());
return props;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
StreamsBuilder builder = new StreamsBuilder();
KTable<String, User> users = builder
- .stream(usersInputTopic)
+ .stream(
+ usersInputTopic,
+ Consumed.with(null, new JsonSerde().copyWithType(User.class)))
.toTable(
Materialized
.<String, User>as(userStoreSupplier)