import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 
+import java.time.Duration;
+import java.time.ZoneId;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.*;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
        public PopularStreamProcessor streamProcessor(
                        PopularApplicationProperties applicationProperties,
                        Properties streamProcessorProperties,
-                       KeyValueBytesStoreSupplier storeSupplier,
+                       ZoneId zone,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
                        ConfigurableApplicationContext context)
        {
                PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
                                applicationProperties.getInputTopic(),
                                applicationProperties.getOutputTopic(),
                                streamProcessorProperties,
-                               storeSupplier);
+                               zone,
+                               windowBytesStoreSupplier,
+                               keyValueBytesStoreSupplier);
 
                streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
                {
        }
 
        @Bean
-       public KeyValueBytesStoreSupplier storeSupplier()
+       public ZoneId defaultZone()
        {
-               return Stores.persistentKeyValueStore(STORE_NAME);
+               return ZoneId.systemDefault();
+       }
+
+       @Bean
+       public WindowBytesStoreSupplier windowBytesStoreSupplier()
+       {
+               return Stores.persistentWindowStore(
+                               KEY_VALUE_STORE_NAME,
+                               WINDOW_SIZE.multipliedBy(2),
+                               WINDOW_SIZE,
+                               false); // << Must always be `false` for normal use-cases!
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier keyValueBytesStoreSupplier()
+       {
+               return Stores.persistentKeyValueStore(WINDOW_STORE_NAME);
        }
 }
 
 package de.juplo.kafka.wordcount.popular;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
 @Slf4j
 public class PopularStreamProcessor
 {
-       public static final String STORE_NAME = "popular";
+       public static final String KEY_VALUE_STORE_NAME = "popular";
+       public static final String WINDOW_STORE_NAME = "popular-windows";
+       public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
 
 
        public final KafkaStreams streams;
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       ZoneId zone,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                Topology topology = PopularStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
-                               storeSupplier);
+                               zone,
+                               windowBytesStoreSupplier,
+                               keyValueBytesStoreSupplier);
 
                streams = new KafkaStreams(topology, properties);
        }
        static Topology buildTopology(
                        String inputTopic,
                        String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       ZoneId zone,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
-                               .map((key, word) -> new KeyValue<>(word, word))
+                               .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
                                .groupByKey()
+                               .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
                                .count(
                                                Materialized
-                                                               .<Word, Long>as(storeSupplier)
-                                                               .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
+                                                               .<Word, Long>as(windowBytesStoreSupplier)
+                                                               .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
+                                                               .withValueSerde(Serdes.Long()))
+                               .toStream()
+                               .map((windowedWord, counter) -> new KeyValue<>(
+                                               WindowedWord.of(
+                                                               ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
+                                                               ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
+                                                               windowedWord.key().getWord()),
+                                               WordCounter.of(windowedWord.key().getWord(), counter)))
+                               .toTable(
+                                               Materialized
+                                                               .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
+                                                               .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
+                                                               .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
                                .toStream()
-                               .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
                                .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
 
                Topology topology = builder.build();
                return topology;
        }
 
-       ReadOnlyKeyValueStore<Word, Long> getStore()
+       ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
        {
-               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+               return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
 
        public void start()
                return new JsonSerde<>(Word.class);
        }
 
-       public static JsonSerde<Word> outKeySerde()
+       public static JsonSerde<WindowedWord> outKeySerde()
        {
                return serde(true);
        }
 
        private static String typeMappingsConfig()
        {
-               return typeMappingsConfig(Word.class, WordCounter.class);
+               return typeMappingsConfig(WindowedWord.class, WordCounter.class);
        }
 
        public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
 
 import de.juplo.kafka.wordcount.splitter.InputWord;
 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
+@Slf4j
 class TestData
 {
+       static final Clock CLOCK = Clock.fixed(
+                       Clock.systemDefaultZone().instant(),
+                       Clock.systemDefaultZone().getZone());
        static final String PETER = "peter";
        static final String KLAUS = "klaus";
 
        static final String WORD_S = "s";
        static final String WORD_BOÄH = "Boäh";
 
-       static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO);
-       static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT);
-       static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH);
-       static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH);
-       static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S);
+       static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
+       static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
+       static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
+       static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
+       static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
 
