WIP
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / ports / TransferService.java
index d4eb3ef..7ee0432 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.payment.transfer.ports;
 
 
+import de.juplo.kafka.payment.transfer.adapter.TransferStateChangedEvent;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -19,36 +20,35 @@ public class TransferService implements CreateTransferUseCase, HandleStateChange
   private final MessagingService messagingService;
 
   @Override
-  public void create(Long id, Long payer, Long payee, Integer amount)
+  public Optional<TransferStateChangedEvent> create(Long id, Long payer, Long payee, Integer amount)
   {
+    return
     repository
         .get(id)
-        .ifPresentOrElse(
-            stored -> log.info(
+        .flatMap(
+            stored ->
+            {log.info(
                 "transfer already exisits: {}, ignoring: id={}, payer={}, payee={}, amount={}",
                 stored,
                 payer,
                 payee,
-                amount),
-            () ->
+                amount);
+            return Optional.empty();
+            })
+        .or(() ->
             {
-              Transfer transfer =
-                  Transfer
+              log.info("creating transfer: {}", transfer);
+              return
+                  TransferStateChangedEvent
                       .builder()
-                      .id(id)
-                      .payer(payer)
-                      .payee(payee)
-                      .amount(amount)
+                      .id(transfer.getId())
+                      .state(CREATED)
                       .build();
-
-              log.info("creating transfer: {}", transfer);
-              repository.store(transfer);
-              messagingService.send(transfer.getId(), CREATED);
             });
   }
 
   @Override
-  public void handleStateChange(Long id, Transfer.State state)
+  public TransferStateChangedEvent handleStateChange(Long id, Transfer.State state)
   {
     get(id)
         .ifPresentOrElse(