popular: 1.0.0 - Words are counted for tumbling time-windows
authorKai Moritz <kai@juplo.de>
Sun, 16 Jun 2024 09:48:24 +0000 (11:48 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 19 Jun 2024 20:51:11 +0000 (22:51 +0200)
12 files changed:
src/main/java/de/juplo/kafka/wordcount/popular/PopularApplicationConfiguriation.java
src/main/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessor.java
src/main/java/de/juplo/kafka/wordcount/popular/UserWord.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/popular/Word.java
src/main/java/de/juplo/kafka/wordcount/popular/WordCounter.java
src/test/java/de/juplo/kafka/wordcount/popular/PopularApplicationIT.java
src/test/java/de/juplo/kafka/wordcount/popular/PopularStreamProcessorTopologyTest.java
src/test/java/de/juplo/kafka/wordcount/popular/TestData.java
src/test/java/de/juplo/kafka/wordcount/popular/TestMessage.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/stats/OutputWindowedWord.java
src/test/java/de/juplo/kafka/wordcount/stats/OutputWordCounter.java

index 1c02197..45a0a57 100644 (file)
@@ -5,6 +5,7 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.ConfigurableApplicationContext;
@@ -13,10 +14,12 @@ import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.kafka.support.serializer.JsonSerde;
 
+import java.time.Duration;
+import java.time.ZoneId;
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.*;
 import static org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
 
 
@@ -59,14 +62,18 @@ public class PopularApplicationConfiguriation
        public PopularStreamProcessor streamProcessor(
                        PopularApplicationProperties applicationProperties,
                        Properties streamProcessorProperties,
-                       KeyValueBytesStoreSupplier storeSupplier,
+                       ZoneId zone,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
                        ConfigurableApplicationContext context)
        {
                PopularStreamProcessor streamProcessor = new PopularStreamProcessor(
                                applicationProperties.getInputTopic(),
                                applicationProperties.getOutputTopic(),
                                streamProcessorProperties,
-                               storeSupplier);
+                               zone,
+                               windowBytesStoreSupplier,
+                               keyValueBytesStoreSupplier);
 
                streamProcessor.streams.setUncaughtExceptionHandler((Throwable e) ->
                {
@@ -84,8 +91,24 @@ public class PopularApplicationConfiguriation
        }
 
        @Bean
-       public KeyValueBytesStoreSupplier storeSupplier()
+       public ZoneId defaultZone()
        {
-               return Stores.persistentKeyValueStore(STORE_NAME);
+               return ZoneId.systemDefault();
+       }
+
+       @Bean
+       public WindowBytesStoreSupplier windowBytesStoreSupplier()
+       {
+               return Stores.persistentWindowStore(
+                               KEY_VALUE_STORE_NAME,
+                               WINDOW_SIZE.multipliedBy(2),
+                               WINDOW_SIZE,
+                               false); // << Must always be `false` for normal use-cases!
+       }
+
+       @Bean
+       public KeyValueBytesStoreSupplier keyValueBytesStoreSupplier()
+       {
+               return Stores.persistentKeyValueStore(WINDOW_STORE_NAME);
        }
 }
index 7bba240..9ba9ad2 100644 (file)
@@ -1,16 +1,22 @@
 package de.juplo.kafka.wordcount.popular;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.streams.*;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.TimeWindows;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.QueryableStoreTypes;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.springframework.kafka.support.serializer.JsonSerde;
 import org.springframework.kafka.support.serializer.JsonSerializer;
 
+import java.time.Duration;
+import java.time.ZoneId;
+import java.time.ZonedDateTime;
 import java.util.Map;
 import java.util.Properties;
 import java.util.stream.Collectors;
@@ -19,7 +25,9 @@ import java.util.stream.Collectors;
 @Slf4j
 public class PopularStreamProcessor
 {
-       public static final String STORE_NAME = "popular";
+       public static final String KEY_VALUE_STORE_NAME = "popular";
+       public static final String WINDOW_STORE_NAME = "popular-windows";
+       public static final Duration WINDOW_SIZE = Duration.ofSeconds(30);
 
 
        public final KafkaStreams streams;
@@ -29,12 +37,16 @@ public class PopularStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       ZoneId zone,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                Topology topology = PopularStreamProcessor.buildTopology(
                                inputTopic,
                                outputTopic,
-                               storeSupplier);
+                               zone,
+                               windowBytesStoreSupplier,
+                               keyValueBytesStoreSupplier);
 
                streams = new KafkaStreams(topology, properties);
        }
