From: Kai Moritz Date: Sat, 19 Jun 2021 07:11:08 +0000 (+0200) Subject: Introduced different Events for the creation and the state-changes X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=26809d379a0e024017f70db8c70382f94faf98b6;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer Introduced different Events for the creation and the state-changes --- diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java index 02842e5..58a3af2 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java @@ -67,7 +67,14 @@ public class TransferServiceApplication TransferService transferService) { TransferConsumer transferConsumer = - new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService); + new TransferConsumer( + properties.topic, + consumer, + executorService, + mapper, + transferService, + transferService, + transferService); transferConsumer.start(); return transferConsumer; } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/EventType.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/EventType.java new file mode 100644 index 0000000..aa4b897 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/EventType.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.payment.transfer.adapter; + +public abstract class EventType +{ + public final static String HEADER = "$"; + + public final static byte NEW_TRANSFER = 1; + public final static byte TRANSFER_STATE_CHANGED = 2; +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java index 3161af3..6da7937 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/KafkaMessagingService.java @@ -24,26 +24,38 @@ public class KafkaMessagingService implements MessagingService @Override public CompletableFuture send(Transfer transfer) + { + return send(transfer.getId(), EventType.NEW_TRANSFER, NewTransferEvent.ofTransfer(transfer)); + } + + public CompletableFuture send(Long id, Transfer.State state) + { + return send(id, EventType.TRANSFER_STATE_CHANGED, new TransferStateChangedEvent(id, state)); + } + + private CompletableFuture send(Long id, byte eventType, Object payload) { try { CompletableFuture future = new CompletableFuture<>(); + ProducerRecord record = new ProducerRecord<>( topic, - Long.toString(transfer.getId()), - mapper.writeValueAsString(transfer)); + Long.toString(id), + mapper.writeValueAsString(payload)); + record.headers().add(EventType.HEADER, new byte[] { eventType }); producer.send(record, (metadata, exception) -> { if (metadata != null) { - log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset()); + log.debug("Sent {} to {}/{}:{}", payload, metadata.topic(), metadata.partition(), metadata.offset()); future.complete(new TopicPartition(metadata.topic(), metadata.partition())); } else { - log.error("Could not send {}: {}", transfer, exception.getMessage()); + log.error("Could not send {}: {}", payload, exception.getMessage()); future.completeExceptionally(exception); } }); @@ -52,7 +64,7 @@ public class KafkaMessagingService implements MessagingService } catch (JsonProcessingException e) { - throw new RuntimeException("Could not convert " + transfer, e); + throw new RuntimeException("Could not convert " + payload, e); } } diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java new file mode 100644 index 0000000..0c5e271 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/NewTransferEvent.java @@ -0,0 +1,42 @@ +package de.juplo.kafka.payment.transfer.adapter; + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + + +@Data +@EqualsAndHashCode +@Builder +public class NewTransferEvent +{ + private Long id; + private Long payer; + private Long payee; + private Integer amount; + + public Transfer toTransfer() + { + return + Transfer + .builder() + .id(id) + .payer(payer) + .payee(payee) + .amount(amount) + .build(); + } + + public static NewTransferEvent ofTransfer(Transfer transfer) + { + return + NewTransferEvent + .builder() + .id(transfer.getId()) + .payer(transfer.getPayer()) + .payee(transfer.getPayee()) + .amount(transfer.getAmount()) + .build(); + } +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java index e7c2430..24f3e88 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java @@ -3,7 +3,9 @@ package de.juplo.kafka.payment.transfer.adapter; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import de.juplo.kafka.payment.transfer.domain.Transfer; -import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; +import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecords; @@ -19,6 +21,8 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; + @RequestMapping("/consumer") @ResponseBody @@ -30,7 +34,9 @@ public class TransferConsumer implements Runnable private final KafkaConsumer consumer; private final ExecutorService executorService; private final ObjectMapper mapper; - private final HandleTransferUseCase handleTransferUseCase; + private final GetTransferUseCase getTransferUseCase; + private final CreateTransferUseCase createTransferUseCase; + private final HandleStateChangeUseCase handleStateChangeUseCase; private boolean running = false; private Future future = null; @@ -51,8 +57,28 @@ public class TransferConsumer implements Runnable { try { - Transfer transfer = mapper.readValue(record.value(), Transfer.class); - handleTransferUseCase.handle(transfer); + byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0]; + + switch (eventType) + { + case EventType.NEW_TRANSFER: + + NewTransferEvent newTransferEvent = + mapper.readValue(record.value(), NewTransferEvent.class); + createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED)); + break; + + case EventType.TRANSFER_STATE_CHANGED: + + TransferStateChangedEvent stateChangedEvent = + mapper.readValue(record.value(), TransferStateChangedEvent.class); + getTransferUseCase + .get(stateChangedEvent.getId()) + .ifPresentOrElse( + transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())), + () -> log.error("unknown transfer: {}", stateChangedEvent.getId())); + break; + } } catch (JsonProcessingException e) { diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java index 8240310..b9a2fb0 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferController.java @@ -63,7 +63,6 @@ import java.util.concurrent.CompletableFuture; .payer(transferDTO.getPayer()) .payee(transferDTO.getPayee()) .amount(transferDTO.getAmount()) - .state(Transfer.State.RECEIVED) .build()) .thenApply($ -> ResponseEntity diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferStateChangedEvent.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferStateChangedEvent.java new file mode 100644 index 0000000..cdb4178 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferStateChangedEvent.java @@ -0,0 +1,20 @@ +package de.juplo.kafka.payment.transfer.adapter; + + +import de.juplo.kafka.payment.transfer.domain.Transfer; +import lombok.Builder; +import lombok.Data; +import lombok.EqualsAndHashCode; + +import java.util.LinkedList; +import java.util.List; + + +@Data +@EqualsAndHashCode +@Builder +public class TransferStateChangedEvent +{ + private long id; + private Transfer.State state; +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java index cc207d9..cac0f19 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/Transfer.java @@ -16,7 +16,6 @@ public class Transfer { public enum State { - RECEIVED(false), CREATED(false), INVALID(false), CHECKED(false), diff --git a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java index 90ef682..7a2349a 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/domain/TransferService.java @@ -1,27 +1,24 @@ package de.juplo.kafka.payment.transfer.domain; -import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase; -import de.juplo.kafka.payment.transfer.ports.MessagingService; -import de.juplo.kafka.payment.transfer.ports.TransferRepository; +import de.juplo.kafka.payment.transfer.ports.*; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import java.util.Optional; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; @Slf4j @RequiredArgsConstructor -public class TransferService implements HandleTransferUseCase, GetTransferUseCase +public class TransferService implements CreateTransferUseCase, HandleStateChangeUseCase, GetTransferUseCase { private final TransferRepository repository; private final MessagingService messagingService; - private void create(Transfer transfer) + @Override + public void create(Transfer transfer) { repository .get(transfer.getId()) @@ -30,8 +27,7 @@ public class TransferService implements HandleTransferUseCase, GetTransferUseCas () -> { repository.store(transfer); - transfer.setState(CREATED); - messagingService.send(transfer); + messagingService.send(transfer.getId(), CREATED); }); } @@ -41,11 +37,6 @@ public class TransferService implements HandleTransferUseCase, GetTransferUseCas Transfer.State state = transfer.getState(); switch (state) { - case RECEIVED: - repository.store(transfer); - create(transfer); - break; - case CREATED: repository.store(transfer); check(transfer); @@ -64,8 +55,7 @@ public class TransferService implements HandleTransferUseCase, GetTransferUseCas private void check(Transfer transfer) { // TODO: Do some time consuming checks... - transfer.setState(CHECKED); - messagingService.send(transfer); + messagingService.send(transfer.getId(), CHECKED); } public Optional get(Long id) diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java new file mode 100644 index 0000000..34ae0e9 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/CreateTransferUseCase.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.payment.transfer.ports; + +import de.juplo.kafka.payment.transfer.domain.Transfer; + + +public interface CreateTransferUseCase +{ + void create(Transfer transfer); +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java new file mode 100644 index 0000000..2e75fc0 --- /dev/null +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleStateChangeUseCase.java @@ -0,0 +1,9 @@ +package de.juplo.kafka.payment.transfer.ports; + +import de.juplo.kafka.payment.transfer.domain.Transfer; + + +public interface HandleStateChangeUseCase +{ + void handle(Transfer transfer); +} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java deleted file mode 100644 index 5d1a2b2..0000000 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/HandleTransferUseCase.java +++ /dev/null @@ -1,9 +0,0 @@ -package de.juplo.kafka.payment.transfer.ports; - -import de.juplo.kafka.payment.transfer.domain.Transfer; - - -public interface HandleTransferUseCase -{ - void handle(Transfer transfer); -} diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java index 4037a90..b651c20 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/ports/MessagingService.java @@ -8,4 +8,5 @@ import java.util.concurrent.CompletableFuture; public interface MessagingService { CompletableFuture send(Transfer transfer); + CompletableFuture send(Long id, Transfer.State state); } diff --git a/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java b/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java index b7e8b86..6e2d21b 100644 --- a/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java +++ b/src/test/java/de/juplo/kafka/payment/transfer/domain/TransferTest.java @@ -2,8 +2,7 @@ package de.juplo.kafka.payment.transfer.domain; import org.junit.jupiter.api.Test; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED; -import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED; +import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*; import static org.assertj.core.api.Assertions.assertThat; @@ -12,7 +11,7 @@ public class TransferTest @Test public void testEqualsIgnoresState() { - Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(RECEIVED).build(); + Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CREATED).build(); Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CHECKED).build(); assertThat(a).isEqualTo(b);