import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_IN;
import static de.juplo.kafka.wordcount.popular.PopularApplicationIT.TOPIC_OUT;
-import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.KEY_VALUE_STORE_NAME;
+import static de.juplo.kafka.wordcount.popular.PopularStreamProcessor.WINDOW_STORE_NAME;
import static org.awaitility.Awaitility.await;
@Autowired KafkaTemplate<InputUser, InputWord> kafkaTemplate)
{
TestData
- .getInputMessages()
- .forEach(kv ->
+ .sendInputMessages((instant, kv) ->
{
try
{
- SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get();
+ SendResult<InputUser, InputWord> result = kafkaTemplate.send(TOPIC_IN, null, instant.toEpochMilli(), kv.key, kv.value).get();
log.info(
"Sent: {}={}, partition={}, offset={}",
result.getProducerRecord().key(),
@DisplayName("Await the expected final output messages")
@Test
- public void testAwaitExpectedLastMessagesForUsers()
+ public void testAwaitExpectedLastMessagesForWord()
{
await("Expected final output messages")
.atMost(Duration.ofSeconds(5))
}
@Bean
- KeyValueBytesStoreSupplier storeSupplier()
+ WindowBytesStoreSupplier windowBytesStoreSupplier()
{
- return Stores.inMemoryKeyValueStore(STORE_NAME);
+ return Stores.inMemoryWindowStore(
+ WINDOW_STORE_NAME,
+ Duration.ofSeconds(60),
+ Duration.ofSeconds(30),
+ false);
+ }
+
+ @Bean
+ KeyValueBytesStoreSupplier keyValueBytesStoreSupplier()
+ {
+ return Stores.inMemoryKeyValueStore(KEY_VALUE_STORE_NAME);
}
}
}