TransferRepository does not need any synchronization
authorKai Moritz <kai@juplo.de>
Sun, 13 Jun 2021 21:40:56 +0000 (23:40 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 19 Jun 2021 16:02:03 +0000 (18:02 +0200)
* Only TransferService stores data in the repository
* Since alle instances of Transfer, that are handled by TransferService
  are received through a single topic, no synchronization is needed at
  all in the repository.
* This is, because records, that are received from a topic are guaranteed
  to be processed one after the other.
* The topic simply is the single source for processing requests and a
  KafkaConsumer never handles multiple records in parallel.
* Note: This implementation is not ready to run on multiple threads or
  nodes!

12 files changed:
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java
src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java
src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java
src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java
src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java [deleted file]
src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java
src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java

index 65f683c..02842e5 100644 (file)
@@ -3,12 +3,16 @@ package de.juplo.kafka.payment.transfer;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService;
+import de.juplo.kafka.payment.transfer.adapter.TransferConsumer;
 import de.juplo.kafka.payment.transfer.domain.TransferService;
 import de.juplo.kafka.payment.transfer.ports.MessagingService;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
@@ -16,6 +20,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 
 import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 
 @SpringBootApplication
@@ -34,6 +40,38 @@ public class TransferServiceApplication
     return new KafkaProducer<>(props);
   }
 
+  @Bean
+  KafkaConsumer<String, String> consumer(TransferServiceProperties properties)
+  {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers);
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+    return new KafkaConsumer<>(props);
+  }
+
+  @Bean(destroyMethod = "shutdown")
+  ExecutorService executorService()
+  {
+    return Executors.newFixedThreadPool(1);
+  }
+
+  @Bean(destroyMethod = "shutdown")
+  TransferConsumer transferConsumer(
+      TransferServiceProperties properties,
+      KafkaConsumer<String, String> consumer,
+      ExecutorService executorService,
+      ObjectMapper mapper,
+      TransferService transferService)
+  {
+    TransferConsumer transferConsumer =
+        new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService);
+    transferConsumer.start();
+    return transferConsumer;
+  }
+
   @Bean
   MessagingService kafkaMessagingService(
       KafkaProducer<String, String> producer,
index ccd22a3..79473f8 100644 (file)
@@ -13,4 +13,5 @@ public class TransferServiceProperties
 {
   String bootstrapServers = "localhost:9092";
   String topic = "transfers";
+  String groupId = "transfers";
 }
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
new file mode 100644 (file)
index 0000000..17d91de
--- /dev/null
@@ -0,0 +1,135 @@
+package de.juplo.kafka.payment.transfer.adapter;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+import java.time.Duration;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+
+
+@RequestMapping("/consumer")
+@ResponseBody
+@RequiredArgsConstructor
+@Slf4j
+public class TransferConsumer implements Runnable
+{
+  private final String topic;
+  private final KafkaConsumer<String, String> consumer;
+  private final ExecutorService executorService;
+  private final ObjectMapper mapper;
+  private final HandleTransferUseCase handleTransferUseCase;
+
+  private boolean running = false;
+  private Future<?> future = null;
+
+
+  @Override
+  public void run()
+  {
+    while (running)
+    {
+      try
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+        log.debug("polled {} records", records.count());
+
+        records.forEach(record ->
+        {
+          try
+          {
+            Transfer transfer = mapper.readValue(record.value(), Transfer.class);
+            handleTransferUseCase.handle(transfer);
+          }
+          catch (JsonProcessingException e)
+          {
+            log.error(
+                "ignoring invalid json in message #{} on {}/{}: {}",
+                record.offset(),
+                record.topic(),
+                record.partition(),
+                record.value());
+          }
+        });
+      }
+      catch (WakeupException e)
+      {
+        log.info("polling aborted!");
+      }
+    }
+
+    log.info("polling stopped");
+  }
+
+
+  @PostMapping("start")
+  public synchronized String start()
+  {
+    String result = "Started";
+
+    if (running)
+    {
+      stop();
+      result = "Restarted";
+    }
+
+    log.info("subscribing to topic {}", topic);
+    consumer.subscribe(Set.of(topic));
+    running = true;
+    future = executorService.submit(this);
+
+    return result;
+  }
+
+  @PostMapping("stop")
+  public synchronized String stop()
+  {
+    if (!running)
+    {
+      log.info("not running!");
+      return "Not running";
+    }
+
+    running = false;
+    if (!future.isDone())
+      consumer.wakeup();
+    log.info("waiting for the polling-loop to finish...");
+    try
+    {
+      future.get();
+    }
+    catch (InterruptedException|ExecutionException e)
+    {
+      log.error("Exception while joining polling task!", e);
+      return e.getMessage();
+    }
+    finally
+    {
+      future = null;
+      log.info("unsubscribing");
+      consumer.unsubscribe();
+    }
+
+    return "Stoped";
+  }
+
+  public synchronized void shutdown()
+  {
+    log.info("shutdown initiated!");
+    stop();
+    log.info("closing consumer");
+    consumer.close();
+  }
+}
index e20f9bf..f31d1a8 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.ReceiveTransferUseCase;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
@@ -12,6 +12,7 @@ import org.springframework.http.ResponseEntity;
 import org.springframework.validation.FieldError;
 import org.springframework.web.bind.MethodArgumentNotValidException;
 import org.springframework.web.bind.annotation.*;
