--
works, but is very confusing
* The default-type is specified as a consumption-parameter in the command,
that reads the input topic into the `KTable` via ``Consumed.with(..)``.
* The resulting code is confusing, because the ``Consumed``-parameter is
used for both, the consumption of the input topic _and_ the consumption
of stored values, if read from the state-store.
* Because of this, one might only think of the consumption of the stored
values from the state-store, when looking at the ``Consumed.with()``-
statement, and argue, why the type-mappings have to be specified here.
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class.getName());
- props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, User.class.getName()); // << Does not work without this!
props.put(
JsonDeserializer.TYPE_MAPPINGS,
"user:" + Key.class.getName() + "," +
props.put(
JsonDeserializer.TYPE_MAPPINGS,
"user:" + Key.class.getName() + "," +
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.net.URI;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.net.URI;
import java.util.Optional;
import java.util.Properties;
import java.util.Optional;
import java.util.Properties;
{
StreamsBuilder builder = new StreamsBuilder();
{
StreamsBuilder builder = new StreamsBuilder();
- KTable<String, User> users = builder.table(usersInputTopic);
+ JsonSerde valueSerde = new JsonSerde();
+ valueSerde.configure(Map.of(
+ JsonDeserializer.TYPE_MAPPINGS,
+ "user:" + Key.class.getName() + "," +
+ "ranking:" + Ranking.class.getName() + "," +
+ "userdata:" + User.class.getName() + "," +
+ "userranking:" + UserRanking.class.getName()
+ ), false);
+ KTable<String, User> users = builder.table(
+ usersInputTopic,
+ Consumed.with(null, valueSerde.copyWithType(User.class))
+ );
KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
rankings
KStream<String, Ranking> rankings = builder.stream(rankingInputTopic);
rankings