@@ -42,20 +54,35 @@ public class PopularStreamProcessor
        static Topology buildTopology(
                        String inputTopic,
                        String outputTopic,
-                       KeyValueBytesStoreSupplier storeSupplier)
+                       ZoneId zone,
+                       WindowBytesStoreSupplier windowBytesStoreSupplier,
+                       KeyValueBytesStoreSupplier keyValueBytesStoreSupplier)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
                builder
                                .stream(inputTopic, Consumed.with(inKeySerde(), inValueSerde()))
-                               .map((key, word) -> new KeyValue<>(word, word))
+                               .map((key, userWord) -> new KeyValue<>(Word.of(userWord.getWord()), Word.of(userWord.getWord())))
                                .groupByKey()
+                               .windowedBy(TimeWindows.ofSizeWithNoGrace(WINDOW_SIZE))
                                .count(
                                                Materialized
-                                                               .<Word, Long>as(storeSupplier)
-                                                               .withKeySerde(new JsonSerde<>(Word.class))) // No headers are present: fixed typing is needed!
+                                                               .<Word, Long>as(windowBytesStoreSupplier)
+                                                               .withKeySerde(new JsonSerde<>(Word.class).noTypeInfo())
+                                                               .withValueSerde(Serdes.Long()))
+                               .toStream()
+                               .map((windowedWord, counter) -> new KeyValue<>(
+                                               WindowedWord.of(
+                                                               ZonedDateTime.ofInstant(windowedWord.window().startTime(), zone),
+                                                               ZonedDateTime.ofInstant(windowedWord.window().endTime(), zone),
+                                                               windowedWord.key().getWord()),
+                                               WordCounter.of(windowedWord.key().getWord(), counter)))
+                               .toTable(
+                                               Materialized
+                                                               .<WindowedWord, WordCounter>as(keyValueBytesStoreSupplier)
+                                                               .withKeySerde(new JsonSerde<>(WindowedWord.class).noTypeInfo())
+                                                               .withValueSerde(new JsonSerde<>(WordCounter.class).noTypeInfo()))
                                .toStream()
-                               .map((word, counter) -> new KeyValue<>(word, WordCounter.of(word, counter)))
                                .to(outputTopic, Produced.with(outKeySerde(), outValueSerde()));
 
                Topology topology = builder.build();
@@ -64,9 +91,9 @@ public class PopularStreamProcessor
                return topology;
        }
 
-       ReadOnlyKeyValueStore<Word, Long> getStore()
+       ReadOnlyKeyValueStore<WindowedWord, WordCounter> getStore()
        {
-               return streams.store(StoreQueryParameters.fromNameAndType(STORE_NAME, QueryableStoreTypes.keyValueStore()));
+               return streams.store(StoreQueryParameters.fromNameAndType(KEY_VALUE_STORE_NAME, QueryableStoreTypes.keyValueStore()));
        }
 
        public void start()
@@ -93,7 +120,7 @@ public class PopularStreamProcessor
                return new JsonSerde<>(Word.class);
        }
 
-       public static JsonSerde<Word> outKeySerde()
+       public static JsonSerde<WindowedWord> outKeySerde()
        {
                return serde(true);
        }
