Introduced different Events for the creation and the state-changes
authorKai Moritz <kai@juplo.de>
Sat, 19 Jun 2021 07:11:08 +0000 (09:11 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 19 Jun 2021 16:17:48 +0000 (18:17 +0200)
14 files changed:
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/EventType.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferStateChangedEvent.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java
src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java
src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java [deleted file]
src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java
src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java

index 02842e5..58a3af2 100644 (file)
@@ -67,7 +67,14 @@ public class TransferServiceApplication
       TransferService transferService)
   {
     TransferConsumer transferConsumer =
-        new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService);
+        new TransferConsumer(
+            properties.topic,
+            consumer,
+            executorService,
+            mapper,
+            transferService,
+            transferService,
+            transferService);
     transferConsumer.start();
     return transferConsumer;
   }
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/EventType.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/EventType.java
new file mode 100644 (file)
index 0000000..aa4b897
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.payment.transfer.adapter;
+
+public abstract class EventType
+{
+  public final static String HEADER = "$";
+
+  public final static byte NEW_TRANSFER = 1;
+  public final static byte TRANSFER_STATE_CHANGED = 2;
+}
index 3161af3..6da7937 100644 (file)
@@ -24,26 +24,38 @@ public class KafkaMessagingService implements MessagingService
 
   @Override
   public CompletableFuture<?> send(Transfer transfer)
+  {
+    return send(transfer.getId(), EventType.NEW_TRANSFER, NewTransferEvent.ofTransfer(transfer));
+  }
+
+  public CompletableFuture<?> send(Long id, Transfer.State state)
+  {
+    return send(id, EventType.TRANSFER_STATE_CHANGED, new TransferStateChangedEvent(id, state));
+  }
+
+  private CompletableFuture send(Long id, byte eventType, Object payload)
   {
     try
     {
       CompletableFuture<TopicPartition> future = new CompletableFuture<>();
+
       ProducerRecord<String, String> record =
           new ProducerRecord<>(
               topic,
-              Long.toString(transfer.getId()),
-              mapper.writeValueAsString(transfer));
+              Long.toString(id),
+              mapper.writeValueAsString(payload));
+      record.headers().add(EventType.HEADER, new byte[] { eventType });
 
       producer.send(record, (metadata, exception) ->
       {
         if (metadata != null)
         {
-          log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+          log.debug("Sent {} to {}/{}:{}", payload, metadata.topic(), metadata.partition(), metadata.offset());
           future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
         }
         else
         {
-          log.error("Could not send {}: {}", transfer, exception.getMessage());
+          log.error("Could not send {}: {}", payload, exception.getMessage());
           future.completeExceptionally(exception);
         }
       });
@@ -52,7 +64,7 @@ public class KafkaMessagingService implements MessagingService
     }
     catch (JsonProcessingException e)
     {
-      throw new RuntimeException("Could not convert " + transfer, e);
+      throw new RuntimeException("Could not convert " + payload, e);
     }
   }
 
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java
new file mode 100644 (file)
index 0000000..0c5e271
--- /dev/null
@@ -0,0 +1,42 @@
+package de.juplo.kafka.payment.transfer.adapter;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@EqualsAndHashCode
+@Builder
+public class NewTransferEvent
+{
+  private Long id;
+  private Long payer;
+  private Long payee;
+  private Integer amount;
+
+  public Transfer toTransfer()
+  {
+    return
+        Transfer
+            .builder()
+            .id(id)
+            .payer(payer)
+            .payee(payee)
+            .amount(amount)
+            .build();
+  }
+
+  public static NewTransferEvent ofTransfer(Transfer transfer)
+  {
+    return
+        NewTransferEvent
+            .builder()
+            .id(transfer.getId())
+            .payer(transfer.getPayer())
+            .payee(transfer.getPayee())
+            .amount(transfer.getAmount())
+            .build();
+  }
+}
index e7c2430..24f3e88 100644 (file)
@@ -3,7 +3,9 @@ package de.juplo.kafka.payment.transfer.adapter;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -19,6 +21,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
+
 
 @RequestMapping("/consumer")
 @ResponseBody
@@ -30,7 +34,9 @@ public class TransferConsumer implements Runnable
   private final KafkaConsumer<String, String> consumer;
   private final ExecutorService executorService;
   private final ObjectMapper mapper;
-  private final HandleTransferUseCase handleTransferUseCase;
+  private final GetTransferUseCase getTransferUseCase;
+  private final CreateTransferUseCase createTransferUseCase;
+  private final HandleStateChangeUseCase handleStateChangeUseCase;
 
   private boolean running = false;
   private Future<?> future = null;
