import java.time.Duration;
import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
- public static final String STORE_NAME = "TEST-STORE";
@Autowired
Consumer consumer;
{
await("Expected state")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
}
@DisplayName("Await the expected output messages")
.untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
}
+ @DisplayName("Await the expected number of messages")
+ @Test
+ public void testAwaitExpectedNumberOfMessagesForUsers()
+ {
+ await("Expected number of messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedNumberOfMessagesForUsers(consumer.getReceivedMessages()));
+ }
+
+ @DisplayName("Await the expected final output messages")
+ @Test
+ public void testAwaitExpectedLastMessagesForUsers()
+ {
+ await("Expected final output messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedLastMessagesForUsers(consumer.getReceivedMessages()));
+ }
+
static class Consumer
{