@@ -114,7 +141,7 @@ public class PopularStreamProcessor
 
        private static String typeMappingsConfig()
        {
-               return typeMappingsConfig(Word.class, WordCounter.class);
+               return typeMappingsConfig(WindowedWord.class, WordCounter.class);
        }
 
        public static String typeMappingsConfig(Class wordClass, Class wordCounterClass)
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/UserWord.java b/src/main/java/de/juplo/kafka/wordcount/popular/UserWord.java
new file mode 100644 (file)
index 0000000..18f4726
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.popular;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.Data;
+
+
+@Data
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class UserWord
+{
+  private String user;
+  private String word;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java b/src/main/java/de/juplo/kafka/wordcount/popular/WindowedWord.java
new file mode 100644 (file)
index 0000000..df988fb
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.wordcount.popular;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.ZonedDateTime;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class WindowedWord
+{
+  ZonedDateTime start;
+  ZonedDateTime end;
+  String word;
+}
index 92c1d6d..1cf09f3 100644 (file)
@@ -1,13 +1,14 @@
 package de.juplo.kafka.wordcount.popular;
 
-import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import lombok.AllArgsConstructor;
 import lombok.Data;
+import lombok.NoArgsConstructor;
 
 
 @Data
-@JsonIgnoreProperties(ignoreUnknown = true)
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
 public class Word
 {
-  private String user;
-  private String word;
+  String word;
 }
index 39eebf0..c94ec0e 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.popular;
 
-import lombok.AccessLevel;
 import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
@@ -8,15 +7,9 @@ import lombok.NoArgsConstructor;
 
 @Data
 @NoArgsConstructor
-@AllArgsConstructor(access = AccessLevel.PRIVATE)
+@AllArgsConstructor(staticName = "of")
 public class WordCounter
 {
-  String user;
   String word;
   long counter;
-
-  public static WordCounter of(Word word, long counter)
-  {
-    return new WordCounter(word.getUser(), word.getWord(), counter);
-  }
 }
index a327389..1a5cbf4 100644 (file)
@@ -7,6 +7,7 @@ import de.juplo.kafka.wordcount.stats.OutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -28,7 +29,7 @@ import java.time.Duration;
 
 import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.*;
 import static org.awaitility.Awaitility.await;
 
 
@@ -67,12 +68,11 @@ public class PopularApplicationIT
                        @Autowired KafkaTemplate<InputUser, InputWord> kafkaTemplate)
        {
                TestData
-                               .getInputMessages()
-                               .forEach(kv ->
+                               .sendInputMessages((instant, kv) ->
                                {
                                        try
                                        {
-                                               SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, null, instant.toEpochMilli(), kv.key, kv.value).get();
                                                log.info(
                                                                "Sent: {}={}, partition={}, offset={}",
                                                                result.getProducerRecord().key(),
@@ -109,7 +109,7 @@ public class PopularApplicationIT
 
        @DisplayName("Await the expected final output messages")
        @Test
-       public void testAwaitExpectedLastMessagesForUsers()
+       public void testAwaitExpectedLastMessagesForWord()
        {
                await("Expected final output messages")
                                .atMost(Duration.ofSeconds(5))
@@ -157,9 +157,19 @@ public class PopularApplicationIT
                }
 
                @Bean
-               KeyValueBytesStoreSupplier storeSupplier()
+               WindowBytesStoreSupplier windowBytesStoreSupplier()
                {
-                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+                       return Stores.inMemoryWindowStore(
+                                       WINDOW_STORE_NAME,
+                                       WINDOW_SIZE.multipliedBy(2),
+                                       WINDOW_SIZE,
+                                       false);
+               }
+
+               @Bean
+               KeyValueBytesStoreSupplier keyValueBytesStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME);
                }
        }
 }
index 3e71d3e..23ab524 100644 (file)
@@ -20,10 +20,12 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
+import java.time.Duration;
+import java.time.ZoneId;
 import java.util.Map;
 
 import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.*;
 
 
 @Slf4j
@@ -31,6 +33,7 @@ public class PopularStreamProcessorTopologyTest
 {
   public static final String IN = "TEST-IN";
   public static final String OUT = "TEST-OUT";
+  public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
 
 
   static TopologyTestDriver testDriver;
@@ -43,7 +46,13 @@ public class PopularStreamProcessorTopologyTest
     Topology topology = PopularStreamProcessor.buildTopology(
         IN,
         OUT,
-        Stores.inMemoryKeyValueStore(STORE_NAME));
+        ZONE,
+        Stores.inMemoryWindowStore(
+            WINDOW_STORE_NAME,
+            WINDOW_SIZE.multipliedBy(2),
+            WINDOW_SIZE,
+            false),
+        Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
@@ -52,9 +61,7 @@ public class PopularStreamProcessorTopologyTest
     TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
         testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
 
-    TestData
-        .getInputMessages()
-        .forEach(kv -> in.pipeInput(kv.key, kv.value));
+    TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
 
     receivedMessages = new LinkedMultiValueMap<>();
     out
@@ -88,7 +95,7 @@ public class PopularStreamProcessorTopologyTest
   @Test
   public void testExpectedState()
   {
-    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+    KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
     TestData.assertExpectedState(store);
   }
 
index 892ea81..372e0c0 100644 (file)
@@ -4,18 +4,28 @@ import de.juplo.kafka.wordcount.splitter.InputUser;
 import de.juplo.kafka.wordcount.splitter.InputWord;
 import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
 import de.juplo.kafka.wordcount.stats.OutputWordCounter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZonedDateTime;
+import java.util.function.BiConsumer;
 import java.util.stream.Stream;
 
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_SIZE;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
+@Slf4j
 class TestData
 {
+       static final Clock CLOCK = Clock.fixed(
+                       Clock.systemDefaultZone().instant(),
+                       Clock.systemDefaultZone().getZone());
        static final String PETER = "peter";
        static final String KLAUS = "klaus";
 
@@ -25,54 +35,104 @@ class TestData
        static final String WORD_S = "s";
        static final String WORD_BOÄH = "Boäh";
 
-       static final OutputWindowedWord PETER_HALLO = OutputWindowedWord.of(PETER, WORD_HALLO);
-       static final OutputWindowedWord PETER_WELT = OutputWindowedWord.of(PETER, WORD_WELT);
-       static final OutputWindowedWord PETER_BOÄH = OutputWindowedWord.of(PETER, WORD_BOÄH);
-       static final OutputWindowedWord KLAUS_MÜSCH = OutputWindowedWord.of(KLAUS, WORD_MÜSCH);
-       static final OutputWindowedWord KLAUS_S = OutputWindowedWord.of(KLAUS, WORD_S);
+       static final OutputWindowedWord WINDOWED_WORD_HALLO = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_HALLO);
+       static final OutputWindowedWord WINDOWED_WORD_WELT = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_WELT);
+       static final OutputWindowedWord WINDOWED_WORD_BOÄH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_BOÄH);
+       static final OutputWindowedWord WINDOWED_WORD_MÜSCH = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_MÜSCH);
+       static final OutputWindowedWord WINDOWED_WORD_S = OutputWindowedWord.of(windowStart(), windowEnd(), WORD_S);
 
