From 338279a329a06be7a141a3930d80b2a2805719dc Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 1 Sep 2021 18:42:29 +0200 Subject: [PATCH] recorder:1.0.0 - send recordings keyed by username --- .dockerignore | 2 + Dockerfile | 5 ++ pom.xml | 24 +++++- .../recorder/RecorderApplication.java | 36 +++++++++ .../RecorderApplicationProperties.java | 18 +++++ .../recorder/RecorderController.java | 80 +++++++++++++++++++ .../wordcount/recorder/RecordingResult.java | 19 +++++ .../WordcountRecorderApplication.java | 13 --- ...cationTests.java => ApplicationTests.java} | 2 +- 9 files changed, 184 insertions(+), 15 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java create mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java delete mode 100644 src/main/java/de/juplo/kafka/wordcount/recorder/WordcountRecorderApplication.java rename src/test/java/de/juplo/kafka/wordcount/recorder/{WordcountRecorderApplicationTests.java => ApplicationTests.java} (82%) diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..1ad9963 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,2 @@ +* +!target/*.jar diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..bbd15ef --- /dev/null +++ b/Dockerfile @@ -0,0 +1,5 @@ +FROM openjdk:11-jre-slim +COPY target/*.jar /opt/app.jar +EXPOSE 8080 +ENTRYPOINT ["java", "-jar", "/opt/app.jar"] +CMD [] diff --git a/pom.xml b/pom.xml index 6dfd615..98cc0dc 100644 --- a/pom.xml +++ b/pom.xml @@ -10,11 +10,13 @@ de.juplo.kafka.wordcount recorder - 0.0.1-SNAPSHOT + 1.0.0 Wordcount-Recorder Recorder-service of the multi-user wordcount-example + 0.33.0 11 + 2.8.0 @@ -25,6 +27,14 @@ org.springframework.boot spring-boot-starter-web + + org.apache.kafka + kafka-clients + + + org.hibernate.validator + hibernate-validator + org.springframework.boot @@ -63,6 +73,18 @@ + + io.fabric8 + docker-maven-plugin + ${docker-maven-plugin.version} + + + + juplo/wordcount--%a:%v + + + + diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java new file mode 100644 index 0000000..abe0685 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java @@ -0,0 +1,36 @@ +package de.juplo.kafka.wordcount.recorder; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.util.Assert; + +import java.util.Properties; + + +@SpringBootApplication +@EnableConfigurationProperties(RecorderApplicationProperties.class) +public class RecorderApplication +{ + @Bean(destroyMethod = "close") + KafkaProducer producer(RecorderApplicationProperties properties) + { + Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new KafkaProducer<>(props); + } + + public static void main(String[] args) + { + SpringApplication.run(RecorderApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java new file mode 100644 index 0000000..552ebaf --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.wordcount.recorder; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.recorder") +@Getter +@Setter +@ToString +public class RecorderApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String topic = "recordings"; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java new file mode 100644 index 0000000..5fe69ad --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java @@ -0,0 +1,80 @@ +package de.juplo.kafka.wordcount.recorder; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.util.MimeTypeUtils; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; + +import javax.validation.constraints.NotEmpty; + + +@RestController +public class RecorderController +{ + private final String topic; + private final KafkaProducer producer; + + + public RecorderController(RecorderApplicationProperties properties, KafkaProducer producer) + { + this.topic = properties.getTopic(); + this.producer = producer; + } + + @PostMapping( + path = "/{username}", + consumes = { + MimeTypeUtils.TEXT_PLAIN_VALUE, + MimeTypeUtils.APPLICATION_JSON_VALUE + }, + produces = MimeTypeUtils.APPLICATION_JSON_VALUE) + DeferredResult> speak( + @PathVariable + @NotEmpty(message = "A username must be provided") + String username, + @RequestBody + @NotEmpty(message = "The spoken sentence must not be empty!") + String sentence) + { + DeferredResult> result = new DeferredResult<>(); + + ProducerRecord record = new ProducerRecord<>(topic, username, sentence); + producer.send(record, (metadata, exception) -> + { + if (metadata != null) + { + result.setResult( + ResponseEntity.ok(RecordingResult.of( + username, + sentence, + topic, + metadata.partition(), + metadata.offset(), + null, + null))); + } + else + { + result.setErrorResult( + ResponseEntity + .internalServerError() + .body(RecordingResult.of( + username, + sentence, + topic, + null, + null, + HttpStatus.INTERNAL_SERVER_ERROR.value(), + exception.toString()))); + } + }); + + return result; + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java new file mode 100644 index 0000000..939b1d4 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.wordcount.recorder; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.Value; + +import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; + + +@Value(staticConstructor = "of") +public class RecordingResult +{ + @JsonInclude(NON_NULL) private final String username; + @JsonInclude(NON_NULL) private final String sentence; + @JsonInclude(NON_NULL) private final String topic; + @JsonInclude(NON_NULL) private final Integer partition; + @JsonInclude(NON_NULL) private final Long offset; + @JsonInclude(NON_NULL) private final Integer status; + @JsonInclude(NON_NULL) private final String error; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/WordcountRecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/WordcountRecorderApplication.java deleted file mode 100644 index 8220bfd..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/WordcountRecorderApplication.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; - -@SpringBootApplication -public class WordcountRecorderApplication -{ - public static void main(String[] args) - { - SpringApplication.run(WordcountRecorderApplication.class, args); - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/WordcountRecorderApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java similarity index 82% rename from src/test/java/de/juplo/kafka/wordcount/recorder/WordcountRecorderApplicationTests.java rename to src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java index 8b8aa50..885a408 100644 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/WordcountRecorderApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java @@ -4,7 +4,7 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest -class WordcountRecorderApplicationTests +class ApplicationTests { @Test void contextLoads() -- 2.20.1