import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import java.time.Duration;
import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
- public static final String STORE_NAME = "TEST-STORE";
@Autowired
Consumer consumer;
{
await("Expected state")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
}
@DisplayName("Await the expected output messages")
@Test
+ @Disabled
public void testAwaitExpectedMessages()
{
await("Expected messages")
.untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
}
+ @DisplayName("Await the expected number of messages")
+ @Test
+ public void testAwaitExpectedNumberOfMessagesForUsers()
+ {
+ await("Expected number of messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedNumberOfMessagesForUsers(consumer.getReceivedMessages()));
+ }
+
+ @DisplayName("Await the expected final output messages")
+ @Test
+ public void testAwaitExpectedLastMessagesForUsers()
+ {
+ await("Expected final output messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> TestData.assertExpectedLastMessagesForUsers(consumer.getReceivedMessages()));
+ }
+
static class Consumer
{