The state is periodically stored in a local file that is read on startup
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
1 package de.juplo.kafka.payment.transfer.persistence;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.extern.slf4j.Slf4j;
9
10 import java.io.*;
11 import java.util.HashMap;
12 import java.util.Map;
13 import java.util.Optional;
14 import java.util.stream.IntStream;
15
16
17 @Slf4j
18 public class InMemoryTransferRepository implements TransferRepository
19 {
20   private final int numPartitions;
21   private final ObjectMapper mapper;
22
23   private final Data data;
24   private final Optional<File> stateStore;
25
26
27   public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
28   {
29     this.stateStore = stateStore;
30     this.numPartitions = numPartitions;
31     this.mapper = mapper;
32
33     Data data = null;
34     try
35     {
36       if (stateStore.isPresent())
37       {
38         try (
39             FileInputStream fis = new FileInputStream(stateStore.get());
40             ObjectInputStream ois = new ObjectInputStream(fis))
41         {
42           data = (Data) ois.readObject();
43           final long offsets[] = data.offsets;
44           final Map<Long, String> map[] = data.mappings;
45           IntStream
46               .range(0, numPartitions)
47               .forEach(i -> log.info(
48                   "restored locally stored state for partition {}: position={}, entries={}",
49                   i,
50                   offsets[i],
51                   offsets[i] == 0 ? 0 : map[i].size()));
52           return;
53         }
54         catch (IOException | ClassNotFoundException e)
55         {
56           log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
57         }
58       }
59
60       log.info("will restore state from Kafka");
61       data = new Data(numPartitions);
62     }
63     finally
64     {
65       this.data = data;
66     }
67   }
68
69
70   @Override
71   public void store(Transfer transfer)
72   {
73     try
74     {
75       int partition = partitionForId(transfer.getId());
76       data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
77     }
78     catch (JsonProcessingException e)
79     {
80       throw new RuntimeException(e);
81     }
82   }
83
84   @Override
85   public Optional<Transfer> get(Long id)
86   {
87     return
88         Optional
89             .ofNullable(data.mappings[partitionForId(id)])
90             .map(mapping -> mapping.get(id))
91             .map(json -> {
92               try
93               {
94                 return mapper.readValue(json, Transfer.class);
95               }
96               catch (JsonProcessingException e)
97               {
98                 throw new RuntimeException("Could not convert JSON: " + json, e);
99               }
100             });
101   }
102
103   @Override
104   public void remove(Long id)
105   {
106     data.mappings[partitionForId(id)].remove(id);
107   }
108
109   @Override
110   public long activatePartition(int partition)
111   {
112     return data.offsets[partition];
113   }
114
115   @Override
116   public void deactivatePartition(int partition, long offset)
117   {
118     data.offsets[partition] = offset;
119   }
120
121   @Override
122   public long storedPosition(int partition)
123   {
124     return data.offsets[partition];
125   }
126
127   @Override
128   public void storeState(Map<Integer, Long> offsets)
129   {
130     offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
131     stateStore.ifPresent(file ->
132     {
133       try (
134           FileOutputStream fos = new FileOutputStream(file);
135           ObjectOutputStream oos = new ObjectOutputStream(fos))
136       {
137         oos.writeObject(data);
138         IntStream
139             .range(0, numPartitions)
140             .forEach(i -> log.info(
141                 "locally stored state for partition {}: position={}, entries={}",
142                 i,
143                 data.offsets[i],
144                 data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
145       }
146       catch (IOException e)
147       {
148         log.error("could not write state to store {}: {}", file, e.getMessage());
149       }
150     });
151   }
152
153
154   private int partitionForId(long id)
155   {
156     String key = Long.toString(id);
157     return TransferPartitioner.computeHashForKey(key, numPartitions);
158   }
159
160
161   static class Data implements Serializable
162   {
163     final long offsets[];
164     final Map<Long, String> mappings[];
165
166     Data(int numPartitions)
167     {
168       offsets = new long[numPartitions];
169       mappings = new Map[numPartitions];
170       for (int i = 0; i < numPartitions; i++)
171       {
172         offsets[i] = 0;
173         mappings[i] = new HashMap<>();
174       }
175     }
176   }
177 }