WIP
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / domain / TransferService.java
1 package de.juplo.kafka.payment.transfer.domain;
2
3
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import lombok.RequiredArgsConstructor;
6 import lombok.extern.slf4j.Slf4j;
7 import org.apache.kafka.clients.producer.KafkaProducer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.springframework.http.ResponseEntity;
10
11 import java.net.URI;
12 import java.util.UUID;
13
14
15 @Slf4j
16 @RequiredArgsConstructor
17 public class TransferService
18 {
19   private final TransferRepository repository;
20   private final KafkaProducer<UUID, String> producer;
21   private final ObjectMapper mapper;
22   private final String topic;
23
24   public void initiate(Transfer transfer)
25   {
26     repository
27         .get(transfer.getId())
28         .ifPresentOrElse(
29             stored ->
30             {
31
32             },
33             () ->
34             {
35
36             });
37     try
38     {
39       ProducerRecord<UUID, String> record =
40           new ProducerRecord<>(
41               topic,
42               transfer.getId(),
43               mapper.writeValueAsString(transfer));
44
45       producer.send(record, (metadata, exception) ->
46       {
47         if (exception != null)
48         {
49           log.error("Could not place order {}: {}", transfer, exception.toString());
50           result.setErrorResult(exception);
51           return;
52         }
53
54         result.setResult(ResponseEntity.created(URI.create(path + transfer.getId())).build());
55       });
56     }
57     catch (Exception e)
58     {
59       log.error("Unexpected exception!", e);
60       result.setErrorResult(e);
61     }
62
63     return result;
64   }
65 }