WIP
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
1 package de.juplo.kafka.payment.transfer.persistence;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.extern.slf4j.Slf4j;
9
10 import java.io.*;
11 import java.util.HashMap;
12 import java.util.Map;
13 import java.util.Optional;
14 import java.util.stream.IntStream;
15
16
17 @Slf4j
18 public class InMemoryTransferRepository implements TransferRepository
19 {
20   private final int numPartitions;
21   private final ObjectMapper mapper;
22
23   private final Data data;
24   private final Optional<File> stateStore;
25
26
27   public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
28   {
29     this.stateStore = stateStore;
30     this.numPartitions = numPartitions;
31     this.mapper = mapper;
32
33     Data data = null;
34     try
35     {
36       if (stateStore.isPresent())
37       {
38         try (
39             FileInputStream fis = new FileInputStream(stateStore.get());
40             ObjectInputStream ois = new ObjectInputStream(fis))
41         {
42           data = (Data) ois.readObject();
43           final long offsets[] = data.offsets;
44           final Map<Long, String> map[] = data.mappings;
45           IntStream
46               .range(0, numPartitions)
47               .forEach(i -> log.info(
48                   "restored locally stored state for partition {}: position={}, entries={}",
49                   i,
50                   offsets[i],
51                   offsets[i] == 0 ? 0 : map[i].size()));
52           return;
53         }
54         catch (IOException | ClassNotFoundException e)
55         {
56           log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
57         }
58       }
59
60       log.info("will restore state from Kafka");
61       data = new Data(numPartitions);
62     }
63     finally
64     {
65       this.data = data;
66     }
67   }
68
69
70   @Override
71   public void store(Transfer transfer)
72   {
73     try
74     {
75       int partition = partitionForId(transfer.getId());
76       data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
77       // We reset the offset for the state of the modified partition,
78       // because the corresponding offset is not known (yet).
79       data.offsets[partition] = 0;
80     }
81     catch (JsonProcessingException e)
82     {
83       throw new RuntimeException(e);
84     }
85   }
86
87   @Override
88   public Optional<Transfer> get(Long id)
89   {
90     return
91         Optional
92             .ofNullable(data.mappings[partitionForId(id)])
93             .map(mapping -> mapping.get(id))
94             .map(json -> {
95               try
96               {
97                 return mapper.readValue(json, Transfer.class);
98               }
99               catch (JsonProcessingException e)
100               {
101                 throw new RuntimeException("Could not convert JSON: " + json, e);
102               }
103             });
104   }
105
106   @Override
107   public void remove(Long id)
108   {
109     data.mappings[partitionForId(id)].remove(id);
110   }
111
112   @Override
113   public long activatePartition(int partition)
114   {
115     if (data.offsets[partition] == 0)
116     {
117       // Initialize the state of the partition, if
118       // no corresponding offset is known.
119       if (data.mappings[partition] != null)
120         log.warn(
121             "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
122             partition,
123             data.mappings[partition].size());
124
125       data.mappings[partition] = new HashMap<>();
126     }
127
128     return data.offsets[partition];
129   }
130
131   @Override
132   public void deactivatePartition(int partition, long offset)
133   {
134     data.offsets[partition] = offset;
135   }
136
137   @Override
138   public long storedPosition(int partition)
139   {
140     return data.offsets[partition];
141   }
142
143   @Override
144   public void storeState(Map<Integer, Long> offsets)
145   {
146     offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
147     for (int i = 0; i < numPartitions; i++)
148     {
149       if (data.offsets[i] == 0 && data.mappings[i] != null)
150       {
151         log.warn(
152             "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
153             i,
154             data.mappings[i].size());
155
156         data.mappings[i] = null;
157       }
158     }
159     stateStore.ifPresent(file ->
160     {
161       try (
162           FileOutputStream fos = new FileOutputStream(file);
163           ObjectOutputStream oos = new ObjectOutputStream(fos))
164       {
165         oos.writeObject(data);
166         IntStream
167             .range(0, numPartitions)
168             .forEach(i -> log.info(
169                 "locally stored state for partition {}: position={}, entries={}",
170                 i,
171                 data.offsets[i],
172                 data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
173       }
174       catch (IOException e)
175       {
176         log.error("could not write state to store {}: {}", file, e.getMessage());
177       }
178     });
179   }
180
181
182   private int partitionForId(long id)
183   {
184     String key = Long.toString(id);
185     return TransferPartitioner.computeHashForKey(key, numPartitions);
186   }
187
188
189   static class Data implements Serializable
190   {
191     final long offsets[];
192     final Map<Long, String> mappings[];
193
194     Data(int numPartitions)
195     {
196       offsets = new long[numPartitions];
197       mappings = new Map[numPartitions];
198       for (int i = 0; i < numPartitions; i++)
199         offsets[i] = 0;
200     }
201   }
202 }