package de.juplo.kafka.wordcount.top10;
-import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.counter.TestCounter;
+import de.juplo.kafka.wordcount.counter.TestWord;
import de.juplo.kafka.wordcount.query.TestRanking;
-import de.juplo.kafka.wordcount.query.TestUser;
+import de.juplo.kafka.wordcount.query.TestStats;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
-import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.util.MultiValueMap;
import java.time.Duration;
-import java.util.stream.Stream;
import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME;
import static org.awaitility.Awaitility.await;
properties = {
"spring.kafka.producer.key-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer",
- "spring.kafka.producer.properties.spring.json.type.mapping=word:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
+ "spring.kafka.producer.properties.spring.json.type.mapping=key:de.juplo.kafka.wordcount.counter.TestWord,counter:de.juplo.kafka.wordcount.counter.TestCounter",
"spring.kafka.consumer.auto-offset-reset=earliest",
"spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
"spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer",
- "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking",
+ "spring.kafka.consumer.properties.spring.json.type.mapping=stats:de.juplo.kafka.wordcount.query.TestStats,ranking:de.juplo.kafka.wordcount.query.TestRanking",
"logging.level.root=WARN",
"logging.level.de.juplo=DEBUG",
"logging.level.org.apache.kafka.clients=INFO",
public static void testSendMessage(
@Autowired KafkaTemplate<TestWord, TestCounter> kafkaTemplate)
{
- Stream
- .of(TestData.INPUT_MESSAGES)
+ TestData
+ .getInputMessages()
.forEach(kv ->
{
try
@DisplayName("Await the expected number of messages")
@Test
- public void testAwaitExpectedNumberOfMessagesForUsers()
+ public void testAwaitExpectedNumberOfMessages()
{
await("Expected number of messages")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages)));
+ receivedMessages -> TestData.assertExpectedNumberOfMessages(receivedMessages)));
}
@DisplayName("Await the expected final output messages")
@Test
- public void testAwaitExpectedLastMessagesForUsers()
+ public void testAwaitExpectedLastMessages()
{
await("Expected final output messages")
.atMost(Duration.ofSeconds(5))
.untilAsserted(() -> consumer.enforceAssertion(
- receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages)));
+ receivedMessages -> TestData.assertExpectedLastMessages(receivedMessages)));
}
static class Consumer
{
- private final MultiValueMap<TestUser, TestRanking> received = new LinkedMultiValueMap<>();
+ private final MultiValueMap<TestStats, TestRanking> received = new LinkedMultiValueMap<>();
@KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
public synchronized void receive(
- @Header(KafkaHeaders.RECEIVED_KEY) TestUser user,
+ @Header(KafkaHeaders.RECEIVED_KEY) TestStats user,
@Payload TestRanking ranking)
{
log.debug("Received message: {} -> {}", user, ranking);
}
synchronized void enforceAssertion(
- java.util.function.Consumer<MultiValueMap<TestUser, TestRanking>> assertion)
+ java.util.function.Consumer<MultiValueMap<TestStats, TestRanking>> assertion)
{
assertion.accept(received);
}