WIP
[demos/kafka/demos-kafka-payment-system-transfer] / transfer / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
1 package de.juplo.kafka.payment.transfer.persistence;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.domain.TransferRepository;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.springframework.stereotype.Component;
10
11 import java.util.HashMap;
12 import java.util.Map;
13 import java.util.Optional;
14
15
16 @Component
17 @RequiredArgsConstructor
18 @Slf4j
19 public class InMemoryTransferRepository implements TransferRepository
20 {
21   private final Map<Long, String> map = new HashMap<>();
22   private final ObjectMapper mapper;
23
24
25   @Override
26   public synchronized void store(Transfer transfer)
27   {
28     Optional
29         .ofNullable(map.get(transfer.getId()))
30         .ifPresent(
31             json ->
32             {
33               throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
34             });
35
36     try
37     {
38       map.put(transfer.getId(), mapper.writeValueAsString(transfer));
39     }
40     catch (JsonProcessingException e)
41     {
42       log.error("Could not convert Transfer.class: {}", transfer, e);
43     }
44   }
45
46   @Override
47   public synchronized Optional<Transfer> get(Long id)
48   {
49     return
50         Optional
51             .ofNullable(map.get(id))
52             .map(json -> {
53               try
54               {
55                 return mapper.readValue(json, Transfer.class);
56               }
57               catch (JsonProcessingException e)
58               {
59                 throw new RuntimeException("Could not convert JSON: " + json, e);
60               }
61             });
62   }
63
64   @Override
65   public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
66   {
67     Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
68
69     if (transfer.getState() != oldState)
70       throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
71
72     transfer.setState(newState);
73     store(transfer);
74   }
75
76   @Override
77   public void remove(Long id)
78   {
79     map.remove(id);
80   }
81 }