import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_IN;
import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.TOPIC_OUT;
+import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
@Autowired
Consumer consumer;
+ @Autowired
+ CounterStreamProcessor streamProcessor;
@BeforeAll
{
await("Expected number of messages")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedNumberOfMessagesForWordAssertion()));
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedNumberOfMessagesForWord(receivedMessages)));
}
@DisplayName("Await the expected output messages")
{
await("Expected messages")
.atMost(Duration.ofSeconds(10))
- .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion()));
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
}
@DisplayName("Await the expected final output messages")
{
await("Expected final output messages")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion()));
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedLastMessagesForWord(receivedMessages)));
+ }
+
+ @DisplayName("Await the expected state in the state-store")
+ @Test
+ public void testAwaitExpectedState()
+ {
+ await("Expected state")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
}
@Bean
KeyValueBytesStoreSupplier inMemoryStoreSupplier()
{
- return Stores.inMemoryKeyValueStore("TEST-STORE");
+ return Stores.inMemoryKeyValueStore(STORE_NAME);
}
}
}