- .<Word, Long>as(storeSupplier)
- .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
+ .<Word, Long>as(windowBytesStoreSupplier)
+ .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
+ .withValueSerde(Serdes.Long()))
+ .toStream()
+ .map((windowedWord, counter) -> new KeyValue<>(
+ WindowedWord.of(
+ ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
+ ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
+ windowedWord.key().getWord()),
+ WordCounter.of(windowedWord.key().getWord(), counter)))
+ .toTable(
+ Materialized
+ .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
+ .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
+ .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))