@@ -51,8 +57,28 @@ public class TransferConsumer implements Runnable
         {
           try
           {
-            Transfer transfer = mapper.readValue(record.value(), Transfer.class);
-            handleTransferUseCase.handle(transfer);
+            byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+            switch (eventType)
+            {
+              case EventType.NEW_TRANSFER:
+
+                NewTransferEvent newTransferEvent =
+                    mapper.readValue(record.value(), NewTransferEvent.class);
+                createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
+                break;
+
+              case EventType.TRANSFER_STATE_CHANGED:
+
+                TransferStateChangedEvent stateChangedEvent =
+                    mapper.readValue(record.value(), TransferStateChangedEvent.class);
+                getTransferUseCase
+                    .get(stateChangedEvent.getId())
+                    .ifPresentOrElse(
+                        transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
+                        () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+                break;
+            }
           }
           catch (JsonProcessingException e)
           {
index 8240310..b9a2fb0 100644 (file)
@@ -63,7 +63,6 @@ import java.util.concurrent.CompletableFuture;
                             .payer(transferDTO.getPayer())
                             .payee(transferDTO.getPayee())
                             .amount(transferDTO.getAmount())
-                            .state(Transfer.State.RECEIVED)
                             .build())
                     .thenApply($ ->
                         ResponseEntity
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferStateChangedEvent.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferStateChangedEvent.java
new file mode 100644 (file)
index 0000000..cdb4178
--- /dev/null
@@ -0,0 +1,20 @@
+package de.juplo.kafka.payment.transfer.adapter;
+
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Data
+@EqualsAndHashCode
+@Builder
+public class TransferStateChangedEvent
+{
+  private long id;
+  private Transfer.State state;
+}
index cc207d9..cac0f19 100644 (file)
@@ -16,7 +16,6 @@ public class Transfer
 {
   public enum State
   {
-    RECEIVED(false),
     CREATED(false),
     INVALID(false),
     CHECKED(false),
index 90ef682..7a2349a 100644 (file)
@@ -1,27 +1,24 @@
 package de.juplo.kafka.payment.transfer.domain;
 
 
-import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.MessagingService;
-import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import de.juplo.kafka.payment.transfer.ports.*;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 
 import java.util.Optional;
 
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
 
 
 @Slf4j
 @RequiredArgsConstructor
-public class TransferService implements HandleTransferUseCase, GetTransferUseCase
+public class TransferService implements CreateTransferUseCase, HandleStateChangeUseCase, GetTransferUseCase
 {
   private final TransferRepository repository;
   private final MessagingService messagingService;
 
-  private void create(Transfer transfer)
+  @Override
+  public void create(Transfer transfer)
   {
     repository
         .get(transfer.getId())
@@ -30,8 +27,7 @@ public class TransferService implements HandleTransferUseCase, GetTransferUseCas
             () ->
             {
               repository.store(transfer);
-              transfer.setState(CREATED);
-              messagingService.send(transfer);
+              messagingService.send(transfer.getId(), CREATED);
             });
   }
 
@@ -41,11 +37,6 @@ public class TransferService implements HandleTransferUseCase, GetTransferUseCas
     Transfer.State state = transfer.getState();
     switch (state)
     {
-      case RECEIVED:
-        repository.store(transfer);
-        create(transfer);
-        break;
-
       case CREATED:
         repository.store(transfer);
         check(transfer);
@@ -64,8 +55,7 @@ public class TransferService implements HandleTransferUseCase, GetTransferUseCas
   private void check(Transfer transfer)
   {
     // TODO: Do some time consuming checks...
-    transfer.setState(CHECKED);
-    messagingService.send(transfer);
+    messagingService.send(transfer.getId(), CHECKED);
   }
 
   public Optional<Transfer> get(Long id)
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java
new file mode 100644 (file)
index 0000000..34ae0e9
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+
+public interface CreateTransferUseCase
+{
+  void create(Transfer transfer);
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java
new file mode 100644 (file)
index 0000000..2e75fc0
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+
+public interface HandleStateChangeUseCase
+{
+  void handle(Transfer transfer);
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java
deleted file mode 100644 (file)
index 5d1a2b2..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka.payment.transfer.ports;
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-
-
-public interface HandleTransferUseCase
-{
-  void handle(Transfer transfer);
-}
index 4037a90..b651c20 100644 (file)
@@ -8,4 +8,5 @@ import java.util.concurrent.CompletableFuture;
 public interface MessagingService
 {
   CompletableFuture<?> send(Transfer transfer);
+  CompletableFuture<?> send(Long id, Transfer.State state);
 }
index b7e8b86..6e2d21b 100644 (file)
@@ -2,8 +2,7 @@ package de.juplo.kafka.payment.transfer.domain;
 
 import org.junit.jupiter.api.Test;
 
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
@@ -12,7 +11,7 @@ public class TransferTest
   @Test
   public void testEqualsIgnoresState()
   {
-    Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(RECEIVED).build();
+    Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CREATED).build();
     Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CHECKED).build();
 
     assertThat(a).isEqualTo(b);