props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this!
props.put(
JsonDeserializer.TYPE_MAPPINGS,
"user:" + Key.class.getName() + "," +
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.net.URI;
+import java.util.Map;
import java.util.Optional;
import java.util.Properties;
{
StreamsBuilder builder = new StreamsBuilder();
- KTable<String, User> users = builder.table(usersInputTopic);
+ JsonSerde valueSerde = new JsonSerde();
+ valueSerde.configure(Map.of(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "user:" + Key.class.getName() + "," +
+ "ranking:" + Ranking.class.getName() + "," +
+ "userdata:" + User.class.getName() + "," +
+ "userranking:" + UserRanking.class.getName()
+ ), false);
+ KTable<String, User> users = builder.table(
+ usersInputTopic,
+ Consumed.with(null, valueSerde.copyWithType(User.class))
+ );
KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
rankings