X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fusers%2FUsersController.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fusers%2FUsersController.java;h=4e64524aa1effbba05f5753d6e9a7afec6238bda;hb=fc7b9dfe12a1401e7365d85ec723364750bdcd3d;hp=0000000000000000000000000000000000000000;hpb=338279a329a06be7a141a3930d80b2a2805719dc;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java new file mode 100644 index 0000000..4e64524 --- /dev/null +++ b/src/main/java/de/juplo/kafka/wordcount/users/UsersController.java @@ -0,0 +1,79 @@ +package de.juplo.kafka.wordcount.users; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.util.MimeTypeUtils; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.context.request.async.DeferredResult; + + +@RestController +public class UsersController +{ + private final String topic; + private final ObjectMapper mapper; + private final KafkaProducer producer; + + + public UsersController( + UsersApplicationProperties properties, + ObjectMapper mapper, + KafkaProducer producer) + { + this.topic = properties.getTopic(); + this.mapper = mapper; + this.producer = producer; + } + + @PostMapping( + path = "/users", + consumes = { + MimeTypeUtils.TEXT_PLAIN_VALUE, + MimeTypeUtils.APPLICATION_JSON_VALUE + }, + produces = MimeTypeUtils.APPLICATION_JSON_VALUE) + DeferredResult> post(@RequestBody User user) throws JsonProcessingException + { + DeferredResult> result = new DeferredResult<>(); + + String value = mapper.writeValueAsString(user); + ProducerRecord record = new ProducerRecord<>(topic, user.getUsername(), value); + producer.send(record, (metadata, exception) -> + { + if (metadata != null) + { + result.setResult( + ResponseEntity.ok(UsersResult.of( + user.getUsername(), + user, + topic, + metadata.partition(), + metadata.offset(), + null, + null))); + } + else + { + result.setErrorResult( + ResponseEntity + .internalServerError() + .body(UsersResult.of( + user.getUsername(), + user, + topic, + null, + null, + HttpStatus.INTERNAL_SERVER_ERROR.value(), + exception.toString()))); + } + }); + + return result; + } +}