popular: 1.0.0 - Word are counted for hopping time-windows
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / PopularStreamProcessorTopologyTest.java
index e80e383..1d62bef 100644 (file)
@@ -1,9 +1,9 @@
-package de.juplo.kafka.wordcount.counter;
+package de.juplo.kafka.wordcount.popular;
 
-import de.juplo.kafka.wordcount.splitter.TestInputUser;
-import de.juplo.kafka.wordcount.splitter.TestInputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWord;
-import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
+import de.juplo.kafka.wordcount.splitter.InputUser;
+import de.juplo.kafka.wordcount.splitter.InputWord;
+import de.juplo.kafka.wordcount.stats.OutputWindowedWord;
+import de.juplo.kafka.wordcount.stats.OutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.TestInputTopic;
 import org.apache.kafka.streams.TestOutputTopic;
@@ -20,41 +20,49 @@ import org.springframework.kafka.support.serializer.JsonSerializer;
 import org.springframework.util.LinkedMultiValueMap;
 import org.springframework.util.MultiValueMap;
 
+import java.time.Duration;
+import java.time.ZoneId;
 import java.util.Map;
 
-import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
 
 
 @Slf4j
-public class CounterStreamProcessorTopologyTest
+public class PopularStreamProcessorTopologyTest
 {
   public static final String IN = "TEST-IN";
   public static final String OUT = "TEST-OUT";
+  public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
 
 
   static TopologyTestDriver testDriver;
-  static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
+  static MultiValueMap<OutputWindowedWord, OutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
 
 
   @BeforeAll
   public static void setUpTestDriver()
   {
-    Topology topology = CounterStreamProcessor.buildTopology(
+    Topology topology = PopularStreamProcessor.buildTopology(
         IN,
         OUT,
-        Stores.inMemoryKeyValueStore(STORE_NAME));
+        ZONE,
+        Stores.inMemoryWindowStore(
+            WINDOW_STORE_NAME,
+            Duration.ofSeconds(60),
+            Duration.ofSeconds(30),
+            false),
+        Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
 
     testDriver = new TopologyTestDriver(topology, serializationConfig());
 
-    TestInputTopic<TestInputUser, TestInputWord> in =
+    TestInputTopic<InputUser, InputWord> in =
         testDriver.createInputTopic(IN, serializer(), serializer());
-    TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
+    TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
         testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
 
-    TestData
-        .getInputMessages()
-        .forEach(kv -> in.pipeInput(kv.key, kv.value));
+    TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
 
     receivedMessages = new LinkedMultiValueMap<>();
     out
@@ -88,7 +96,7 @@ public class CounterStreamProcessorTopologyTest
   @Test
   public void testExpectedState()
   {
-    KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+    KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
     TestData.assertExpectedState(store);
   }
 
@@ -104,12 +112,12 @@ public class CounterStreamProcessorTopologyTest
     return new JsonSerializer().noTypeInfo();
   }
 
-  private static JsonDeserializer<TestOutputWord> keyDeserializer()
+  private static JsonDeserializer<OutputWindowedWord> keyDeserializer()
   {
     return deserializer(true);
   }
 
-  private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
+  private static JsonDeserializer<OutputWordCounter> valueDeserializer()
   {
     return deserializer(false);
   }
@@ -125,6 +133,6 @@ public class CounterStreamProcessorTopologyTest
 
   private static String typeMappingsConfig()
   {
-    return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
+    return PopularStreamProcessor.typeMappingsConfig(OutputWindowedWord.class, OutputWordCounter.class);
   }
 }