-       private static final KeyValue<InputUser, InputWord>[] INPUT_MESSAGES = new KeyValue[]
+       private static Instant windowStart()
        {
-                       KeyValue.pair(
+               return windowBoundFor(CLOCK.instant(), 0);
+       }
+
+       private static Instant windowEnd()
+       {
+               return windowBoundFor(CLOCK.instant(), WINDOW_SIZE.toSecondsPart());
+       }
+
+       private static Instant windowBoundFor(Instant instant, int second)
+       {
+               return instantOfSecond(second, 0);
+       }
+
+       private static final TestMessage<InputUser, InputWord>[] INPUT_MESSAGES = new TestMessage[]
+       {
+                       TestMessage.of(
+                                       instantOfSecond(0),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_HALLO)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(13),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_MÜSCH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(0),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_WELT)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_MÜSCH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(14),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_WELT)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(15),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(15),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(29),
                                        InputUser.of(PETER),
                                        InputWord.of(PETER, WORD_BOÄH)),
-                       KeyValue.pair(
+                       TestMessage.of(
+                                       instantOfSecond(20),
                                        InputUser.of(KLAUS),
                                        InputWord.of(KLAUS, WORD_S)),
        };
 
-       static Stream<KeyValue<InputUser, InputWord>> getInputMessages()
+       private static Instant instantOfSecond(int second)
+       {
+               return instantOfSecond(second, 0);
+       }
+
+       private static Instant instantOfSecond(int second, int naonSeconds)
+       {
+               return ZonedDateTime
+                               .ofInstant(CLOCK.instant(), CLOCK.getZone())
+                               .withSecond(0)
+                               .plusSeconds(second)
+                               .withNano(naonSeconds)
+                               .toInstant();
+       }
+
+       private static Stream<TestMessage<InputUser, InputWord>> getInputMessages()
        {
                return Stream.of(TestData.INPUT_MESSAGES);
        }
 
