X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Frecorder%2FApplicationTests.java;h=3879d38efd1789c205559ffe5dcf71ef829dd474;hb=ac67b34208c00885880c093a6a46312dc16e11bb;hp=885a4088ac3636d0907acdd0c599c45fe0b2b2d4;hpb=95c8f4618fdbc28df31efb8312f413d8e5a55077;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java index 885a408..3879d38 100644 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java @@ -1,13 +1,139 @@ package de.juplo.kafka.wordcount.recorder; +import de.juplo.kafka.wordcount.splitter.TestRecording; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; -@SpringBootTest +import java.time.Duration; +import java.util.List; +import java.util.stream.Stream; + +import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording", + "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.recorder.topic=" + TOPIC_OUT }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = { TOPIC_OUT }) +@Slf4j class ApplicationTests { + static final String TOPIC_OUT = "out"; + + @Autowired + MockMvc mockMvc; + @Autowired + Consumer consumer; + + @Test - void contextLoads() + @DisplayName("Posted messages are accepted and sent to Kafka") + void userMessagesAreAcceptedAndSentToKafka() + { + MultiValueMap recordings = new LinkedMultiValueMap<>(); + + Stream + .of( + new TestRecording("päter", "Hall° Wält?¢*&%€!"), + new TestRecording("päter", "Hallo Welt!"), + new TestRecording("klühs", "Müsch gäb's auch!"), + new TestRecording("päter", "Boäh, echt! ß mal nä Nümmäh!")) + .forEach(recording -> + { + sendRedording(recording.getUser(), recording.getSentence()); + recordings.add(recording.getUser(), recording); + }); + + + await("Received expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> recordings.forEach((user, userRecordings) -> + assertThat(consumer.receivedFor(user)).containsExactlyElementsOf(userRecordings))); + } + + void sendRedording(String user, String sentence) + { + try + { + MvcResult result = mockMvc + .perform(post("/{user}", user) + .contentType(MediaType.TEXT_PLAIN) + .content(sentence)) + .andReturn(); + + mockMvc.perform(asyncDispatch(result)) + .andDo(print()) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.username").value(user)) + .andExpect(jsonPath("$.sentence").value(sentence)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) String user, + @Payload TestRecording recording) + { + log.debug("Received message: {}={}", user, recording); + received.add(user, recording); + } + + synchronized List receivedFor(String user) + { + List recordings = received.get(user); + return recordings == null + ? List.of() + : received.get(user).stream().toList(); + } + } + + @TestConfiguration + static class Configuration { + @Bean + Consumer consumer() + { + return new Consumer(); + } } }