summary |
shortlog |
log |
commit | commitdiff |
tree
raw |
patch |
inline | side by side (from parent 1:
419be8a)
* Refactored the creation of the ``JsonSerde``s, that are used to consume
the incomming messages.
* All special ``Serdes``, that are used for incomming and outgoing messages,
are created in separted methods now.
* Removed unnecessary operatorx in the ``Materialized``-configuration for
the state store (the operator is not necessary, because no headers are
present, when deserializing from a store).
StreamsBuilder builder = new StreamsBuilder();
builder
StreamsBuilder builder = new StreamsBuilder();
builder
- .stream(
- inputTopic,
- Consumed.with(
- new JsonSerde<>(User.class),
- new JsonSerde<>(Word.class)))
+ .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey()
.count(
Materialized
.<Word, Long>as(storeSupplier)
.map((key, word) -> new KeyValue<>(word, word))
.groupByKey()
.count(
Materialized
.<Word, Long>as(storeSupplier)
- .withKeySerde(new JsonSerde<>().copyWithType(Word.class).forKeys()))
+ .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
.toStream()
.map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
.to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
.toStream()
.map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
.to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
+ public static JsonSerde<User> inKeySerde()
+ {
+ return new JsonSerde<>(User.class);
+ }
+
+ public static JsonSerde<Word> inValueSerde()
+ {
+ return new JsonSerde<>(Word.class);
+ }
+
public static JsonSerde<Word> outKeySerde()
{
return serde(true);
public static JsonSerde<Word> outKeySerde()
{
return serde(true);