WIP master
authorKai Moritz <kai@juplo.de>
Mon, 15 Nov 2021 20:41:03 +0000 (21:41 +0100)
committerKai Moritz <kai@juplo.de>
Mon, 15 Nov 2021 20:41:03 +0000 (21:41 +0100)
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java
src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java
src/main/java/de/juplo/kafka/payment/transfer/ports/TransferService.java

index a5c1faf..e94f703 100644 (file)
@@ -113,9 +113,9 @@ public class TransferServiceApplication
             mapper,
             new TransferConsumer.ConsumerUseCases() {
               @Override
-              public void create(Long id, Long payer, Long payee, Integer amount)
+              public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount)
               {
-                productionTransferService.create(id, payer, payee, amount);
+                return productionTransferService.create(id, payer, payee, amount);
               }
 
               @Override
@@ -125,16 +125,17 @@ public class TransferServiceApplication
               }
 
               @Override
-              public void handleStateChange(Long id, Transfer.State state)
+              public TransferStateChangedEvent handleStateChange(
+                  TransferStateChangedEvent stateChangedEvent)
               {
-                productionTransferService.handleStateChange(id, state);
+                return productionTransferService.handleStateChange(stateChangedEvent);
               }
             },
             new TransferConsumer.ConsumerUseCases() {
               @Override
-              public void create(Long id, Long payer, Long payee, Integer amount)
+              public TransferStateChangedEvent create(Long id, Long payer, Long payee, Integer amount)
               {
-                restoreTransferService.create(id, payer, payee, amount);
+                return restoreTransferService.create(id, payer, payee, amount);
               }
 
               @Override
@@ -144,9 +145,10 @@ public class TransferServiceApplication
               }
 
               @Override
-              public void handleStateChange(Long id, Transfer.State state)
+              public TransferStateChangedEvent handleStateChange(
+                  TransferStateChangedEvent stateChangedEvent)
               {
-                restoreTransferService.handleStateChange(id, state);
+                return restoreTransferService.handleStateChange(stateChangedEvent);
               }
             });
   }
index 1cae540..c66c534 100644 (file)
@@ -172,7 +172,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 
           TransferStateChangedEvent stateChangedEvent =
               mapper.readValue(record.value(), TransferStateChangedEvent.class);
-          useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
+          useCases.handleStateChange(stateChangedEvent);
           break;
       }
     }
index bfe6156..8591a0d 100644 (file)
@@ -1,9 +1,11 @@
 package de.juplo.kafka.payment.transfer.ports;
 
-import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.adapter.TransferStateChangedEvent;
+
+import java.util.Optional;
 
 
 public interface CreateTransferUseCase
 {
-  void create(Long id, Long payer, Long payee, Integer amount);
+  Optional<TransferStateChangedEvent> create(Long id, Long payer, Long payee, Integer amount);
 }
index 9b3e270..7d9fe83 100644 (file)
@@ -1,9 +1,9 @@
 package de.juplo.kafka.payment.transfer.ports;
 
-import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.adapter.TransferStateChangedEvent;
 
 
 public interface HandleStateChangeUseCase
 {
-  void handleStateChange(Long id, Transfer.State state);
+  TransferStateChangedEvent handleStateChange(TransferStateChangedEvent stateChangedEvent);
 }
index d4eb3ef..7ee0432 100644 (file)
@@ -1,6 +1,7 @@
 package de.juplo.kafka.payment.transfer.ports;
 
 
+import de.juplo.kafka.payment.transfer.adapter.TransferStateChangedEvent;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -19,36 +20,35 @@ public class TransferService implements CreateTransferUseCase, HandleStateChange
   private final MessagingService messagingService;
 
   @Override
-  public void create(Long id, Long payer, Long payee, Integer amount)
+  public Optional<TransferStateChangedEvent> create(Long id, Long payer, Long payee, Integer amount)
   {
+    return
     repository
         .get(id)
-        .ifPresentOrElse(
-            stored -> log.info(
+        .flatMap(
+            stored ->
+            {log.info(
                 "transfer already exisits: {}, ignoring: id={}, payer={}, payee={}, amount={}",
                 stored,
                 payer,
                 payee,
-                amount),
-            () ->
+                amount);
+            return Optional.empty();
+            })
+        .or(() ->
             {
-              Transfer transfer =
-                  Transfer
+              log.info("creating transfer: {}", transfer);
+              return
+                  TransferStateChangedEvent
                       .builder()
-                      .id(id)
-                      .payer(payer)
-                      .payee(payee)
-                      .amount(amount)
+                      .id(transfer.getId())
+                      .state(CREATED)
                       .build();
-
-              log.info("creating transfer: {}", transfer);
-              repository.store(transfer);
-              messagingService.send(transfer.getId(), CREATED);
             });
   }
 
   @Override
-  public void handleStateChange(Long id, Transfer.State state)
+  public TransferStateChangedEvent handleStateChange(Long id, Transfer.State state)
   {
     get(id)
         .ifPresentOrElse(