From: Kai Moritz Date: Wed, 1 Sep 2021 18:33:32 +0000 (+0200) Subject: users:1.0.0 - users are directly send to kafka X-Git-Tag: users-1.0.0 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=fc7b9dfe12a1401e7365d85ec723364750bdcd3d;p=demos%2Fkafka%2Fwordcount users:1.0.0 - users are directly send to kafka --- diff --git a/pom.xml b/pom.xml index 98cc0dc..5790169 100644 --- a/pom.xml +++ b/pom.xml @@ -9,10 +9,10 @@ de.juplo.kafka.wordcount - recorder + users 1.0.0 - Wordcount-Recorder - Recorder-service of the multi-user wordcount-example + Wordcount-Users + Users-service of the multi-user wordcount-example 0.33.0 11 diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java deleted file mode 100644 index abe0685..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java +++ /dev/null @@ -1,36 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerConfig; -import org.apache.kafka.common.serialization.StringSerializer; -import org.springframework.boot.SpringApplication; -import org.springframework.boot.autoconfigure.SpringBootApplication; -import org.springframework.boot.context.properties.EnableConfigurationProperties; -import org.springframework.context.annotation.Bean; -import org.springframework.util.Assert; - -import java.util.Properties; - - -@SpringBootApplication -@EnableConfigurationProperties(RecorderApplicationProperties.class) -public class RecorderApplication -{ - @Bean(destroyMethod = "close") - KafkaProducer producer(RecorderApplicationProperties properties) - { - Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); - - Properties props = new Properties(); - props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - - return new KafkaProducer<>(props); - } - - public static void main(String[] args) - { - SpringApplication.run(RecorderApplication.class, args); - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java deleted file mode 100644 index 552ebaf..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java +++ /dev/null @@ -1,18 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - - -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; -import org.springframework.boot.context.properties.ConfigurationProperties; - - -@ConfigurationProperties("juplo.wordcount.recorder") -@Getter -@Setter -@ToString -public class RecorderApplicationProperties -{ - private String bootstrapServer = "localhost:9092"; - private String topic = "recordings"; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java deleted file mode 100644 index 5fe69ad..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java +++ /dev/null @@ -1,80 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import org.apache.kafka.clients.producer.KafkaProducer; -import org.apache.kafka.clients.producer.ProducerRecord; -import org.springframework.http.HttpStatus; -import org.springframework.http.ResponseEntity; -import org.springframework.util.MimeTypeUtils; -import org.springframework.web.bind.annotation.PathVariable; -import org.springframework.web.bind.annotation.PostMapping; -import org.springframework.web.bind.annotation.RequestBody; -import org.springframework.web.bind.annotation.RestController; -import org.springframework.web.context.request.async.DeferredResult; - -import javax.validation.constraints.NotEmpty; - - -@RestController -public class RecorderController -{ - private final String topic; - private final KafkaProducer producer; - - - public RecorderController(RecorderApplicationProperties properties, KafkaProducer producer) - { - this.topic = properties.getTopic(); - this.producer = producer; - } - - @PostMapping( - path = "/{username}", - consumes = { - MimeTypeUtils.TEXT_PLAIN_VALUE, - MimeTypeUtils.APPLICATION_JSON_VALUE - }, - produces = MimeTypeUtils.APPLICATION_JSON_VALUE) - DeferredResult> speak( - @PathVariable - @NotEmpty(message = "A username must be provided") - String username, - @RequestBody - @NotEmpty(message = "The spoken sentence must not be empty!") - String sentence) - { - DeferredResult> result = new DeferredResult<>(); - - ProducerRecord record = new ProducerRecord<>(topic, username, sentence); - producer.send(record, (metadata, exception) -> - { - if (metadata != null) - { - result.setResult( - ResponseEntity.ok(RecordingResult.of( - username, - sentence, - topic, - metadata.partition(), - metadata.offset(), - null, - null))); - } - else - { - result.setErrorResult( - ResponseEntity - .internalServerError() - .body(RecordingResult.of( - username, - sentence, - topic, - null, - null, - HttpStatus.INTERNAL_SERVER_ERROR.value(), - exception.toString()))); - } - }); - - return result; - } -} diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java deleted file mode 100644 index 939b1d4..0000000 --- a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java +++ /dev/null @@ -1,19 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import com.fasterxml.jackson.annotation.JsonInclude; -import lombok.Value; - -import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; - - -@Value(staticConstructor = "of") -public class RecordingResult -{ - @JsonInclude(NON_NULL) private final String username; - @JsonInclude(NON_NULL) private final String sentence; - @JsonInclude(NON_NULL) private final String topic; - @JsonInclude(NON_NULL) private final Integer partition; - @JsonInclude(NON_NULL) private final Long offset; - @JsonInclude(NON_NULL) private final Integer status; - @JsonInclude(NON_NULL) private final String error; -} diff --git a/src/main/java/de/juplo/kafka/wordcount/users/User.java b/src/main/java/de/juplo/kafka/wordcount/users/User.java new file mode 100644 index 0000000..04c9e0c --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/users/User.java @@ -0,0 +1,25 @@ +package de.juplo.kafka.wordcount.users; + +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; + +import javax.validation.constraints.NotEmpty; + + +@Getter +@Setter +@ToString +@EqualsAndHashCode(of = "username") +public class User +{ + public enum Sex { FEMALE, MALE, OTHER } + + @NotEmpty + private String username; + + private String firstName; + private String lastName; + private Sex sex; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java new file mode 100644 index 0000000..9ae44e1 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java @@ -0,0 +1,36 @@ +package de.juplo.kafka.wordcount.users; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringSerializer; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.util.Assert; + +import java.util.Properties; + + +@SpringBootApplication +@EnableConfigurationProperties(UsersApplicationProperties.class) +public class UsersApplication +{ + @Bean(destroyMethod = "close") + KafkaProducer producer(UsersApplicationProperties properties) + { + Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set"); + + Properties props = new Properties(); + props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + + return new KafkaProducer<>(props); + } + + public static void main(String[] args) + { + SpringApplication.run(UsersApplication.class, args); + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java new file mode 100644 index 0000000..8218f99 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java @@ -0,0 +1,18 @@ +package de.juplo.kafka.wordcount.users; + + +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.springframework.boot.context.properties.ConfigurationProperties; + + +@ConfigurationProperties("juplo.wordcount.users") +@Getter +@Setter +@ToString +public class UsersApplicationProperties +{ + private String bootstrapServer = "localhost:9092"; + private String topic = "users"; +} diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java new file mode 100644 index 0000000..4e64524 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java @@ -0,0 +1,79 @@ +package de.juplo.kafka.wordcount.users; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.util.MimeTypeUtils; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; + + +@RestController +public class UsersController +{ + private final String topic; + private final ObjectMapper mapper; + private final KafkaProducer producer; + + + public UsersController( + UsersApplicationProperties properties, + ObjectMapper mapper, + KafkaProducer producer) + { + this.topic = properties.getTopic(); + this.mapper = mapper; + this.producer = producer; + } + + @PostMapping( + path = "/users", + consumes = { + MimeTypeUtils.TEXT_PLAIN_VALUE, + MimeTypeUtils.APPLICATION_JSON_VALUE + }, + produces = MimeTypeUtils.APPLICATION_JSON_VALUE) + DeferredResult> post(@RequestBody User user) throws JsonProcessingException + { + DeferredResult> result = new DeferredResult<>(); + + String value = mapper.writeValueAsString(user); + ProducerRecord record = new ProducerRecord<>(topic, user.getUsername(), value); + producer.send(record, (metadata, exception) -> + { + if (metadata != null) + { + result.setResult( + ResponseEntity.ok(UsersResult.of( + user.getUsername(), + user, + topic, + metadata.partition(), + metadata.offset(), + null, + null))); + } + else + { + result.setErrorResult( + ResponseEntity + .internalServerError() + .body(UsersResult.of( + user.getUsername(), + user, + topic, + null, + null, + HttpStatus.INTERNAL_SERVER_ERROR.value(), + exception.toString()))); + } + }); + + return result; + } +} diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java new file mode 100644 index 0000000..d712039 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java @@ -0,0 +1,19 @@ +package de.juplo.kafka.wordcount.users; + +import com.fasterxml.jackson.annotation.JsonInclude; +import lombok.Value; + +import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL; + + +@Value(staticConstructor = "of") +public class UsersResult +{ + @JsonInclude(NON_NULL) private final String username; + @JsonInclude(NON_NULL) private final User user; + @JsonInclude(NON_NULL) private final String topic; + @JsonInclude(NON_NULL) private final Integer partition; + @JsonInclude(NON_NULL) private final Long offset; + @JsonInclude(NON_NULL) private final Integer status; + @JsonInclude(NON_NULL) private final String error; +} diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java deleted file mode 100644 index 885a408..0000000 --- a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java +++ /dev/null @@ -1,13 +0,0 @@ -package de.juplo.kafka.wordcount.recorder; - -import org.junit.jupiter.api.Test; -import org.springframework.boot.test.context.SpringBootTest; - -@SpringBootTest -class ApplicationTests -{ - @Test - void contextLoads() - { - } -} diff --git a/src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java new file mode 100644 index 0000000..9a92bf1 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java @@ -0,0 +1,13 @@ +package de.juplo.kafka.wordcount.users; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class ApplicationTests +{ + @Test + void contextLoads() + { + } +}