import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
@Autowired
Consumer consumer;
+ @Autowired
+ CounterStreamProcessor streamProcessor;
@BeforeAll
.untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));
}
+ @DisplayName("Await the expected final output messages")
+ @Test
+ public void testAwaitExpectedLastMessagesForUsers()
+ {
+ await("Expected final output messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
+ }
+
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
+ }
+
static class Consumer
{
@Bean
KeyValueBytesStoreSupplier inMemoryStoreSupplier()
{
- return Stores.inMemoryKeyValueStore("TEST-STORE");
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
}
}
}