1 package de.juplo.kafka.payment.transfer.persistence;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.extern.slf4j.Slf4j;
11 import java.util.HashMap;
13 import java.util.Optional;
14 import java.util.stream.IntStream;
/**
 * {@link TransferRepository} implementation that keeps its state in memory.
 * The state consists of one map (id to JSON string) and one offset per
 * partition, both held in a {@code Data} instance. If a state-store file is
 * configured, the state is read from and written to that file using Java
 * serialization.
 */
18 public class InMemoryTransferRepository implements TransferRepository
// Number of partitions; used to size the state and to compute the partition of an id.
20 private final int numPartitions;
// Jackson mapper used to convert transfers to and from their stored JSON form.
21 private final ObjectMapper mapper;
// Per-partition mappings and offsets.
23 private final Data data;
// Optional file used to persist the state locally; empty means no local persistence.
24 private final Optional<File> stateStore;
/**
 * Creates the repository and tries to restore its state from the local
 * state store.
 *
 * If a state-store file is given, a serialized {@code Data} object is read
 * from it and the restored position and entry count of every partition are
 * logged. If reading fails with an {@link IOException} or a
 * {@link ClassNotFoundException}, an error is logged.
 *
 * NOTE(review): several lines of this constructor are not visible in this
 * excerpt, so the exact control flow between the restore attempt and the
 * fallback (a fresh, empty {@code Data}) cannot be confirmed from here.
 *
 * @param stateStore    optional file holding a previously stored state
 * @param numPartitions number of partitions the state is split into
 * @param mapper        JSON mapper (its assignment is not visible in this excerpt)
 */
27 public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
29 this.stateStore = stateStore;
30 this.numPartitions = numPartitions;
// Only attempt a restore when a state-store file was configured.
36 if (stateStore.isPresent())
39 FileInputStream fis = new FileInputStream(stateStore.get());
40 ObjectInputStream ois = new ObjectInputStream(fis))
// Java-native deserialization of the whole state object.
// NOTE(review): this trusts the content of the file; confirm the file can
// only be written by this application.
42 data = (Data) ois.readObject();
43 final long offsets[] = data.offsets;
44 final Map<Long, String> map[] = data.mappings;
// NOTE(review): the restored arrays are indexed up to numPartitions; no
// length check is visible here, so a state written with a different
// partition count would presumably fail -- confirm.
46 .range(0, numPartitions)
47 .forEach(i -> log.info(
48 "restored locally stored state for partition {}: position={}, entries={}",
// An offset of 0 is reported as 0 entries without touching the mapping.
51 offsets[i] == 0 ? 0 : map[i].size()));
54 catch (IOException | ClassNotFoundException e)
// Only the exception message is logged, not the stack trace.
56 log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
// Fallback: start with an empty state for all partitions.
60 log.info("will restore state from Kafka");
61 data = new Data(numPartitions);
71 public void store(Transfer transfer)
75 int partition = partitionForId(transfer.getId());
76 data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
77 // We reset the offset for the state of the modified partition,
78 // because the corresponding offset is not known (yet).
79 data.offsets[partition] = 0;
81 catch (JsonProcessingException e)
83 throw new RuntimeException(e);
88 public Optional<Transfer> get(Long id)
92 .ofNullable(data.mappings[partitionForId(id)])
93 .map(mapping -> mapping.get(id))
97 return mapper.readValue(json, Transfer.class);
99 catch (JsonProcessingException e)
101 throw new RuntimeException("Could not convert JSON: " + json, e);
107 public void remove(Long id)
109 data.mappings[partitionForId(id)].remove(id);
/**
 * Prepares the state of the given partition for use and reports the
 * offset that is stored for it.
 *
 * An offset of 0 is treated as "no offset known": in that case the
 * partition gets a fresh, empty mapping, and a message is logged if an
 * existing mapping is replaced thereby.
 *
 * @param partition the partition to activate
 * @return the offset currently stored for the partition
 */
113 public long activatePartition(int partition)
115 if (data.offsets[partition] == 0)
117 // Initialize the state of the partition, if
118 // no corresponding offset is known.
119 if (data.mappings[partition] != null)
// NOTE(review): the head of the logging call that takes the following
// message and arguments is not visible in this excerpt.
121 "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
123 data.mappings[partition].size());
// Start over with an empty mapping for this partition.
125 data.mappings[partition] = new HashMap<>();
128 return data.offsets[partition];
132 public void deactivatePartition(int partition, long offset)
134 data.offsets[partition] = offset;
138 public long storedPosition(int partition)
140 return data.offsets[partition];
/**
 * Takes over the given offsets and, if a state-store file is configured,
 * writes the complete state to that file using Java serialization.
 *
 * Before writing, the mapping of every partition whose offset is 0
 * (treated as "unknown") is set to {@code null}.
 *
 * NOTE(review): some lines of this method (e.g. the heads of the logging
 * call and of the try-with-resources statement) are not visible in this
 * excerpt.
 *
 * @param offsets the offsets to store, keyed by partition number
 */
144 public void storeState(Map<Integer, Long> offsets)
// Overwrite the stored offset of every partition named by the caller.
146 offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
147 for (int i = 0; i < numPartitions; i++)
// State that cannot be tied to an offset is not worth persisting.
149 if (data.offsets[i] == 0 && data.mappings[i] != null)
152 "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
154 data.mappings[i].size());
156 data.mappings[i] = null;
// Persist only if a state-store file was configured.
159 stateStore.ifPresent(file ->
162 FileOutputStream fos = new FileOutputStream(file);
163 ObjectOutputStream oos = new ObjectOutputStream(fos))
// The whole Data object (offsets and mappings) is serialized at once.
165 oos.writeObject(data);
167 .range(0, numPartitions)
168 .forEach(i -> log.info(
169 "locally stored state for partition {}: position={}, entries={}",
// NOTE(review): for a non-zero offset the mapping is dereferenced without a
// null check -- confirm that a non-zero offset implies a non-null mapping.
172 data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
174 catch (IOException e)
// A failed write is only logged (message only, no stack trace).
176 log.error("could not write state to store {}: {}", file, e.getMessage());
182 private int partitionForId(long id)
184 String key = Long.toString(id);
185 return TransferPartitioner.computeHashForKey(key, numPartitions);
189 static class Data implements Serializable
191 final long offsets[];
192 final Map<Long, String> mappings[];
194 Data(int numPartitions)
196 offsets = new long[numPartitions];
197 mappings = new Map[numPartitions];
198 for (int i = 0; i < numPartitions; i++)