package de.juplo.kafka.wordcount.top10;
-import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.query.TestRanking;
+import de.juplo.kafka.wordcount.query.TestUser;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.util.MultiValueMap;
import java.time.Duration;
-import java.util.stream.Stream;
+import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.use.type.headers=false",
- "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User",
- "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking",
- "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
{
public static final String TOPIC_IN = "in";
public static final String TOPIC_OUT = "out";
- public static final String STORE_NAME = "TEST-STORE";
@Autowired
Consumer consumer;
public static void testSendMessage(
@Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
{
- Stream
- .of(TestData.INPUT_MESSAGES)
+ TestData
+ .getInputMessages()
.forEach(kv ->
{
try
{
await("Expected state")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME)));
+ .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore()));
}
@DisplayName("Await the expected output messages")
{
await("Expected messages")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages()));
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedMessages(receivedMessages)));
+ }
+
+ @DisplayName("Await the expected number of messages")
+ @Test
+ public void testAwaitExpectedNumberOfMessagesForUsers()
+ {
+ await("Expected number of messages")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages)));
}
@DisplayName("Await the expected final output messages")
{
await("Expected final output messages")
.atMost(Duration.ofSeconds(5))
- .untilAsserted(() -> TestData.assertExpectedLastMessagesForUsers(consumer.getReceivedMessages()));
+ .untilAsserted(() -> consumer.enforceAssertion(
+ receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages)));
}
static class Consumer
{
- private final MultiValueMap<User, Ranking> received = new LinkedMultiValueMap<>();
+ private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) User user,
+ @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
@Payload TestRanking ranking)
{
log.debug("Received message: {} -> {}", user, ranking);
- received.add(user, Ranking.of(ranking.getEntries()));
+ received.add(user, ranking);
}
- synchronized MultiValueMap<User, Ranking> getReceivedMessages()
+ synchronized void enforceAssertion(
+ java.util.function.Consumer<MultiValueMap<TestUser, TestRanking>> assertion)
{
- return received;
+ assertion.accept(received);
}
}