+import org.springframework.web.context.request.async.DeferredResult;
 
 import javax.servlet.http.HttpServletRequest;
 import javax.validation.Valid;
@@ -28,7 +29,7 @@ import java.util.Map;
 {
   public final static String PATH = "/transfers";
 
-  private final InitiateTransferUseCase initiateTransferUseCase;
+  private final ReceiveTransferUseCase receiveTransferUseCase;
   private final GetTransferUseCase getTransferUseCase;
 
 
@@ -36,7 +37,9 @@ import java.util.Map;
       path = PATH,
       consumes = MediaType.APPLICATION_JSON_VALUE,
       produces = MediaType.APPLICATION_JSON_VALUE)
-  public ResponseEntity<?> transfer(@Valid @RequestBody TransferDTO transferDTO)
+  public DeferredResult<ResponseEntity<?>> transfer(
+      HttpServletRequest request,
+      @Valid @RequestBody TransferDTO transferDTO)
   {
     Transfer transfer =
         Transfer
@@ -47,9 +50,25 @@ import java.util.Map;
             .amount(transferDTO.getAmount())
             .build();
 
-    initiateTransferUseCase.initiate(transfer);
+    DeferredResult<ResponseEntity<?>> result = new DeferredResult<>();
+
+    receiveTransferUseCase
+        .receive(transfer)
+        .thenApply(
+            $ ->
+                ResponseEntity
+                    .created(URI.create(PATH + "/" + transferDTO.getId()))
+                    .build())
+        .thenAccept(
+            responseEntity -> result.setResult(responseEntity))
+        .exceptionally(
+            e ->
+            {
+              result.setErrorResult(e);
+              return null;
+            });
 
-    return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build();
+    return result;
   }
 
   @GetMapping(
index 5556a1b..82891b7 100644 (file)
@@ -5,19 +5,29 @@ import lombok.Builder;
 import lombok.Data;
 import lombok.EqualsAndHashCode;
 
+import java.util.LinkedList;
+import java.util.List;
+
 
 @Data
 @Builder
-@EqualsAndHashCode(exclude = "state")
+@EqualsAndHashCode(exclude = { "state", "messages" })
 public class Transfer
 {
   public enum State
   {
-    SENT,
-    FAILED,
-    PENDING,
-    APPROVED,
-    REJECTED
+    RECEIVED(false),
+    INVALID(false),
+    CHECKED(false),
+    APPROVED(true),
+    REJECTED(true);
+
+    public final boolean foreign;
+
+    State(boolean foreign)
+    {
+      this.foreign = foreign;
+    }
   }
 
   private final long id;
@@ -26,4 +36,19 @@ public class Transfer
   private final int amount;
 
   private State state;
+
+  private final List<String> messages = new LinkedList<>();
+
+
+  public Transfer setState(State state)
+  {
+    this.state = state;
+    return this;
+  }
+
+  public Transfer addMessage(String message)
+  {
+    messages.add(message);
+    return this;
+  }
 }
index 3e6265f..0cbcd2c 100644 (file)
@@ -1,26 +1,53 @@
 package de.juplo.kafka.payment.transfer.domain;
 
 
-import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.MessagingService;
-import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import de.juplo.kafka.payment.transfer.ports.*;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.common.TopicPartition;
 
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
 
 
 @Slf4j
 @RequiredArgsConstructor
-public class TransferService implements InitiateTransferUseCase, GetTransferUseCase
+public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase
 {
   private final TransferRepository repository;
   private final MessagingService messagingService;
 
-  public synchronized void initiate(Transfer transfer)
+  public CompletableFuture<TopicPartition> receive(Transfer transfer)
+  {
+    transfer.setState(RECEIVED);
+    return messagingService.send(transfer);
+  }
+
+  @Override
+  public void handle(Transfer transfer)
+  {
+    Transfer.State state = transfer.getState();
+    switch (state)
+    {
+      case RECEIVED:
+        repository.store(transfer);
+        check(transfer);
+        break;
+
+      case CHECKED:
+        repository.store(transfer);
+        // TODO: What's next...?
+        break;
+
+      default:
+        log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state);
+    }
+  }
+
+  private void check(Transfer transfer)
   {
     repository
         .get(transfer.getId())
@@ -28,42 +55,14 @@ public class TransferService implements InitiateTransferUseCase, GetTransferUseC
             stored ->
             {
               if (!transfer.equals(stored))
-                throw new IllegalArgumentException(
-                    "Re-Initiation of transfer with different data: old=" +
-                        stored +
-                        ", new=" +
-                        transfer);
-
-              if (stored.getState() == FAILED)
-              {
-                repository.update(transfer.getId(), FAILED, SENT);
-                log.info("Resending faild transfer: " + stored);
-                send(transfer);
-              }
+                log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer);
             },
             () ->
             {
-              send(transfer);
-              transfer.setState(SENT);
               repository.store(transfer);
-            });
-  }
-
-  private void send(Transfer transfer)
-  {
-    messagingService
-        .send(transfer)
-        .thenApply(
-            $ ->
-            {
-              repository.update(transfer.getId(), SENT, PENDING);
-              return null;
-            })
-        .exceptionally(
-            e ->
-            {
-              repository.update(transfer.getId(), SENT, FAILED);
-              return null;
+              // TODO: Do some time consuming checks...
+              transfer.setState(CHECKED);
+              messagingService.send(transfer);
             });
   }
 
