From: Kai Moritz Date: Sun, 2 Jun 2024 10:20:01 +0000 (+0200) Subject: recorder: 1.2.0 - `ApplicationTest` -> `RecorderApplicationIT` -- MOVE X-Git-Tag: recorder-1.2.0~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=928536af97a88dc121e1f5438b8fb671ee861960;p=demos%2Fkafka%2Fwordcount recorder: 1.2.0 - `ApplicationTest` -> `RecorderApplicationIT` -- MOVE --- diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java deleted file mode 100644 index 3879d38..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java +++ /dev/null @@ -1,139 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import de.juplo.kafka.wordcount.splitter.TestRecording; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.test.web.servlet.MockMvc; -import org.springframework.test.web.servlet.MvcResult; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.time.Duration; -import java.util.List; -import java.util.stream.Stream; - -import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; -import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - - -@SpringBootTest( - properties = { - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.recorder.topic=" + TOPIC_OUT }) -@AutoConfigureMockMvc -@EmbeddedKafka(topics = { TOPIC_OUT }) -@Slf4j -class ApplicationTests -{ - static final String TOPIC_OUT = "out"; - - @Autowired - MockMvc mockMvc; - @Autowired - Consumer consumer; - - - @Test - @DisplayName("Posted messages are accepted and sent to Kafka") - void userMessagesAreAcceptedAndSentToKafka() - { - MultiValueMap recordings = new LinkedMultiValueMap<>(); - - Stream - .of( - new TestRecording("päter", "Hall° Wält?¢*&%€!"), - new TestRecording("päter", "Hallo Welt!"), - new TestRecording("klühs", "Müsch gäb's auch!"), - new TestRecording("päter", "Boäh, echt! ß mal nä Nümmäh!")) - .forEach(recording -> - { - sendRedording(recording.getUser(), recording.getSentence()); - recordings.add(recording.getUser(), recording); - }); - - - await("Received expected messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> recordings.forEach((user, userRecordings) -> - assertThat(consumer.receivedFor(user)).containsExactlyElementsOf(userRecordings))); - } - - void sendRedording(String user, String sentence) - { - try - { - MvcResult result = mockMvc - .perform(post("/{user}", user) - .contentType(MediaType.TEXT_PLAIN) - .content(sentence)) - .andReturn(); - - mockMvc.perform(asyncDispatch(result)) - .andDo(print()) - .andExpect(status().isOk()) - .andExpect(jsonPath("$.username").value(user)) - .andExpect(jsonPath("$.sentence").value(sentence)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - - static class Consumer - { - private final MultiValueMap received = new LinkedMultiValueMap<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String user, - @Payload TestRecording recording) - { - log.debug("Received message: {}={}", user, recording); - received.add(user, recording); - } - - synchronized List receivedFor(String user) - { - List recordings = received.get(user); - return recordings == null - ? List.of() - : received.get(user).stream().toList(); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java new file mode 100644 index 0000000..3879d38 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java @@ -0,0 +1,139 @@ +package de.juplo.kafka.wordcount.recorder; + +import de.juplo.kafka.wordcount.splitter.TestRecording; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; +import java.util.List; +import java.util.stream.Stream; + +import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording", + "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.recorder.topic=" + TOPIC_OUT }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = { TOPIC_OUT }) +@Slf4j +class ApplicationTests +{ + static final String TOPIC_OUT = "out"; + + @Autowired + MockMvc mockMvc; + @Autowired + Consumer consumer; + + + @Test + @DisplayName("Posted messages are accepted and sent to Kafka") + void userMessagesAreAcceptedAndSentToKafka() + { + MultiValueMap recordings = new LinkedMultiValueMap<>(); + + Stream + .of( + new TestRecording("päter", "Hall° Wält?¢*&%€!"), + new TestRecording("päter", "Hallo Welt!"), + new TestRecording("klühs", "Müsch gäb's auch!"), + new TestRecording("päter", "Boäh, echt! ß mal nä Nümmäh!")) + .forEach(recording -> + { + sendRedording(recording.getUser(), recording.getSentence()); + recordings.add(recording.getUser(), recording); + }); + + + await("Received expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> recordings.forEach((user, userRecordings) -> + assertThat(consumer.receivedFor(user)).containsExactlyElementsOf(userRecordings))); + } + + void sendRedording(String user, String sentence) + { + try + { + MvcResult result = mockMvc + .perform(post("/{user}", user) + .contentType(MediaType.TEXT_PLAIN) + .content(sentence)) + .andReturn(); + + mockMvc.perform(asyncDispatch(result)) + .andDo(print()) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.username").value(user)) + .andExpect(jsonPath("$.sentence").value(sentence)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) String user, + @Payload TestRecording recording) + { + log.debug("Received message: {}={}", user, recording); + received.add(user, recording); + } + + synchronized List receivedFor(String user) + { + List recordings = received.get(user); + return recordings == null + ? List.of() + : received.get(user).stream().toList(); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +}