X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Ftop10%2FTop10ApplicationIT.java;h=a1bc1f02e94b670c1750a61deb0f68445a365b19;hb=e59d8080ffe2750774c881766e691012964c9099;hp=1097310ccde0fcc2d69e793c0bbdc8aba9e4a137;hpb=fa19058953b388f893255d2bcc9351ac9eb3328b;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java index 1097310..a1bc1f0 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10ApplicationIT.java @@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.top10; import de.juplo.kafka.wordcount.counter.TestWord; import de.juplo.kafka.wordcount.counter.TestCounter; import de.juplo.kafka.wordcount.query.TestRanking; +import de.juplo.kafka.wordcount.query.TestUser; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; @@ -25,6 +26,7 @@ import org.springframework.util.MultiValueMap; import java.time.Duration; import java.util.stream.Stream; +import static de.juplo.kafka.wordcount.top10.Top10StreamProcessor.STORE_NAME; import static org.awaitility.Awaitility.await; @@ -36,10 +38,7 @@ import static org.awaitility.Awaitility.await; "spring.kafka.consumer.auto-offset-reset=earliest", "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.use.type.headers=false", - "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.top10.User", - "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.query.TestRanking", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.top10 ", + "spring.kafka.consumer.properties.spring.json.type.mapping=user:de.juplo.kafka.wordcount.query.TestUser,ranking:de.juplo.kafka.wordcount.query.TestRanking", "logging.level.root=WARN", "logging.level.de.juplo=DEBUG", "logging.level.org.apache.kafka.clients=INFO", @@ -55,7 +54,6 @@ public class Top10ApplicationIT { public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; - public static final String STORE_NAME = "TEST-STORE"; @Autowired Consumer consumer; @@ -94,7 +92,7 @@ public class Top10ApplicationIT { await("Expected state") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore(STORE_NAME))); + .untilAsserted(() -> TestData.assertExpectedState(streamProcessor.getStore())); } @DisplayName("Await the expected output messages") @@ -104,7 +102,8 @@ public class Top10ApplicationIT { await("Expected messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedMessages(receivedMessages))); } @DisplayName("Await the expected number of messages") @@ -113,7 +112,8 @@ public class Top10ApplicationIT { await("Expected number of messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedNumberOfMessagesForUsers(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedNumberOfMessagesForUsers(receivedMessages))); } @DisplayName("Await the expected final output messages") @@ -122,26 +122,28 @@ public class Top10ApplicationIT { await("Expected final output messages") .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> TestData.assertExpectedLastMessagesForUsers(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion( + receivedMessages -> TestData.assertExpectedLastMessagesForUsers(receivedMessages))); } static class Consumer { - private final MultiValueMap received = new LinkedMultiValueMap<>(); + private final MultiValueMap received = new LinkedMultiValueMap<>(); @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) User user, + @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, @Payload TestRanking ranking) { log.debug("Received message: {} -> {}", user, ranking); - received.add(user, Ranking.of(ranking.getEntries())); + received.add(user, ranking); } - synchronized MultiValueMap getReceivedMessages() + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) { - return received; + assertion.accept(received); } }