popular: 1.0.0 - Word are counted for hopping time-windows
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / popular / PopularApplicationIT.java
index a327389..20df7ac 100644 (file)
@@ -7,6 +7,7 @@ import de.juplo.kafka.wordcount.stats.OutputWordCounter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.DisplayName;
 import org.junit.jupiter.api.Test;
@@ -28,7 +29,8 @@ import java.time.Duration;
 
 import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN;
 import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
 import static org.awaitility.Awaitility.await;
 
 
@@ -67,12 +69,11 @@ public class PopularApplicationIT
                        @Autowired KafkaTemplate<InputUser, InputWord> kafkaTemplate)
        {
                TestData
-                               .getInputMessages()
-                               .forEach(kv ->
+                               .sendInputMessages((instant, kv) ->
                                {
                                        try
                                        {
-                                               SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+                                               SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, null, instant.toEpochMilli(), kv.key, kv.value).get();
                                                log.info(
                                                                "Sent: {}={}, partition={}, offset={}",
                                                                result.getProducerRecord().key(),
@@ -109,7 +110,7 @@ public class PopularApplicationIT
 
        @DisplayName("Await the expected final output messages")
        @Test
-       public void testAwaitExpectedLastMessagesForUsers()
+       public void testAwaitExpectedLastMessagesForWord()
        {
                await("Expected final output messages")
                                .atMost(Duration.ofSeconds(5))
@@ -157,9 +158,19 @@ public class PopularApplicationIT
                }
 
                @Bean
-               KeyValueBytesStoreSupplier storeSupplier()
+               WindowBytesStoreSupplier windowBytesStoreSupplier()
                {
-                       return Stores.inMemoryKeyValueStore(STORE_NAME);
+                       return Stores.inMemoryWindowStore(
+                                       WINDOW_STORE_NAME,
+                                       Duration.ofSeconds(60),
+                                       Duration.ofSeconds(30),
+                                       false);
+               }
+
+               @Bean
+               KeyValueBytesStoreSupplier keyValueBytesStoreSupplier()
+               {
+                       return Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME);
                }
        }
 }