From: Kai Moritz Date: Wed, 5 Jun 2024 21:02:25 +0000 (+0200) Subject: recorder: 1.2.0 - Fixed a bug (error-detection) in `RecorderController` X-Git-Tag: recorder-1.2.0 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;ds=sidebyside;h=refs%2Fheads%2Frecorder;hp=b185f168c230e65878ed4c99fc69229f356c41da;p=demos%2Fkafka%2Fwordcount recorder: 1.2.0 - Fixed a bug (error-detection) in `RecorderController` --- diff --git a/Dockerfile b/Dockerfile index 9c0b843..cb9ad4e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM openjdk:11-jre-slim +FROM eclipse-temurin:17-jre COPY target/*.jar /opt/app.jar EXPOSE 8081 ENTRYPOINT ["java", "-jar", "/opt/app.jar"] diff --git a/pom.xml b/pom.xml index 44a6cd1..b11275e 100644 --- a/pom.xml +++ b/pom.xml @@ -5,12 +5,12 @@ org.springframework.boot spring-boot-starter-parent - 3.0.2 + 3.2.5 de.juplo.kafka.wordcount recorder - 1.0.2 + 1.2.0 Wordcount-Recorder Recorder-service of the multi-user wordcount-example @@ -26,8 +26,8 @@ spring-boot-starter-web - org.apache.kafka - kafka-clients + org.springframework.kafka + spring-kafka org.hibernate.validator @@ -55,10 +55,23 @@ spring-boot-starter-test test + + org.springframework.kafka + spring-kafka-test + test + + + org.awaitility + awaitility + test + + + maven-failsafe-plugin + org.springframework.boot spring-boot-maven-plugin diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java index abe0685..699c671 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java @@ -2,11 +2,11 @@ package de.juplo.kafka.wordcount.recorder; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; +import org.springframework.kafka.support.serializer.JsonSerializer; import org.springframework.util.Assert; import java.util.Properties; @@ -17,14 +17,15 @@ import java.util.Properties; public class RecorderApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(RecorderApplicationProperties properties) + KafkaProducer producer(RecorderApplicationProperties properties) { Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false); return new KafkaProducer<>(props); } diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java index f7e32e2..e76d24c 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java @@ -18,10 +18,12 @@ import jakarta.validation.constraints.NotEmpty; public class RecorderController { private final String topic; - private final KafkaProducer producer; + private final KafkaProducer producer; - public RecorderController(RecorderApplicationProperties properties, KafkaProducer producer) + public RecorderController( + RecorderApplicationProperties properties, + KafkaProducer producer) { this.topic = properties.getTopic(); this.producer = producer; @@ -44,10 +46,14 @@ public class RecorderController { DeferredResult> result = new DeferredResult<>(); - ProducerRecord record = new ProducerRecord<>(topic, username, sentence); + ProducerRecord record = new ProducerRecord<>( + topic, + User.of(username), + Recording.of(username, sentence)); + producer.send(record, (metadata, exception) -> { - if (metadata != null) + if (exception == null) { result.setResult( ResponseEntity.ok(RecordingResult.of( diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/Recording.java b/src/main/java/de/juplo/kafka/wordcount/recorder/Recording.java new file mode 100644 index 0000000..6117438 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/Recording.java @@ -0,0 +1,11 @@ +package de.juplo.kafka.wordcount.recorder; + +import lombok.Value; + + +@Value(staticConstructor = "of") +public class Recording +{ + String user; + String sentence; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/User.java b/src/main/java/de/juplo/kafka/wordcount/recorder/User.java new file mode 100644 index 0000000..1245cfe --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/User.java @@ -0,0 +1,10 @@ +package de.juplo.kafka.wordcount.recorder; + +import lombok.Value; + + +@Value(staticConstructor = "of") +public class User +{ + String user; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java deleted file mode 100644 index 885a408..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class ApplicationTests -{ - @Test - void contextLoads() - { - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java b/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java new file mode 100644 index 0000000..9a1d516 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationIT.java @@ -0,0 +1,136 @@ +package de.juplo.kafka.wordcount.recorder; + +import de.juplo.kafka.wordcount.splitter.TestRecording; +import de.juplo.kafka.wordcount.splitter.TestUser; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.autoconfigure.web.servlet.AutoConfigureMockMvc; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.TestConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.support.KafkaHeaders; +import org.springframework.kafka.test.context.EmbeddedKafka; +import org.springframework.messaging.handler.annotation.Header; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.test.web.servlet.MockMvc; +import org.springframework.test.web.servlet.MvcResult; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.time.Duration; +import java.util.List; + +import static de.juplo.kafka.wordcount.recorder.RecorderApplicationIT.TOPIC_OUT; +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.asyncDispatch; +import static org.springframework.test.web.servlet.request.MockMvcRequestBuilders.post; +import static org.springframework.test.web.servlet.result.MockMvcResultHandlers.print; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.jsonPath; +import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; + + +@SpringBootTest( + properties = { + "spring.kafka.consumer.auto-offset-reset=earliest", + "spring.kafka.consumer.key-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.value-deserializer=org.springframework.kafka.support.serializer.JsonDeserializer", + "spring.kafka.consumer.properties.spring.json.key.default.type=de.juplo.kafka.wordcount.splitter.TestUser", + "spring.kafka.consumer.properties.spring.json.value.default.type=de.juplo.kafka.wordcount.splitter.TestRecording", + "logging.level.root=WARN", + "logging.level.de.juplo=DEBUG", + "juplo.wordcount.recorder.bootstrap-server=${spring.embedded.kafka.brokers}", + "juplo.wordcount.recorder.topic=" + TOPIC_OUT }) +@AutoConfigureMockMvc +@EmbeddedKafka(topics = { TOPIC_OUT }) +@Slf4j +class RecorderApplicationIT +{ + static final String TOPIC_OUT = "out"; + + @Autowired + MockMvc mockMvc; + @Autowired + Consumer consumer; + + + @Test + @DisplayName("Posted messages are accepted and sent to Kafka") + void userMessagesAreAcceptedAndSentToKafka() + { + MultiValueMap recordings = new LinkedMultiValueMap<>(); + + TestData + .getInputMessages() + .forEach(kv -> + { + sendRedording(kv.key, kv.value); + recordings.add(TestUser.of(kv.key), TestRecording.of(kv.key, kv.value)); + }); + + + await("Received expected messages") + .atMost(Duration.ofSeconds(5)) + .untilAsserted(() -> recordings.forEach((user, userRecordings) -> + assertThat(consumer.receivedFor(user)).containsExactlyElementsOf(userRecordings))); + } + + void sendRedording(String user, String sentence) + { + try + { + MvcResult result = mockMvc + .perform(post("/{user}", user) + .contentType(MediaType.TEXT_PLAIN) + .content(sentence)) + .andReturn(); + + mockMvc.perform(asyncDispatch(result)) + .andDo(print()) + .andExpect(status().isOk()) + .andExpect(jsonPath("$.username").value(user)) + .andExpect(jsonPath("$.sentence").value(sentence)); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + + static class Consumer + { + private final MultiValueMap received = new LinkedMultiValueMap<>(); + + @KafkaListener(groupId = "TEST", topics = TOPIC_OUT) + public synchronized void receive( + @Header(KafkaHeaders.RECEIVED_KEY) TestUser user, + @Payload TestRecording recording) + { + log.debug("Received message: {}={}", user, recording); + received.add(user, recording); + } + + synchronized List receivedFor(TestUser user) + { + List recordings = received.get(user); + return recordings == null + ? List.of() + : received.get(user).stream().toList(); + } + } + + @TestConfiguration + static class Configuration + { + @Bean + Consumer consumer() + { + return new Consumer(); + } + } +} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/TestData.java b/src/test/java/de/juplo/kafka/wordcount/recorder/TestData.java new file mode 100644 index 0000000..2f98595 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/TestData.java @@ -0,0 +1,25 @@ +package de.juplo.kafka.wordcount.recorder; + +import org.apache.kafka.streams.KeyValue; + +import java.util.stream.Stream; + + +class TestData +{ + static final String PETER = "päter"; + static final String KLAUS = "klühs"; + + static final Stream> getInputMessages() + { + return Stream.of(INPUT_MESSAGES); + } + + private static final KeyValue[] INPUT_MESSAGES = new KeyValue[] + { + new KeyValue<>("päter", "Hall° Wält?¢*&%€!"), + new KeyValue<>("päter", "Hallo Welt!"), + new KeyValue<>("klühs", "Müsch gäb's auch!"), + new KeyValue<>("päter", "Boäh, echt! ß mal nä Nümmäh!"), + }; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestRecording.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestRecording.java new file mode 100644 index 0000000..06018c9 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestRecording.java @@ -0,0 +1,15 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestRecording +{ + String user; + String sentence; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java b/src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java new file mode 100644 index 0000000..60c0708 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/splitter/TestUser.java @@ -0,0 +1,14 @@ +package de.juplo.kafka.wordcount.splitter; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + + +@Data +@NoArgsConstructor +@AllArgsConstructor(staticName = "of") +public class TestUser +{ + String user; +}