From 4467c5240397a47b181106a0ae902ed1b71d0c5d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 13 Jun 2021 23:40:56 +0200 Subject: [PATCH] TransferRepository does not need any synchronization * Only TransferService stores data in the repository * Since alle instances of Transfer, that are handled by TransferService are received through a single topic, no synchronization is needed at all in the repository. * This is, because records, that are received from a topic are guaranteed to be processed one after the other. * The topic simply is the single source for processing requests and a KafkaConsumer never handles multiple records in parallel. * Note: This implementation is not ready to run on multiple threads or nodes! --- .../transfer/TransferServiceApplication.java | 38 +++++ .../transfer/TransferServiceProperties.java | 1 + .../transfer/adapter/TransferConsumer.java | 135 ++++++++++++++++++ .../transfer/adapter/TransferController.java | 29 +++- .../payment/transfer/domain/Transfer.java | 37 ++++- .../transfer/domain/TransferService.java | 77 +++++----- .../InMemoryTransferRepository.java | 30 +--- ...seCase.java => HandleTransferUseCase.java} | 4 +- .../ports/ReceiveTransferUseCase.java | 12 ++ .../transfer/ports/TransferRepository.java | 2 - .../payment/transfer/domain/TransferTest.java | 8 +- 11 files changed, 288 insertions(+), 85 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java rename src/main/java/de/juplo/kafka/payment/transfer/ports/{InitiateTransferUseCase.java => HandleTransferUseCase.java} (58%) create mode 100644 src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 65f683c..02842e5 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -3,12 +3,16 @@ package de.juplo.kafka.payment.transfer; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.payment.transfer.adapter.KafkaMessagingService; +import de.juplo.kafka.payment.transfer.adapter.TransferConsumer; import de.juplo.kafka.payment.transfer.domain.TransferService; import de.juplo.kafka.payment.transfer.ports.MessagingService; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @@ -16,6 +20,8 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean; import java.util.Properties; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; @SpringBootApplication @@ -34,6 +40,38 @@ public class TransferServiceApplication return new KafkaProducer<>(props); } + @Bean + KafkaConsumer consumer(TransferServiceProperties properties) + { + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.bootstrapServers); + props.put(ConsumerConfig.GROUP_ID_CONFIG, properties.groupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new KafkaConsumer<>(props); + } + + @Bean(destroyMethod = "shutdown") + ExecutorService executorService() + { + return Executors.newFixedThreadPool(1); + } + + @Bean(destroyMethod = "shutdown") + TransferConsumer transferConsumer( + TransferServiceProperties properties, + KafkaConsumer consumer, + ExecutorService executorService, + ObjectMapper mapper, + TransferService transferService) + { + TransferConsumer transferConsumer = + new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService); + transferConsumer.start(); + return transferConsumer; + } + @Bean MessagingService kafkaMessagingService( KafkaProducer producer, diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java index ccd22a3..79473f8 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java @@ -13,4 +13,5 @@ public class TransferServiceProperties { String bootstrapServers = "localhost:9092"; String topic = "transfers"; + String groupId = "transfers"; } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java new file mode 100644 index 0000000..17d91de --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -0,0 +1,135 @@ +package de.juplo.kafka.payment.transfer.adapter; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.domain.Transfer; +import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase; +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.apache.kafka.common.errors.WakeupException; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.ResponseBody; + +import java.time.Duration; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + + +@RequestMapping("/consumer") +@ResponseBody +@RequiredArgsConstructor +@Slf4j +public class TransferConsumer implements Runnable +{ + private final String topic; + private final KafkaConsumer consumer; + private final ExecutorService executorService; + private final ObjectMapper mapper; + private final HandleTransferUseCase handleTransferUseCase; + + private boolean running = false; + private Future future = null; + + + @Override + public void run() + { + while (running) + { + try + { + ConsumerRecords records = consumer.poll(Duration.ofSeconds(1)); + log.debug("polled {} records", records.count()); + + records.forEach(record -> + { + try + { + Transfer transfer = mapper.readValue(record.value(), Transfer.class); + handleTransferUseCase.handle(transfer); + } + catch (JsonProcessingException e) + { + log.error( + "ignoring invalid json in message #{} on {}/{}: {}", + record.offset(), + record.topic(), + record.partition(), + record.value()); + } + }); + } + catch (WakeupException e) + { + log.info("polling aborted!"); + } + } + + log.info("polling stopped"); + } + + + @PostMapping("start") + public synchronized String start() + { + String result = "Started"; + + if (running) + { + stop(); + result = "Restarted"; + } + + log.info("subscribing to topic {}", topic); + consumer.subscribe(Set.of(topic)); + running = true; + future = executorService.submit(this); + + return result; + } + + @PostMapping("stop") + public synchronized String stop() + { + if (!running) + { + log.info("not running!"); + return "Not running"; + } + + running = false; + if (!future.isDone()) + consumer.wakeup(); + log.info("waiting for the polling-loop to finish..."); + try + { + future.get(); + } + catch (InterruptedException|ExecutionException e) + { + log.error("Exception while joining polling task!", e); + return e.getMessage(); + } + finally + { + future = null; + log.info("unsubscribing"); + consumer.unsubscribe(); + } + + return "Stoped"; + } + + public synchronized void shutdown() + { + log.info("shutdown initiated!"); + stop(); + log.info("closing consumer"); + consumer.close(); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java index e20f9bf..f31d1a8 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java @@ -3,7 +3,7 @@ package de.juplo.kafka.payment.transfer.adapter; import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.ReceiveTransferUseCase; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; @@ -12,6 +12,7 @@ import org.springframework.http.ResponseEntity; import org.springframework.validation.FieldError; import org.springframework.web.bind.MethodArgumentNotValidException; import org.springframework.web.bind.annotation.*; +import org.springframework.web.context.request.async.DeferredResult; import javax.servlet.http.HttpServletRequest; import javax.validation.Valid; @@ -28,7 +29,7 @@ import java.util.Map; { public final static String PATH = "/transfers"; - private final InitiateTransferUseCase initiateTransferUseCase; + private final ReceiveTransferUseCase receiveTransferUseCase; private final GetTransferUseCase getTransferUseCase; @@ -36,7 +37,9 @@ import java.util.Map; path = PATH, consumes = MediaType.APPLICATION_JSON_VALUE, produces = MediaType.APPLICATION_JSON_VALUE) - public ResponseEntity transfer(@Valid @RequestBody TransferDTO transferDTO) + public DeferredResult> transfer( + HttpServletRequest request, + @Valid @RequestBody TransferDTO transferDTO) { Transfer transfer = Transfer @@ -47,9 +50,25 @@ import java.util.Map; .amount(transferDTO.getAmount()) .build(); - initiateTransferUseCase.initiate(transfer); + DeferredResult> result = new DeferredResult<>(); + + receiveTransferUseCase + .receive(transfer) + .thenApply( + $ -> + ResponseEntity + .created(URI.create(PATH + "/" + transferDTO.getId())) + .build()) + .thenAccept( + responseEntity -> result.setResult(responseEntity)) + .exceptionally( + e -> + { + result.setErrorResult(e); + return null; + }); - return ResponseEntity.created(URI.create(PATH + "/" + transferDTO.getId())).build(); + return result; } @GetMapping( diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java index 5556a1b..82891b7 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java @@ -5,19 +5,29 @@ import lombok.Builder; import lombok.Data; import lombok.EqualsAndHashCode; +import java.util.LinkedList; +import java.util.List; + @Data @Builder -@EqualsAndHashCode(exclude = "state") +@EqualsAndHashCode(exclude = { "state", "messages" }) public class Transfer { public enum State { - SENT, - FAILED, - PENDING, - APPROVED, - REJECTED + RECEIVED(false), + INVALID(false), + CHECKED(false), + APPROVED(true), + REJECTED(true); + + public final boolean foreign; + + State(boolean foreign) + { + this.foreign = foreign; + } } private final long id; @@ -26,4 +36,19 @@ public class Transfer private final int amount; private State state; + + private final List messages = new LinkedList<>(); + + + public Transfer setState(State state) + { + this.state = state; + return this; + } + + public Transfer addMessage(String message) + { + messages.add(message); + return this; + } } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 3e6265f..0cbcd2c 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -1,26 +1,53 @@ package de.juplo.kafka.payment.transfer.domain; -import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.InitiateTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.MessagingService; -import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import de.juplo.kafka.payment.transfer.ports.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.common.TopicPartition; import java.util.Optional; +import java.util.concurrent.CompletableFuture; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED; @Slf4j @RequiredArgsConstructor -public class TransferService implements InitiateTransferUseCase, GetTransferUseCase +public class TransferService implements ReceiveTransferUseCase, HandleTransferUseCase, GetTransferUseCase { private final TransferRepository repository; private final MessagingService messagingService; - public synchronized void initiate(Transfer transfer) + public CompletableFuture receive(Transfer transfer) + { + transfer.setState(RECEIVED); + return messagingService.send(transfer); + } + + @Override + public void handle(Transfer transfer) + { + Transfer.State state = transfer.getState(); + switch (state) + { + case RECEIVED: + repository.store(transfer); + check(transfer); + break; + + case CHECKED: + repository.store(transfer); + // TODO: What's next...? + break; + + default: + log.warn("TODO: handle {} state {}", state.foreign ? "foreign" : "domain", state); + } + } + + private void check(Transfer transfer) { repository .get(transfer.getId()) @@ -28,42 +55,14 @@ public class TransferService implements InitiateTransferUseCase, GetTransferUseC stored -> { if (!transfer.equals(stored)) - throw new IllegalArgumentException( - "Re-Initiation of transfer with different data: old=" + - stored + - ", new=" + - transfer); - - if (stored.getState() == FAILED) - { - repository.update(transfer.getId(), FAILED, SENT); - log.info("Resending faild transfer: " + stored); - send(transfer); - } + log.error("ignoring already received transfer with differing data: old={}, new={}", stored, transfer); }, () -> { - send(transfer); - transfer.setState(SENT); repository.store(transfer); - }); - } - - private void send(Transfer transfer) - { - messagingService - .send(transfer) - .thenApply( - $ -> - { - repository.update(transfer.getId(), SENT, PENDING); - return null; - }) - .exceptionally( - e -> - { - repository.update(transfer.getId(), SENT, FAILED); - return null; + // TODO: Do some time consuming checks... + transfer.setState(CHECKED); + messagingService.send(transfer); }); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index c5af531..ec293ad 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -23,19 +23,7 @@ public class InMemoryTransferRepository implements TransferRepository @Override - public synchronized void store(Transfer transfer) - { - Optional - .ofNullable(map.get(transfer.getId())) - .ifPresentOrElse( - json -> - { - throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer); - }, - () -> put(transfer)); - } - - private void put(Transfer transfer) + public void store(Transfer transfer) { try { @@ -43,12 +31,12 @@ public class InMemoryTransferRepository implements TransferRepository } catch (JsonProcessingException e) { - log.error("Could not convert Transfer.class: {}", transfer, e); + throw new RuntimeException(e); } } @Override - public synchronized Optional get(Long id) + public Optional get(Long id) { return Optional @@ -65,18 +53,6 @@ public class InMemoryTransferRepository implements TransferRepository }); } - @Override - public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState) - { - Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id)); - - if (transfer.getState() != oldState) - throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState)); - - transfer.setState(newState); - put(transfer); - } - @Override public void remove(Long id) { diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java similarity index 58% rename from src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java rename to src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java index b7dfc64..5d1a2b2 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/InitiateTransferUseCase.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java @@ -3,7 +3,7 @@ package de.juplo.kafka.payment.transfer.ports; import de.juplo.kafka.payment.transfer.domain.Transfer; -public interface InitiateTransferUseCase +public interface HandleTransferUseCase { - void initiate(Transfer transfer); + void handle(Transfer transfer); } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java new file mode 100644 index 0000000..f892fb3 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/ReceiveTransferUseCase.java @@ -0,0 +1,12 @@ +package de.juplo.kafka.payment.transfer.ports; + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import org.apache.kafka.common.TopicPartition; + +import java.util.concurrent.CompletableFuture; + + +public interface ReceiveTransferUseCase +{ + CompletableFuture receive(Transfer transfer); +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java index 2423ab3..e44a1d6 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java @@ -11,7 +11,5 @@ public interface TransferRepository Optional get(Long id); - void update(Long id, Transfer.State oldState, Transfer.State newState) throws IllegalArgumentException; - void remove(Long id); } diff --git a/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java b/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java index 55f6c03..b7e8b86 100644 --- a/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java +++ b/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java @@ -2,8 +2,8 @@ package de.juplo.kafka.payment.transfer.domain; import org.junit.jupiter.api.Test; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.PENDING; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.SENT; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED; import static org.assertj.core.api.Assertions.assertThat; @@ -12,8 +12,8 @@ public class TransferTest @Test public void testEqualsIgnoresState() { - Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(SENT).build(); - Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(PENDING).build(); + Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(RECEIVED).build(); + Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CHECKED).build(); assertThat(a).isEqualTo(b); } -- 2.20.1