import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.wordcount.avro.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.http.HttpStatus;
{
private final String topic;
private final ObjectMapper mapper;
- private final KafkaProducer<String, String> producer;
+ private final KafkaProducer<String, User> producer;
public UsersController(
UsersApplicationProperties properties,
ObjectMapper mapper,
- KafkaProducer<String,String> producer)
+ KafkaProducer<String,User> producer)
{
this.topic = properties.getTopic();
this.mapper = mapper;
MimeTypeUtils.APPLICATION_JSON_VALUE
},
produces = MimeTypeUtils.APPLICATION_JSON_VALUE)
- DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody User user) throws JsonProcessingException
+ DeferredResult<ResponseEntity<UsersResult>> post(@RequestBody UserRequest userRequest) throws JsonProcessingException
{
DeferredResult<ResponseEntity<UsersResult>> result = new DeferredResult<>();
- String value = mapper.writeValueAsString(user);
- ProducerRecord<String, String> record = new ProducerRecord<>(topic, user.getUsername(), value);
+ ProducerRecord<String, User> record =
+ new ProducerRecord<>(
+ topic,
+ userRequest.getUsername(),
+ User
+ .newBuilder()
+ .setUsername(userRequest.getUsername())
+ .setFirstName(userRequest.getFirstName())
+ .setLastName(userRequest.getLastName())
+ .build());
producer.send(record, (metadata, exception) ->
{
if (metadata != null)
{
result.setResult(
ResponseEntity.ok(UsersResult.of(
- user.getUsername(),
- user,
+ userRequest.getUsername(),
+ userRequest,
topic,
metadata.partition(),
metadata.offset(),
ResponseEntity
.internalServerError()
.body(UsersResult.of(
- user.getUsername(),
- user,
+ userRequest.getUsername(),
+ userRequest,
topic,
null,
null,