From: Kai Moritz Date: Sun, 5 May 2024 11:26:28 +0000 (+0200) Subject: recorder: 1.1.2 - The `JsonSerializer` is used for serialization X-Git-Tag: recorder-1.1.2 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=ac67b34208c00885880c093a6a46312dc16e11bb;p=demos%2Fkafka%2Fwordcount recorder: 1.1.2 - The `JsonSerializer` is used for serialization * Configured the Spring Kafka `JsonSerializer` for serialization. * The `JsonSerializer` is configured, to add _no_ type-info to the headers. * Added an integration test to proof, that everything works as expected. --- diff --git a/pom.xml b/pom.xml index 4e87c6e..0377da1 100644 --- a/pom.xml +++ b/pom.xml @@ -10,7 +10,7 @@ de.juplo.kafka.wordcount recorder - 1.1.1 + 1.1.2 Wordcount-Recorder Recorder-service of the multi-user wordcount-example @@ -55,6 +55,16 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test + diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java index 11248d8..3928f2f 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java @@ -7,6 +7,7 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.Assert; import java.util.Properties; @@ -24,7 +25,8 @@ public class RecorderApplication Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); return new KafkaProducer<>(props); } diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java index 885a408..3879d38 100644 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java @@ -1,13 +1,139 @@ package de.juplo.kafka.wordcount.recorder; +import de.juplo.kafka.wordcount.splitter.TestRecording; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; -@SpringBootTest +import java.time.Duration; +import java.util.List; +import java.util.stream.Stream; + +import static de.juplo.kafka.wordcount.recorder.ApplicationTests.TOPIC_OUT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording", + "spring.kafka.consumer.properties.spring.json.trusted.packages=de.juplo.kafka.wordcount.recorder", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.recorder.topic=" + TOPIC_OUT }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = { TOPIC_OUT }) +@Slf4j class ApplicationTests { + static final String TOPIC_OUT = "out"; + + @Autowired + MockMvc mockMvc; + @Autowired + Consumer consumer; + + @Test - void contextLoads() + @DisplayName("Posted messages are accepted and sent to Kafka") + void userMessagesAreAcceptedAndSentToKafka() + { + MultiValueMap recordings = new LinkedMultiValueMap<>(); + + Stream + .of( + new TestRecording("päter", "Hall° Wält?¢*&%€!"), + new TestRecording("päter", "Hallo Welt!"), + new TestRecording("klühs", "Müsch gäb's auch!"), + new TestRecording("päter", "Boäh, echt! ß mal nä Nümmäh!")) + .forEach(recording -> + { + sendRedording(recording.getUser(), recording.getSentence()); + recordings.add(recording.getUser(), recording); + }); + + + await("Received expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> recordings.forEach((user, userRecordings) -> + assertThat(consumer.receivedFor(user)).containsExactlyElementsOf(userRecordings))); + } + + void sendRedording(String user, String sentence) + { + try + { + MvcResult result = mockMvc + .perform(post("/{user}", user) + .contentType(MediaType.TEXT_PLAIN) + .content(sentence)) + .andReturn(); + + mockMvc.perform(asyncDispatch(result)) + .andDo(print()) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.username").value(user)) + .andExpect(jsonPath("$.sentence").value(sentence)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) String user, + @Payload TestRecording recording) + { + log.debug("Received message: {}={}", user, recording); + received.add(user, recording); + } + + synchronized List receivedFor(String user) + { + List recordings = received.get(user); + return recordings == null + ? List.of() + : received.get(user).stream().toList(); + } + } + + @TestConfiguration + static class Configuration { + @Bean + Consumer consumer() + { + return new Consumer(); + } } } diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestRecording.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestRecording.java new file mode 100644 index 0000000..f4aa016 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestRecording.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class TestRecording +{ + String user; + String sentence; +}