users:1.0.0 - users are directly send to kafka
authorKai Moritz <kai@juplo.de>
Wed, 1 Sep 2021 18:33:32 +0000 (20:33 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 4 Sep 2021 15:54:45 +0000 (17:54 +0200)
12 files changed:
pom.xml
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java [deleted file]
src/main/java/de/juplo/kafka/wordcount/users/User.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/users/UsersController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java [new file with mode: 0644]
src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java [new file with mode: 0644]

diff --git a/pom.xml b/pom.xml
index 98cc0dc..5790169 100644 (file)
--- a/pom.xml
+++ b/pom.xml
@@ -9,10 +9,10 @@
                <relativePath/> <!-- lookup parent from repository -->
        </parent>
        <groupId>de.juplo.kafka.wordcount</groupId>
-       <artifactId>recorder</artifactId>
+       <artifactId>users</artifactId>
        <version>1.0.0</version>
-       <name>Wordcount-Recorder</name>
-       <description>Recorder-service of the multi-user wordcount-example</description>
+       <name>Wordcount-Users</name>
+       <description>Users-service of the multi-user wordcount-example</description>
        <properties>
                <docker-maven-plugin.version>0.33.0</docker-maven-plugin.version>
                <java.version>11</java.version>
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplication.java
deleted file mode 100644 (file)
index abe0685..0000000
+++ /dev/null
@@ -1,36 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.SpringApplication;
-import org.springframework.boot.autoconfigure.SpringBootApplication;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.util.Assert;
-
-import java.util.Properties;
-
-
-@SpringBootApplication
-@EnableConfigurationProperties(RecorderApplicationProperties.class)
-public class RecorderApplication
-{
-       @Bean(destroyMethod = "close")
-       KafkaProducer<String, String> producer(RecorderApplicationProperties properties)
-       {
-               Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
-
-               Properties props = new Properties();
-               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
-               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-
-               return new KafkaProducer<>(props);
-       }
-
-       public static void main(String[] args)
-       {
-               SpringApplication.run(RecorderApplication.class, args);
-       }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderApplicationProperties.java
deleted file mode 100644 (file)
index 552ebaf..0000000
+++ /dev/null
@@ -1,18 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-
-import lombok.Getter;
-import lombok.Setter;
-import lombok.ToString;
-import org.springframework.boot.context.properties.ConfigurationProperties;
-
-
-@ConfigurationProperties("juplo.wordcount.recorder")
-@Getter
-@Setter
-@ToString
-public class RecorderApplicationProperties
-{
-  private String bootstrapServer = "localhost:9092";
-  private String topic = "recordings";
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecorderController.java
deleted file mode 100644 (file)
index 5fe69ad..0000000
+++ /dev/null
@@ -1,80 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.util.MimeTypeUtils;
-import org.springframework.web.bind.annotation.PathVariable;
-import org.springframework.web.bind.annotation.PostMapping;
-import org.springframework.web.bind.annotation.RequestBody;
-import org.springframework.web.bind.annotation.RestController;
-import org.springframework.web.context.request.async.DeferredResult;
-
-import javax.validation.constraints.NotEmpty;
-
-
-@RestController
-public class RecorderController
-{
-  private final String topic;
-  private final KafkaProducer<String, String> producer;
-
-
-  public RecorderController(RecorderApplicationProperties properties, KafkaProducer<String,String> producer)
-  {
-    this.topic = properties.getTopic();
-    this.producer = producer;
-  }
-
-  @PostMapping(
-      path = "/{username}",
-      consumes = {
-          MimeTypeUtils.TEXT_PLAIN_VALUE,
-          MimeTypeUtils.APPLICATION_JSON_VALUE
-      },
-      produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
-  DeferredResult<ResponseEntity<RecordingResult>> speak(
-      @PathVariable
-      @NotEmpty(message = "A username must be provided")
-      String username,
-      @RequestBody
-      @NotEmpty(message = "The spoken sentence must not be empty!")
-      String sentence)
-  {
-    DeferredResult<ResponseEntity<RecordingResult>> result = new DeferredResult<>();
-
-    ProducerRecord<String, String> record = new ProducerRecord<>(topic, username, sentence);
-    producer.send(record, (metadata, exception) ->
-    {
-      if (metadata != null)
-      {
-        result.setResult(
-            ResponseEntity.ok(RecordingResult.of(
-                username,
-                sentence,
-                topic,
-                metadata.partition(),
-                metadata.offset(),
-                null,
-                null)));
-      }
-      else
-      {
-        result.setErrorResult(
-            ResponseEntity
-                .internalServerError()
-                .body(RecordingResult.of(
-                    username,
-                    sentence,
-                    topic,
-                    null,
-                    null,
-                    HttpStatus.INTERNAL_SERVER_ERROR.value(),
-                    exception.toString())));
-      }
-    });
-
-    return result;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java b/src/main/java/de/juplo/kafka/wordcount/recorder/RecordingResult.java
deleted file mode 100644 (file)
index 939b1d4..0000000
+++ /dev/null
@@ -1,19 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import com.fasterxml.jackson.annotation.JsonInclude;
-import lombok.Value;
-
-import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
-
-
-@Value(staticConstructor = "of")
-public class RecordingResult
-{
-  @JsonInclude(NON_NULL) private final String username;
-  @JsonInclude(NON_NULL) private final String sentence;
-  @JsonInclude(NON_NULL) private final String topic;
-  @JsonInclude(NON_NULL) private final Integer partition;
-  @JsonInclude(NON_NULL) private final Long offset;
-  @JsonInclude(NON_NULL) private final Integer status;
-  @JsonInclude(NON_NULL) private final String error;
-}
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/User.java b/src/main/java/de/juplo/kafka/wordcount/users/User.java
new file mode 100644 (file)
index 0000000..04c9e0c
--- /dev/null
@@ -0,0 +1,25 @@
+package de.juplo.kafka.wordcount.users;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+
+import javax.validation.constraints.NotEmpty;
+
+
+@Getter
+@Setter
+@ToString
+@EqualsAndHashCode(of = "username")
+public class User
+{
+  public enum Sex { FEMALE, MALE, OTHER }
+
+  @NotEmpty
+  private String username;
+
+  private String firstName;
+  private String lastName;
+  private Sex sex;
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplication.java
new file mode 100644 (file)
index 0000000..9ae44e1
--- /dev/null
@@ -0,0 +1,36 @@
+package de.juplo.kafka.wordcount.users;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.util.Assert;
+
+import java.util.Properties;
+
+
+@SpringBootApplication
+@EnableConfigurationProperties(UsersApplicationProperties.class)
+public class UsersApplication
+{
+       @Bean(destroyMethod = "close")
+       KafkaProducer<String, String> producer(UsersApplicationProperties properties)
+       {
+               Assert.hasText(properties.getBootstrapServer(), "juplo.wordcount.recorder.bootstrap-server must be set");
+
+               Properties props = new Properties();
+               props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServer());
+               props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+               props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+
+               return new KafkaProducer<>(props);
+       }
+
+       public static void main(String[] args)
+       {
+               SpringApplication.run(UsersApplication.class, args);
+       }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersApplicationProperties.java
new file mode 100644 (file)
index 0000000..8218f99
--- /dev/null
@@ -0,0 +1,18 @@
+package de.juplo.kafka.wordcount.users;
+
+
+import lombok.Getter;
+import lombok.Setter;
+import lombok.ToString;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+
+@ConfigurationProperties("juplo.wordcount.users")
+@Getter
+@Setter
+@ToString
+public class UsersApplicationProperties
+{
+  private String bootstrapServer = "localhost:9092";
+  private String topic = "users";
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java
new file mode 100644 (file)
index 0000000..4e64524
--- /dev/null
@@ -0,0 +1,79 @@
+package de.juplo.kafka.wordcount.users;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.ResponseEntity;
+import org.springframework.util.MimeTypeUtils;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.context.request.async.DeferredResult;
+
+
+@RestController
+public class UsersController
+{
+  private final String topic;
+  private final ObjectMapper mapper;
+  private final KafkaProducer<String, String> producer;
+
+
+  public UsersController(
+      UsersApplicationProperties properties,
+      ObjectMapper mapper,
+      KafkaProducer<String,String> producer)
+  {
+    this.topic = properties.getTopic();
+    this.mapper = mapper;
+    this.producer = producer;
+  }
+
+  @PostMapping(
+      path = "/users",
+      consumes = {
+          MimeTypeUtils.TEXT_PLAIN_VALUE,
+          MimeTypeUtils.APPLICATION_JSON_VALUE
+      },
+      produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
+  DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
+  {
+    DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
+
+    String value = mapper.writeValueAsString(user);
+    ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
+    producer.send(record, (metadata, exception) ->
+    {
+      if (metadata != null)
+      {
+        result.setResult(
+            ResponseEntity.ok(UsersResult.of(
+                user.getUsername(),
+                user,
+                topic,
+                metadata.partition(),
+                metadata.offset(),
+                null,
+                null)));
+      }
+      else
+      {
+        result.setErrorResult(
+            ResponseEntity
+                .internalServerError()
+                .body(UsersResult.of(
+                    user.getUsername(),
+                    user,
+                    topic,
+                    null,
+                    null,
+                    HttpStatus.INTERNAL_SERVER_ERROR.value(),
+                    exception.toString())));
+      }
+    });
+
+    return result;
+  }
+}
diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersResult.java
new file mode 100644 (file)
index 0000000..d712039
--- /dev/null
@@ -0,0 +1,19 @@
+package de.juplo.kafka.wordcount.users;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import lombok.Value;
+
+import static com.fasterxml.jackson.annotation.JsonInclude.Include.NON_NULL;
+
+
+@Value(staticConstructor = "of")
+public class UsersResult
+{
+  @JsonInclude(NON_NULL) private final String username;
+  @JsonInclude(NON_NULL) private final User user;
+  @JsonInclude(NON_NULL) private final String topic;
+  @JsonInclude(NON_NULL) private final Integer partition;
+  @JsonInclude(NON_NULL) private final Long offset;
+  @JsonInclude(NON_NULL) private final Integer status;
+  @JsonInclude(NON_NULL) private final String error;
+}
diff --git a/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/recorder/ApplicationTests.java
deleted file mode 100644 (file)
index 885a408..0000000
+++ /dev/null
@@ -1,13 +0,0 @@
-package de.juplo.kafka.wordcount.recorder;
-
-import org.junit.jupiter.api.Test;
-import org.springframework.boot.test.context.SpringBootTest;
-
-@SpringBootTest
-class ApplicationTests
-{
-       @Test
-       void contextLoads()
-       {
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java b/src/test/java/de/juplo/kafka/wordcount/users/ApplicationTests.java
new file mode 100644 (file)
index 0000000..9a92bf1
--- /dev/null
@@ -0,0 +1,13 @@
+package de.juplo.kafka.wordcount.users;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class ApplicationTests
+{
+       @Test
+       void contextLoads()
+       {
+       }
+}