+       static void sendInputMessages(BiConsumer<Instant, KeyValue<InputUser, InputWord>> consumer)
+       {
+               getInputMessages().forEach(message ->
+               {
+                       log.info("Sending@{}: {} -> {}", message.time, message.key, message.value);
+                       consumer.accept(message.time, new KeyValue<>(message.key, message.value));
+               });
+       }
+
        static void assertExpectedMessages(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
                expectedMessages().forEach(
@@ -83,11 +143,11 @@ class TestData
 
        static void assertExpectedNumberOfMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
-               assertThat(countMessagesForWord(PETER_HALLO, receivedMessages));
-               assertThat(countMessagesForWord(PETER_WELT, receivedMessages));
-               assertThat(countMessagesForWord(PETER_BOÄH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_MÜSCH, receivedMessages));
-               assertThat(countMessagesForWord(KLAUS_S, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_HALLO, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_WELT, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_BOÄH, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_MÜSCH, receivedMessages));
+               assertThat(countMessagesForWord(WINDOWED_WORD_S, receivedMessages));
        }
 
        private static int countMessagesForWord(OutputWindowedWord word, MultiValueMap<OutputWindowedWord, OutputWordCounter> messagesForUsers)
@@ -97,43 +157,43 @@ class TestData
                                : messagesForUsers.get(word).size();
        }
 
