From fc7b9dfe12a1401e7365d85ec723364750bdcd3d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Wed, 1 Sep 2021 20:33:32 +0200 Subject: [PATCH] users:1.0.0 - users are directly send to kafka --- pom.xml | 6 +-- .../de/juplo/kafka/wordcount/users/User.java | 25 +++++++++++ .../UsersApplication.java} | 10 ++--- .../UsersApplicationProperties.java} | 8 ++-- .../UsersController.java} | 43 +++++++++---------- .../UsersResult.java} | 6 +-- .../{recorder => users}/ApplicationTests.java | 2 +- 7 files changed, 62 insertions(+), 38 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/wordcount/users/User.java rename src/main/java/de/juplo/kafka/wordcount/{recorder/RecorderApplication.java => users/UsersApplication.java} (78%) rename src/main/java/de/juplo/kafka/wordcount/{recorder/RecorderApplicationProperties.java => users/UsersApplicationProperties.java} (56%) rename src/main/java/de/juplo/kafka/wordcount/{recorder/RecorderController.java => users/UsersController.java} (63%) rename src/main/java/de/juplo/kafka/wordcount/{recorder/RecordingResult.java => users/UsersResult.java} (80%) rename src/test/java/de/juplo/kafka/wordcount/{recorder => users}/ApplicationTests.java (80%) diff --git a/pom.xml b/pom.xml index 98cc0dc..5790169 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - recorder + users 1.0.0 - Wordcount-Recorder - Recorder-service of the multi-user wordcount-example + Wordcount-Users + Users-service of the multi-user wordcount-example 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/users/User.java b/src/main/java/de/juplo/kafka/wordcount/users/User.java new file mode 100644 index 0000000..04c9e0c --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/users/User.java @@ -0,0 +1,25 @@ +package de.juplo.kafka.wordcount.users; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import javax.validation.constraints.NotEmpty; + + +@Getter +@Setter +@ToString +@EqualsAndHashCode(of = "username") +public class User +{ + public enum Sex { FEMALE, MALE, OTHER } + + @NotEmpty + private String username; + + private String firstName; + private String lastName; + private Sex sex; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java similarity index 78% rename from src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java rename to src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java index abe0685..9ae44e1 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.recorder; +package de.juplo.kafka.wordcount.users; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -13,11 +13,11 @@ import java.util.Properties; @SpringBootApplication -@EnableConfigurationProperties(RecorderApplicationProperties.class) -public class RecorderApplication +@EnableConfigurationProperties(UsersApplicationProperties.class) +public class UsersApplication { @Bean(destroyMethod = "close") - KafkaProducer producer(RecorderApplicationProperties properties) + KafkaProducer producer(UsersApplicationProperties properties) { Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); @@ -31,6 +31,6 @@ public class RecorderApplication public static void main(String[] args) { - SpringApplication.run(RecorderApplication.class, args); + SpringApplication.run(UsersApplication.class, args); } } diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java similarity index 56% rename from src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java rename to src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java index 552ebaf..8218f99 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.recorder; +package de.juplo.kafka.wordcount.users; import lombok.Getter; @@ -7,12 +7,12 @@ import lombok.ToString; import org.springframework.boot.context.properties.ConfigurationProperties; -@ConfigurationProperties("juplo.wordcount.recorder") +@ConfigurationProperties("juplo.wordcount.users") @Getter @Setter @ToString -public class RecorderApplicationProperties +public class UsersApplicationProperties { private String bootstrapServer = "localhost:9092"; - private String topic = "recordings"; + private String topic = "users"; } diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java similarity index 63% rename from src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java rename to src/main/java/de/juplo/kafka/wordcount/users/UsersController.java index 5fe69ad..4e64524 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java @@ -1,58 +1,57 @@ -package de.juplo.kafka.wordcount.recorder; +package de.juplo.kafka.wordcount.users; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.util.MimeTypeUtils; -import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; -import javax.validation.constraints.NotEmpty; - @RestController -public class RecorderController +public class UsersController { private final String topic; + private final ObjectMapper mapper; private final KafkaProducer producer; - public RecorderController(RecorderApplicationProperties properties, KafkaProducer producer) + public UsersController( + UsersApplicationProperties properties, + ObjectMapper mapper, + KafkaProducer producer) { this.topic = properties.getTopic(); + this.mapper = mapper; this.producer = producer; } @PostMapping( - path = "/{username}", + path = "/users", consumes = { MimeTypeUtils.TEXT_PLAIN_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE }, produces = MimeTypeUtils.APPLICATION_JSON_VALUE) - DeferredResult> speak( - @PathVariable - @NotEmpty(message = "A username must be provided") - String username, - @RequestBody - @NotEmpty(message = "The spoken sentence must not be empty!") - String sentence) + DeferredResult> post(@RequestBody User user) throws JsonProcessingException { - DeferredResult> result = new DeferredResult<>(); + DeferredResult> result = new DeferredResult<>(); - ProducerRecord record = new ProducerRecord<>(topic, username, sentence); + String value = mapper.writeValueAsString(user); + ProducerRecord record = new ProducerRecord<>(topic, user.getUsername(), value); producer.send(record, (metadata, exception) -> { if (metadata != null) { result.setResult( - ResponseEntity.ok(RecordingResult.of( - username, - sentence, + ResponseEntity.ok(UsersResult.of( + user.getUsername(), + user, topic, metadata.partition(), metadata.offset(), @@ -64,9 +63,9 @@ public class RecorderController result.setErrorResult( ResponseEntity .internalServerError() - .body(RecordingResult.of( - username, - sentence, + .body(UsersResult.of( + user.getUsername(), + user, topic, null, null, diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java similarity index 80% rename from src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java rename to src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java index 939b1d4..d712039 100644 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.recorder; +package de.juplo.kafka.wordcount.users; import com.fasterxml.jackson.annotation.JsonInclude; import lombok.Value; @@ -7,10 +7,10 @@ import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; @Value(staticConstructor = "of") -public class RecordingResult +public class UsersResult { @JsonInclude(NON_NULL) private final String username; - @JsonInclude(NON_NULL) private final String sentence; + @JsonInclude(NON_NULL) private final User user; @JsonInclude(NON_NULL) private final String topic; @JsonInclude(NON_NULL) private final Integer partition; @JsonInclude(NON_NULL) private final Long offset; diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java similarity index 80% rename from src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java rename to src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java index 885a408..9a92bf1 100644 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java +++ b/src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java @@ -1,4 +1,4 @@ -package de.juplo.kafka.wordcount.recorder; +package de.juplo.kafka.wordcount.users; import org.junit.jupiter.api.Test; import org.springframework.boot.test.context.SpringBootTest; -- 2.20.1