X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterApplicationIT.java;h=d08930c8e41c8bb28996399671dd6c2a9e6b7146;hb=237133719b1d06d542302fb948d9cf3aff80a8a4;hp=2f1e0c3a2b2b9a02577264c7dc43c311e904dba4;hpb=44f1ad5dcd50851ef5d93b1be759481d5a38f63a;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java index 2f1e0c3..d08930c 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterApplicationIT.java @@ -6,7 +6,8 @@ import de.juplo.kafka.wordcount.top10.TestOutputWordCounter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; import org.apache.kafka.streams.state.Stores; -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @@ -16,6 +17,7 @@ import org.springframework.context.annotation.Primary; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.support.SendResult; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; @@ -51,29 +53,60 @@ public class CounterApplicationIT public static final String TOPIC_IN = "in"; public static final String TOPIC_OUT = "out"; - @Autowired - KafkaTemplate kafkaTemplate; @Autowired Consumer consumer; - @BeforeEach - public void clear() + @BeforeAll + public static void testSendMessage( + @Autowired KafkaTemplate kafkaTemplate) { - consumer.received.clear(); + TestData + .getInputMessages() + .forEach(kv -> + { + try + { + SendResult result = kafkaTemplate.send(TOPIC_IN, kv.key, kv.value).get(); + log.info( + "Sent: {}={}, partition={}, offset={}", + result.getProducerRecord().key(), + result.getProducerRecord().value(), + result.getRecordMetadata().partition(), + result.getRecordMetadata().offset()); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + }); } + @DisplayName("Await the expected number of messages") + @Test + public void testAwaitExpectedNumberOfMessagesForUsers() + { + await("Expected number of messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedNumberOfMessagesForWordAssertion())); + } + @DisplayName("Await the expected output messages") @Test void testSendMessage() { - TestData - .getInputMessages() - .forEach(kv -> kafkaTemplate.send(TOPIC_IN, kv.key, kv.value)); - await("Expected messages") .atMost(Duration.ofSeconds(10)) - .untilAsserted(() -> TestData.assertExpectedMessages(consumer.getReceivedMessages())); + .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedMessagesAssertion())); + } + + @DisplayName("Await the expected final output messages") + @Test + public void testAwaitExpectedLastMessagesForUsers() + { + await("Expected final output messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> consumer.enforceAssertion(TestData.expectedLastMessagesForWordAssertion())); } @@ -90,9 +123,10 @@ public class CounterApplicationIT received.add(word, counter); } - synchronized MultiValueMap getReceivedMessages() + synchronized void enforceAssertion( + java.util.function.Consumer> assertion) { - return received; + assertion.accept(received); } }