popular: 1.0.0 - Word are counted for hopping time-windows
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessorTopologyTest.java
index 3e71d3e..1d62bef 100644 (file)
@@ -20,10 +20,13 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
+import java.time.Duration;
+import java.time.ZoneId;
 import java.util.Map;
 
 import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
 
 
 @Slf4j
@@ -31,6 +34,7 @@ public class PopularStreamProcessorTopologyTest
 {
   public static final String IN = "TEST-IN";
   public static final String OUT = "TEST-OUT";
+  public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
 
 
   static TopologyTestDriver testDriver;
@@ -43,7 +47,13 @@ public class PopularStreamProcessorTopologyTest
     Topology topology = PopularStreamProcessor.buildTopology(
         IN,
         OUT,
-        Stores.inMemoryKeyValueStore(STORE_NAME));
+        ZONE,
+        Stores.inMemoryWindowStore(
+            WINDOW_STORE_NAME,
+            Duration.ofSeconds(60),
+            Duration.ofSeconds(30),
+            false),
+        Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
@@ -52,9 +62,7 @@ public class PopularStreamProcessorTopologyTest
     TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
         testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
 
-    TestData
-        .getInputMessages()
-        .forEach(kv -> in.pipeInput(kv.key, kv.value));
+    TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
 
     receivedMessages = new LinkedMultiValueMap<>();
     out
@@ -88,7 +96,7 @@ public class PopularStreamProcessorTopologyTest
   @Test
   public void testExpectedState()
   {
-    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+    KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
     TestData.assertExpectedState(store);
   }