X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Frecorder%2FApplicationTests.java;fp=src%2Ftest%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Frecorder%2FApplicationTests.java;h=0000000000000000000000000000000000000000;hb=928536af97a88dc121e1f5438b8fb671ee861960;hp=3879d38efd1789c205559ffe5dcf71ef829dd474;hpb=ac67b34208c00885880c093a6a46312dc16e11bb;p=demos%2Fkafka%2Fwordcount diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java deleted file mode 100644 index 3879d38..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java +++ /dev/null @@ -1,139 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import de.juplo.kafka.wordcount.splitter.TestRecording; -import lombok.extern.slf4j.Slf4j; -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.context.TestConfiguration; -import org.springframework.context.annotation.Bean; -import org.springframework.http.MediaType; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.support.KafkaHeaders; -import org.springframework.kafka.test.context.EmbeddedKafka; -import org.springframework.messaging.handler.annotation.Header; -import org.springframework.messaging.handler.annotation.Payload; -import org.springframework.test.web.servlet.MockMvc; -import org.springframework.test.web.servlet.MvcResult; -import org.springframework.util.LinkedMultiValueMap; -import org.springframework.util.MultiValueMap; - -import java.time.Duration; -import java.util.List; -import java.util.stream.Stream; - -import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT; -import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch; -import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; -import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; -import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; - - -@SpringBootTest( - properties = { - "spring.kafka.consumer.auto-offset-reset=earliest", - "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", - "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording", - "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder", - "logging.level.root=WARN", - "logging.level.de.juplo=DEBUG", - "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", - "juplo.wordcount.recorder.topic=" + TOPIC_OUT }) -@AutoConfigureMockMvc -@EmbeddedKafka(topics = { TOPIC_OUT }) -@Slf4j -class ApplicationTests -{ - static final String TOPIC_OUT = "out"; - - @Autowired - MockMvc mockMvc; - @Autowired - Consumer consumer; - - - @Test - @DisplayName("Posted messages are accepted and sent to Kafka") - void userMessagesAreAcceptedAndSentToKafka() - { - MultiValueMap recordings = new LinkedMultiValueMap<>(); - - Stream - .of( - new TestRecording("päter", "Hall° Wält?¢*&%€!"), - new TestRecording("päter", "Hallo Welt!"), - new TestRecording("klühs", "Müsch gäb's auch!"), - new TestRecording("päter", "Boäh, echt! ß mal nä Nümmäh!")) - .forEach(recording -> - { - sendRedording(recording.getUser(), recording.getSentence()); - recordings.add(recording.getUser(), recording); - }); - - - await("Received expected messages") - .atMost(Duration.ofSeconds(5)) - .untilAsserted(() -> recordings.forEach((user, userRecordings) -> - assertThat(consumer.receivedFor(user)).containsExactlyElementsOf(userRecordings))); - } - - void sendRedording(String user, String sentence) - { - try - { - MvcResult result = mockMvc - .perform(post("/{user}", user) - .contentType(MediaType.TEXT_PLAIN) - .content(sentence)) - .andReturn(); - - mockMvc.perform(asyncDispatch(result)) - .andDo(print()) - .andExpect(status().isOk()) - .andExpect(jsonPath("$.username").value(user)) - .andExpect(jsonPath("$.sentence").value(sentence)); - } - catch (Exception e) - { - throw new RuntimeException(e); - } - } - - - static class Consumer - { - private final MultiValueMap received = new LinkedMultiValueMap<>(); - - @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) - public synchronized void receive( - @Header(KafkaHeaders.RECEIVED_KEY) String user, - @Payload TestRecording recording) - { - log.debug("Received message: {}={}", user, recording); - received.add(user, recording); - } - - synchronized List receivedFor(String user) - { - List recordings = received.get(user); - return recordings == null - ? List.of() - : received.get(user).stream().toList(); - } - } - - @TestConfiguration - static class Configuration - { - @Bean - Consumer consumer() - { - return new Consumer(); - } - } -}