index c5af531..ec293ad 100644 (file)
@@ -23,19 +23,7 @@ public class InMemoryTransferRepository implements TransferRepository
 
 
   @Override
-  public synchronized void store(Transfer transfer)
-  {
-    Optional
-        .ofNullable(map.get(transfer.getId()))
-        .ifPresentOrElse(
-            json ->
-            {
-              throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
-            },
-            () -> put(transfer));
-  }
-
-  private void put(Transfer transfer)
+  public void store(Transfer transfer)
   {
     try
     {
@@ -43,12 +31,12 @@ public class InMemoryTransferRepository implements TransferRepository
     }
     catch (JsonProcessingException e)
     {
-      log.error("Could not convert Transfer.class: {}", transfer, e);
+      throw new RuntimeException(e);
     }
   }
 
   @Override
-  public synchronized Optional<Transfer> get(Long id)
+  public Optional<Transfer> get(Long id)
   {
     return
         Optional
@@ -65,18 +53,6 @@ public class InMemoryTransferRepository implements TransferRepository
             });
   }
 
-  @Override
-  public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
-  {
-    Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
-
-    if (transfer.getState() != oldState)
-      throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
-
-    transfer.setState(newState);
-    put(transfer);
-  }
-
   @Override
   public void remove(Long id)
   {
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java
new file mode 100644 (file)
index 0000000..5d1a2b2
--- /dev/null
@@ -0,0 +1,9 @@
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+
+public interface HandleTransferUseCase
+{
+  void handle(Transfer transfer);
+}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java
deleted file mode 100644 (file)
index b7dfc64..0000000
+++ /dev/null
@@ -1,9 +0,0 @@
-package de.juplo.kafka.payment.transfer.ports;
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-
-
-public interface InitiateTransferUseCase
-{
-  void initiate(Transfer transfer);
-}
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java
new file mode 100644 (file)
index 0000000..f892fb3
--- /dev/null
@@ -0,0 +1,12 @@
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.concurrent.CompletableFuture;
+
+
+public interface ReceiveTransferUseCase
+{
+  CompletableFuture<TopicPartition> receive(Transfer transfer);
+}
index 2423ab3..e44a1d6 100644 (file)
@@ -11,7 +11,5 @@ public interface TransferRepository
 
   Optional<Transfer> get(Long id);
 
-  void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException;
-
   void remove(Long id);
 }
index 55f6c03..b7e8b86 100644 (file)
@@ -2,8 +2,8 @@ package de.juplo.kafka.payment.transfer.domain;
 
 import org.junit.jupiter.api.Test;
 
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.PENDING;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.SENT;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
 import static org.assertj.core.api.Assertions.assertThat;
 
 
@@ -12,8 +12,8 @@ public class TransferTest
   @Test
   public void testEqualsIgnoresState()
   {
-    Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(SENT).build();
-    Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(PENDING).build();
+    Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(RECEIVED).build();
+    Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CHECKED).build();
 
     assertThat(a).isEqualTo(b);
   }