WIP
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / users / UsersController.java
index 4e64524..84aaf37 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.wordcount.users;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.User;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.springframework.http.HttpStatus;
@@ -18,13 +19,13 @@ public class UsersController
 {
   private final String topic;
   private final ObjectMapper mapper;
-  private final KafkaProducer<String, String> producer;
+  private final KafkaProducer<String, User> producer;
 
 
   public UsersController(
       UsersApplicationProperties properties,
       ObjectMapper mapper,
-      KafkaProducer<String,String> producer)
+      KafkaProducer<String,User> producer)
   {
     this.topic = properties.getTopic();
     this.mapper = mapper;
@@ -38,20 +39,28 @@ public class UsersController
           MimeTypeUtils.APPLICATION_JSON_VALUE
       },
       produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
-  DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
+  DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody UserRequest userRequest) throws JsonProcessingException
   {
     DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
 
-    String value = mapper.writeValueAsString(user);
-    ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
+    ProducerRecord<String, User> record =
+        new ProducerRecord<>(
+            topic,
+            userRequest.getUsername(),
+            User
+                .newBuilder()
+                .setUsername(userRequest.getUsername())
+                .setFirstName(userRequest.getFirstName())
+                .setLastName(userRequest.getLastName())
+                .build());
     producer.send(record, (metadata, exception) ->
     {
       if (metadata != null)
       {
         result.setResult(
             ResponseEntity.ok(UsersResult.of(
-                user.getUsername(),
-                user,
+                userRequest.getUsername(),
+                userRequest,
                 topic,
                 metadata.partition(),
                 metadata.offset(),
@@ -64,8 +73,8 @@ public class UsersController
             ResponseEntity
                 .internalServerError()
                 .body(UsersResult.of(
-                    user.getUsername(),
-                    user,
+                    userRequest.getUsername(),
+                    userRequest,
                     topic,
                     null,
                     null,