--- /dev/null
+package de.juplo.kafka.payment.transfer.controller;
+
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import lombok.Builder;
+import lombok.Data;
+
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * Simple DTO used by the REST interface
+ */
+@Data
+@Builder
+public class TransferDTO
+{
+ @NotNull(message = "Cannot be null")
+ private long id;
+ @NotNull(message = "Cannot be null")
+ private long payer;
+ @NotNull(message = "Cannot be null")
+ private long payee;
+ @NotNull(message = "Cannot be null")
+ private int amount;
+
+ private Transfer.State state;
+
+
+ public Transfer toTransfer()
+ {
+ return
+ Transfer
+ .builder()
+ .id(id)
+ .payer(payer)
+ .payee(payee)
+ .amount(amount)
+ .build();
+ }
+
+
+ public static TransferDTO of(Transfer transfer)
+ {
+ return
+ TransferDTO
+ .builder()
+ .id(transfer.getId())
+ .payer(transfer.getPayer())
+ .payee(transfer.getPayee())
+ .amount(transfer.getAmount())
+ .state(transfer.getState())
+ .build();
+ }
+}
--- /dev/null
+package de.juplo.kafka.payment.transfer.persistence;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
+import de.juplo.kafka.payment.transfer.domain.TransferRepository;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+
+@Component
+@RequiredArgsConstructor
+@Slf4j
+public class InMemoryTransferRepository implements TransferRepository
+{
+ private final Map<Long, String> map = new HashMap<>();
+ private final ObjectMapper mapper;
+
+
+ @Override
+ public synchronized void store(Transfer transfer)
+ {
+ Optional
+ .ofNullable(map.get(transfer.getId()))
+ .ifPresent(
+ json ->
+ {
+ throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
+ });
+
+ try
+ {
+ map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+ }
+ catch (JsonProcessingException e)
+ {
+ log.error("Could not convert Transfer.class: {}", transfer, e);
+ }
+ }
+
+ @Override
+ public synchronized Optional<Transfer> get(Long id)
+ {
+ return
+ Optional
+ .ofNullable(map.get(id))
+ .map(json -> {
+ try
+ {
+ return mapper.readValue(json, Transfer.class);
+ }
+ catch (JsonProcessingException e)
+ {
+ throw new RuntimeException("Could not convert JSON: " + json, e);
+ }
+ });
+ }
+
+ @Override
+ public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
+ {
+ Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
+
+ if (transfer.getState() != oldState)
+ throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
+
+ transfer.setState(newState);
+ store(transfer);
+ }
+
+ @Override
+ public void remove(Long id)
+ {
+ map.remove(id);
+ }
+}