-       private static final KeyValue<InputUser, InputWord>[] INPUT_MESSAGES = new KeyValue[]
+       private static Instant windowStart()
        {
-                       KeyValue.pair(
+               return windowBoundFor(CLOCK.instant(), 0);
+       }
+
+       private static Instant windowEnd()
+       {
+               return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart());
+       }
+
+       private static Instant windowBoundFor(Instant instant, int second)
+       {
+               return instantOfSecond(second, 0);
+       }
+
+       private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
+       {
+                       TestMessage.of(
+                                       instantOfSecond(0),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_HALLO)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(13),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_MÜSCH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(0),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_WELT)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_MÜSCH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_WELT)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(15),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(15),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(29),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(20),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
        };
 
-       static Stream<KeyValue<InputUser, InputWord>> getInputMessages()
+       private static Instant instantOfSecond(int second)
+       {
+               return instantOfSecond(second, 0);
+       }
+
+       private static Instant instantOfSecond(int second, int naonSeconds)
+       {
+               return ZonedDateTime
+                               .ofInstant(CLOCK.instant(), CLOCK.getZone())
+                               .withSecond(0)
+                               .plusSeconds(second)
+                               .withNano(naonSeconds)
+                               .toInstant();
+       }
+
+       private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
        {
                return Stream.of(TestData.INPUT_MESSAGES);
        }
 
+       static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
+       {
+               getInputMessages().forEach(message ->
+               {
+                       log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
+                       consumer.accept(message.time, new KeyValue<>(message.key, message.value));
+               });
+       }
+
        static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
                expectedMessages().forEach(
 
        static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
-               assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
-               assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
-               assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
        }
 
        private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
                                : messagesForUsers.get(word).size();
        }
 
-       static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+       static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
        {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
        }
 
-       private static Word wordOf(OutputWindowedWord testOutputWindowedWord)
+       private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
        {
-               Word word = new Word();
+               WindowedWord windowedWord = new WindowedWord();
 
-               word.setUser(testOutputWindowedWord.getUser());
-               word.setWord(testOutputWindowedWord.getWord());
+               windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
+               windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
+               windowedWord.setWord(outputWindowedWord.getWord());
 
-               return word;
+               return windowedWord;
        }
 
        static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
        }
 
        private static void assertWordCountEqualsWordCountFromLastMessage(
                        OutputWindowedWord word,
-                       Long counter)
+                       WordCounter counter)
        {
-               OutputWordCounter testOutputWordCounter = OutputWordCounter.of(
-                               word.getUser(),
+               OutputWordCounter outputWordCounter = OutputWordCounter.of(
                                word.getWord(),
-                               counter);
-               assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+                               counter.getCounter());
+               assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
        }
 
        private static void assertWordCountEqualsWordCountFromLastMessage(
        private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       PETER_HALLO,
-                                       OutputWordCounter.of(PETER, WORD_HALLO,1)),
+                                       WINDOWED_WORD_HALLO,
+                                       OutputWordCounter.of(WORD_HALLO,1)),
                        KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+                                       WINDOWED_WORD_MÜSCH,
+                                       OutputWordCounter.of(WORD_MÜSCH,1)),
                        KeyValue.pair(
-                                       PETER_WELT,
-                                       OutputWordCounter.of(PETER, WORD_WELT,1)),
+                                       WINDOWED_WORD_WELT,
+                                       OutputWordCounter.of(WORD_WELT,1)),
                        KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+                                       WINDOWED_WORD_MÜSCH,
+                                       OutputWordCounter.of(WORD_MÜSCH,2)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,1)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,1)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,1)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,1)),
                        KeyValue.pair(
-                                       PETER_WELT,
-                                       OutputWordCounter.of(PETER, WORD_WELT,2)),
+                                       WINDOWED_WORD_WELT,
+                                       OutputWordCounter.of(WORD_WELT,2)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,2)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,2)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,2)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,2)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,3)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,3)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,3)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,3)),
        };
 
        static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()