c5af531c3412468f8cf23ff3d48a1de4a221153c
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
1 package de.juplo.kafka.payment.transfer.persistence;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.domain.Transfer;
6 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
7 import lombok.RequiredArgsConstructor;
8 import lombok.extern.slf4j.Slf4j;
9 import org.springframework.stereotype.Component;
10
11 import java.util.HashMap;
12 import java.util.Map;
13 import java.util.Optional;
14
15
16 @Component
17 @RequiredArgsConstructor
18 @Slf4j
19 public class InMemoryTransferRepository implements TransferRepository
20 {
21   private final Map<Long, String> map = new HashMap<>();
22   private final ObjectMapper mapper;
23
24
25   @Override
26   public synchronized void store(Transfer transfer)
27   {
28     Optional
29         .ofNullable(map.get(transfer.getId()))
30         .ifPresentOrElse(
31             json ->
32             {
33               throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
34             },
35             () -> put(transfer));
36   }
37
38   private void put(Transfer transfer)
39   {
40     try
41     {
42       map.put(transfer.getId(), mapper.writeValueAsString(transfer));
43     }
44     catch (JsonProcessingException e)
45     {
46       log.error("Could not convert Transfer.class: {}", transfer, e);
47     }
48   }
49
50   @Override
51   public synchronized Optional<Transfer> get(Long id)
52   {
53     return
54         Optional
55             .ofNullable(map.get(id))
56             .map(json -> {
57               try
58               {
59                 return mapper.readValue(json, Transfer.class);
60               }
61               catch (JsonProcessingException e)
62               {
63                 throw new RuntimeException("Could not convert JSON: " + json, e);
64               }
65             });
66   }
67
68   @Override
69   public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
70   {
71     Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
72
73     if (transfer.getState() != oldState)
74       throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
75
76     transfer.setState(newState);
77     put(transfer);
78   }
79
80   @Override
81   public void remove(Long id)
82   {
83     map.remove(id);
84   }
85 }