TransferService transferService)
{
TransferConsumer transferConsumer =
- new TransferConsumer(properties.topic, consumer, executorService, mapper, transferService);
+ new TransferConsumer(
+ properties.topic,
+ consumer,
+ executorService,
+ mapper,
+ transferService,
+ transferService,
+ transferService);
transferConsumer.start();
return transferConsumer;
}
--- /dev/null
+package de.juplo.kafka.payment.transfer.adapter;
+
+public abstract class EventType
+{
+ public final static String HEADER = "$";
+
+ public final static byte NEW_TRANSFER = 1;
+ public final static byte TRANSFER_STATE_CHANGED = 2;
+}
@Override
public CompletableFuture<?> send(Transfer transfer)
+ {
+ return send(transfer.getId(), EventType.NEW_TRANSFER, NewTransferEvent.ofTransfer(transfer));
+ }
+
+ public CompletableFuture<?> send(Long id, Transfer.State state)
+ {
+ return send(id, EventType.TRANSFER_STATE_CHANGED, new TransferStateChangedEvent(id, state));
+ }
+
+ private CompletableFuture send(Long id, byte eventType, Object payload)
{
try
{
CompletableFuture<TopicPartition> future = new CompletableFuture<>();
+
ProducerRecord<String, String> record =
new ProducerRecord<>(
topic,
- Long.toString(transfer.getId()),
- mapper.writeValueAsString(transfer));
+ Long.toString(id),
+ mapper.writeValueAsString(payload));
+ record.headers().add(EventType.HEADER, new byte[] { eventType });
producer.send(record, (metadata, exception) ->
{
if (metadata != null)
{
- log.debug("Sent {} to {}/{}:{}", transfer, metadata.topic(), metadata.partition(), metadata.offset());
+ log.debug("Sent {} to {}/{}:{}", payload, metadata.topic(), metadata.partition(), metadata.offset());
future.complete(new TopicPartition(metadata.topic(), metadata.partition()));
}
else
{
- log.error("Could not send {}: {}", transfer, exception.getMessage());
+ log.error("Could not send {}: {}", payload, exception.getMessage());
future.completeExceptionally(exception);
}
});
}
catch (JsonProcessingException e)
{
- throw new RuntimeException("Could not convert " + transfer, e);
+ throw new RuntimeException("Could not convert " + payload, e);
}
}
--- /dev/null
+package de.juplo.kafka.payment.transfer.adapter;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+
+@Data
+@EqualsAndHashCode
+@Builder
+public class NewTransferEvent
+{
+ private Long id;
+ private Long payer;
+ private Long payee;
+ private Integer amount;
+
+ public Transfer toTransfer()
+ {
+ return
+ Transfer
+ .builder()
+ .id(id)
+ .payer(payer)
+ .payee(payee)
+ .amount(amount)
+ .build();
+ }
+
+ public static NewTransferEvent ofTransfer(Transfer transfer)
+ {
+ return
+ NewTransferEvent
+ .builder()
+ .id(transfer.getId())
+ .payer(transfer.getPayer())
+ .payee(transfer.getPayee())
+ .amount(transfer.getAmount())
+ .build();
+ }
+}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
+
@RequestMapping("/consumer")
@ResponseBody
private final KafkaConsumer<String, String> consumer;
private final ExecutorService executorService;
private final ObjectMapper mapper;
- private final HandleTransferUseCase handleTransferUseCase;
+ private final GetTransferUseCase getTransferUseCase;
+ private final CreateTransferUseCase createTransferUseCase;
+ private final HandleStateChangeUseCase handleStateChangeUseCase;
private boolean running = false;
private Future<?> future = null;
{
try
{
- Transfer transfer = mapper.readValue(record.value(), Transfer.class);
- handleTransferUseCase.handle(transfer);
+ byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+ switch (eventType)
+ {
+ case EventType.NEW_TRANSFER:
+
+ NewTransferEvent newTransferEvent =
+ mapper.readValue(record.value(), NewTransferEvent.class);
+ createTransferUseCase.create(newTransferEvent.toTransfer().setState(CREATED));
+ break;
+
+ case EventType.TRANSFER_STATE_CHANGED:
+
+ TransferStateChangedEvent stateChangedEvent =
+ mapper.readValue(record.value(), TransferStateChangedEvent.class);
+ getTransferUseCase
+ .get(stateChangedEvent.getId())
+ .ifPresentOrElse(
+ transfer -> handleStateChangeUseCase.handle(transfer.setState(stateChangedEvent.getState())),
+ () -> log.error("unknown transfer: {}", stateChangedEvent.getId()));
+ break;
+ }
}
catch (JsonProcessingException e)
{
.payer(transferDTO.getPayer())
.payee(transferDTO.getPayee())
.amount(transferDTO.getAmount())
- .state(Transfer.State.RECEIVED)
.build())
.thenApply($ ->
ResponseEntity
--- /dev/null
+package de.juplo.kafka.payment.transfer.adapter;
+
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import lombok.Builder;
+import lombok.Data;
+import lombok.EqualsAndHashCode;
+
+import java.util.LinkedList;
+import java.util.List;
+
+
+@Data
+@EqualsAndHashCode
+@Builder
+public class TransferStateChangedEvent
+{
+ private long id;
+ private Transfer.State state;
+}
{
public enum State
{
- RECEIVED(false),
CREATED(false),
INVALID(false),
CHECKED(false),
package de.juplo.kafka.payment.transfer.domain;
-import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
-import de.juplo.kafka.payment.transfer.ports.MessagingService;
-import de.juplo.kafka.payment.transfer.ports.TransferRepository;
+import de.juplo.kafka.payment.transfer.ports.*;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import java.util.Optional;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CREATED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
@Slf4j
@RequiredArgsConstructor
-public class TransferService implements HandleTransferUseCase, GetTransferUseCase
+public class TransferService implements CreateTransferUseCase, HandleStateChangeUseCase, GetTransferUseCase
{
private final TransferRepository repository;
private final MessagingService messagingService;
- private void create(Transfer transfer)
+ @Override
+ public void create(Transfer transfer)
{
repository
.get(transfer.getId())
() ->
{
repository.store(transfer);
- transfer.setState(CREATED);
- messagingService.send(transfer);
+ messagingService.send(transfer.getId(), CREATED);
});
}
Transfer.State state = transfer.getState();
switch (state)
{
- case RECEIVED:
- repository.store(transfer);
- create(transfer);
- break;
-
case CREATED:
repository.store(transfer);
check(transfer);
private void check(Transfer transfer)
{
// TODO: Do some time consuming checks...
- transfer.setState(CHECKED);
- messagingService.send(transfer);
+ messagingService.send(transfer.getId(), CHECKED);
}
public Optional<Transfer> get(Long id)
--- /dev/null
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+
+public interface CreateTransferUseCase
+{
+ void create(Transfer transfer);
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.ports;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+
+
+public interface HandleStateChangeUseCase
+{
+ void handle(Transfer transfer);
+}
+++ /dev/null
-package de.juplo.kafka.payment.transfer.ports;
-
-import de.juplo.kafka.payment.transfer.domain.Transfer;
-
-
-public interface HandleTransferUseCase
-{
- void handle(Transfer transfer);
-}
public interface MessagingService
{
CompletableFuture<?> send(Transfer transfer);
+ CompletableFuture<?> send(Long id, Transfer.State state);
}
import org.junit.jupiter.api.Test;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.CHECKED;
-import static de.juplo.kafka.payment.transfer.domain.Transfer.State.RECEIVED;
+import static de.juplo.kafka.payment.transfer.domain.Transfer.State.*;
import static org.assertj.core.api.Assertions.assertThat;
@Test
public void testEqualsIgnoresState()
{
- Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(RECEIVED).build();
+ Transfer a = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CREATED).build();
Transfer b = Transfer.builder().id(1).payer(1).payee(1).amount(1).state(CHECKED).build();
assertThat(a).isEqualTo(b);