-       static void assertExpectedState(ReadOnlyKeyValueStore<Word, Long> store)
+       static void assertExpectedState(ReadOnlyKeyValueStore<WindowedWord, WordCounter> store)
        {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, store.get(wordOf(PETER_HALLO)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, store.get(wordOf(PETER_WELT)));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, store.get(wordOf(PETER_BOÄH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, store.get(wordOf(KLAUS_MÜSCH)));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, store.get(wordOf(KLAUS_S)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, store.get(windowedWordOf(WINDOWED_WORD_HALLO)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, store.get(windowedWordOf(WINDOWED_WORD_WELT)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, store.get(windowedWordOf(WINDOWED_WORD_BOÄH)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, store.get(windowedWordOf(WINDOWED_WORD_MÜSCH)));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, store.get(windowedWordOf(WINDOWED_WORD_S)));
        }
 
-       private static Word wordOf(OutputWindowedWord testOutputWindowedWord)
+       private static WindowedWord windowedWordOf(OutputWindowedWord outputWindowedWord)
        {
-               Word word = new Word();
+               WindowedWord windowedWord = new WindowedWord();
 
-               word.setUser(testOutputWindowedWord.getUser());
-               word.setWord(testOutputWindowedWord.getWord());
+               windowedWord.setStart(ZonedDateTime.ofInstant(outputWindowedWord.getStart(), CLOCK.getZone()));
+               windowedWord.setEnd(ZonedDateTime.ofInstant(outputWindowedWord.getEnd(), CLOCK.getZone()));
+               windowedWord.setWord(outputWindowedWord.getWord());
 
-               return word;
+               return windowedWord;
        }
 
        static void assertExpectedLastMessagesForWord(MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages)
        {
-               assertWordCountEqualsWordCountFromLastMessage(PETER_HALLO, getLastMessageFor(PETER_HALLO, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_WELT, getLastMessageFor(PETER_WELT, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(PETER_BOÄH, getLastMessageFor(PETER_BOÄH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_MÜSCH, getLastMessageFor(KLAUS_MÜSCH, receivedMessages));
-               assertWordCountEqualsWordCountFromLastMessage(KLAUS_S, getLastMessageFor(KLAUS_S, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_HALLO, getLastMessageFor(WINDOWED_WORD_HALLO, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_WELT, getLastMessageFor(WINDOWED_WORD_WELT, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_BOÄH, getLastMessageFor(WINDOWED_WORD_BOÄH, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_MÜSCH, getLastMessageFor(WINDOWED_WORD_MÜSCH, receivedMessages));
+               assertWordCountEqualsWordCountFromLastMessage(WINDOWED_WORD_S, getLastMessageFor(WINDOWED_WORD_S, receivedMessages));
        }
 
        private static void assertWordCountEqualsWordCountFromLastMessage(
                        OutputWindowedWord word,
-                       Long counter)
+                       WordCounter counter)
        {
-               OutputWordCounter testOutputWordCounter = OutputWordCounter.of(
-                               word.getUser(),
+               OutputWordCounter outputWordCounter = OutputWordCounter.of(
                                word.getWord(),
-                               counter);
-               assertWordCountEqualsWordCountFromLastMessage(word, testOutputWordCounter);
+                               counter.getCounter());
+               assertWordCountEqualsWordCountFromLastMessage(word, outputWordCounter);
        }
 
        private static void assertWordCountEqualsWordCountFromLastMessage(
@@ -161,38 +221,38 @@ class TestData
        private static final KeyValue<OutputWindowedWord, OutputWordCounter>[] EXPECTED_MESSAGES = new KeyValue[]
        {
                        KeyValue.pair(
-                                       PETER_HALLO,
-                                       OutputWordCounter.of(PETER, WORD_HALLO,1)),
+                                       WINDOWED_WORD_HALLO,
+                                       OutputWordCounter.of(WORD_HALLO,1)),
                        KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       OutputWordCounter.of(KLAUS, WORD_MÜSCH,1)),
+                                       WINDOWED_WORD_MÜSCH,
+                                       OutputWordCounter.of(WORD_MÜSCH,1)),
                        KeyValue.pair(
-                                       PETER_WELT,
-                                       OutputWordCounter.of(PETER, WORD_WELT,1)),
+                                       WINDOWED_WORD_WELT,
+                                       OutputWordCounter.of(WORD_WELT,1)),
                        KeyValue.pair(
-                                       KLAUS_MÜSCH,
-                                       OutputWordCounter.of(KLAUS, WORD_MÜSCH,2)),
+                                       WINDOWED_WORD_MÜSCH,
+                                       OutputWordCounter.of(WORD_MÜSCH,2)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,1)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,1)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,1)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,1)),
                        KeyValue.pair(
-                                       PETER_WELT,
-                                       OutputWordCounter.of(PETER, WORD_WELT,2)),
+                                       WINDOWED_WORD_WELT,
+                                       OutputWordCounter.of(WORD_WELT,2)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,2)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,2)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,2)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,2)),
                        KeyValue.pair(
-                                       PETER_BOÄH,
-                                       OutputWordCounter.of(PETER, WORD_BOÄH,3)),
+                                       WINDOWED_WORD_BOÄH,
+                                       OutputWordCounter.of(WORD_BOÄH,3)),
                        KeyValue.pair(
-                                       KLAUS_S,
-                                       OutputWordCounter.of(KLAUS, WORD_S,3)),
+                                       WINDOWED_WORD_S,
+                                       OutputWordCounter.of(WORD_S,3)),
        };
 
        static MultiValueMap<OutputWindowedWord, OutputWordCounter> expectedMessages()
diff --git a/src/test/java/de/juplo/kafka/wordcount/popular/TestMessage.java b/src/test/java/de/juplo/kafka/wordcount/popular/TestMessage.java
new file mode 100644 (file)
index 0000000..0c777c9
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.wordcount.popular;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.time.Instant;
+
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor(staticName = "of")
+public class TestMessage<K, V>
+{
+  Instant time;
+  K key;
+  V value;
+}
index 83288fa..1589ea0 100644 (file)
@@ -4,12 +4,15 @@ import lombok.AllArgsConstructor;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 
+import java.time.Instant;
+
 
 @Data
 @NoArgsConstructor
 @AllArgsConstructor(staticName = "of")
 public class OutputWindowedWord
 {
-  String user;
+  Instant start;
+  Instant end;
   String word;
 }
index b5d2499..f132707 100644 (file)
@@ -10,7 +10,6 @@ import lombok.NoArgsConstructor;
 @AllArgsConstructor(staticName = "of")
 public class OutputWordCounter
 {
-  String user;
   String word;
   long counter;
 }