import org.springframework.util.LinkedMultiValueMap;
import org.springframework.util.MultiValueMap;
+import java.time.Duration;
+import java.time.ZoneId;
import java.util.Map;
import static de.juplo.kafka.wordcount.popular.PopularApplicationConfiguriation.serializationConfig;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
@Slf4j
{
public static final String IN = "TEST-IN";
public static final String OUT = "TEST-OUT";
+ public static final ZoneId ZONE = ZoneId.of("Europe/Berlin");
static TopologyTestDriver testDriver;
Topology topology = PopularStreamProcessor.buildTopology(
IN,
OUT,
- Stores.inMemoryKeyValueStore(STORE_NAME));
+ ZONE,
+ Stores.inMemoryWindowStore(
+ WINDOW_STORE_NAME,
+ Duration.ofSeconds(60),
+ Duration.ofSeconds(30),
+ false),
+ Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME));
testDriver = new TopologyTestDriver(topology, serializationConfig());
TestOutputTopic<OutputWindowedWord, OutputWordCounter> out =
testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
- TestData
- .getInputMessages()
- .forEach(kv -> in.pipeInput(kv.key, kv.value));
+ TestData.sendInputMessages((instant, kv) -> in.pipeInput(kv.key, kv.value, instant));
receivedMessages = new LinkedMultiValueMap<>();
out
@Test
public void testExpectedState()
{
- KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
+ KeyValueStore<WindowedWord, WordCounter> store = testDriver.getKeyValueStore(KEY_VALUE_STORE_NAME);
TestData.assertExpectedState(store);
}