<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>de.juplo.kafka.wordcount</groupId>
- <artifactId>recorder</artifactId>
+ <artifactId>users</artifactId>
<version>1.0.0</version>
- <name>Wordcount-Recorder</name>
- <description>Recorder-service of the multi-user wordcount-example</description>
+ <name>Wordcount-Users</name>
+ <description>Users-service of the multi-user wordcount-example</description>
<properties>
<docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
<java.version>11</java.version>
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.Properties;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(RecorderApplicationProperties.class)
-public class RecorderApplication
-{
- @Bean(destroyMethod = "close")
- KafkaProducer<String, String> producer(RecorderApplicationProperties properties)
- {
- Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
-
- Properties props = new Properties();
- props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
- return new KafkaProducer<>(props);
- }
-
- public static void main(String[] args)
- {
- SpringApplication.run(RecorderApplication.class, args);
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.recorder")
-@Getter
-@Setter
-@ToString
-public class RecorderApplicationProperties
-{
- private String bootstrapServer = "localhost:9092";
- private String topic = "recordings";
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.constraints.NotEmpty;
-
-
-@RestController
-public class RecorderController
-{
- private final String topic;
- private final KafkaProducer<String, String> producer;
-
-
- public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
- {
- this.topic = properties.getTopic();
- this.producer = producer;
- }
-
- @PostMapping(
- path = "/{username}",
- consumes = {
- MimeTypeUtils.TEXT_PLAIN_VALUE,
- MimeTypeUtils.APPLICATION_JSON_VALUE
- },
- produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
- DeferredResult<ResponseEntity<RecordingResult>> speak(
- @PathVariable
- @NotEmpty(message = "A username must be provided")
- String username,
- @RequestBody
- @NotEmpty(message = "The spoken sentence must not be empty!")
- String sentence)
- {
- DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
-
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
- producer.send(record, (metadata, exception) ->
- {
- if (metadata != null)
- {
- result.setResult(
- ResponseEntity.ok(RecordingResult.of(
- username,
- sentence,
- topic,
- metadata.partition(),
- metadata.offset(),
- null,
- null)));
- }
- else
- {
- result.setErrorResult(
- ResponseEntity
- .internalServerError()
- .body(RecordingResult.of(
- username,
- sentence,
- topic,
- null,
- null,
- HttpStatus.INTERNAL_SERVER_ERROR.value(),
- exception.toString())));
- }
- });
-
- return result;
- }
-}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import lombok.Value;
-
-import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
-
-
-@Value(staticConstructor = "of")
-public class RecordingResult
-{
- @JsonInclude(NON_NULL) private final String username;
- @JsonInclude(NON_NULL) private final String sentence;
- @JsonInclude(NON_NULL) private final String topic;
- @JsonInclude(NON_NULL) private final Integer partition;
- @JsonInclude(NON_NULL) private final Long offset;
- @JsonInclude(NON_NULL) private final Integer status;
- @JsonInclude(NON_NULL) private final String error;
-}
--- /dev/null
+package de.juplo.kafka.wordcount.users;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import javax.validation.constraints.NotEmpty;
+
+
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode(of = "username")
+public class User
+{
+ public enum Sex { FEMALE, MALE, OTHER }
+
+ @NotEmpty
+ private String username;
+
+ private String firstName;
+ private String lastName;
+ private Sex sex;
+}
--- /dev/null
+package de.juplo.kafka.wordcount.users;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.Properties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(UsersApplicationProperties.class)
+public class UsersApplication
+{
+ @Bean(destroyMethod = "close")
+ KafkaProducer<String, String> producer(UsersApplicationProperties properties)
+ {
+ Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
+
+ Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+ return new KafkaProducer<>(props);
+ }
+
+ public static void main(String[] args)
+ {
+ SpringApplication.run(UsersApplication.class, args);
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.users;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.users")
+@Getter
+@Setter
+@ToString
+public class UsersApplicationProperties
+{
+ private String bootstrapServer = "localhost:9092";
+ private String topic = "users";
+}
--- /dev/null
+package de.juplo.kafka.wordcount.users;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+
+@RestController
+public class UsersController
+{
+ private final String topic;
+ private final ObjectMapper mapper;
+ private final KafkaProducer<String, String> producer;
+
+
+ public UsersController(
+ UsersApplicationProperties properties,
+ ObjectMapper mapper,
+ KafkaProducer<String,String> producer)
+ {
+ this.topic = properties.getTopic();
+ this.mapper = mapper;
+ this.producer = producer;
+ }
+
+ @PostMapping(
+ path = "/users",
+ consumes = {
+ MimeTypeUtils.TEXT_PLAIN_VALUE,
+ MimeTypeUtils.APPLICATION_JSON_VALUE
+ },
+ produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
+ DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
+ {
+ DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
+
+ String value = mapper.writeValueAsString(user);
+ ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
+ producer.send(record, (metadata, exception) ->
+ {
+ if (metadata != null)
+ {
+ result.setResult(
+ ResponseEntity.ok(UsersResult.of(
+ user.getUsername(),
+ user,
+ topic,
+ metadata.partition(),
+ metadata.offset(),
+ null,
+ null)));
+ }
+ else
+ {
+ result.setErrorResult(
+ ResponseEntity
+ .internalServerError()
+ .body(UsersResult.of(
+ user.getUsername(),
+ user,
+ topic,
+ null,
+ null,
+ HttpStatus.INTERNAL_SERVER_ERROR.value(),
+ exception.toString())));
+ }
+ });
+
+ return result;
+ }
+}
--- /dev/null
+package de.juplo.kafka.wordcount.users;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Value;
+
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+
+
+@Value(staticConstructor = "of")
+public class UsersResult
+{
+ @JsonInclude(NON_NULL) private final String username;
+ @JsonInclude(NON_NULL) private final User user;
+ @JsonInclude(NON_NULL) private final String topic;
+ @JsonInclude(NON_NULL) private final Integer partition;
+ @JsonInclude(NON_NULL) private final Long offset;
+ @JsonInclude(NON_NULL) private final Integer status;
+ @JsonInclude(NON_NULL) private final String error;
+}
+++ /dev/null
-package de.juplo.kafka.wordcount.recorder;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class ApplicationTests
-{
- @Test
- void contextLoads()
- {
- }
-}
--- /dev/null
+package de.juplo.kafka.wordcount.users;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class ApplicationTests
+{
+ @Test
+ void contextLoads()
+ {
+ }
+}