1 package de.juplo.kafka.payment.transfer.persistence;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.extern.slf4j.Slf4j;
11 import java.util.HashMap;
13 import java.util.Optional;
14 import java.util.stream.IntStream;
18 public class InMemoryTransferRepository implements TransferRepository
20 private final int numPartitions;
21 private final ObjectMapper mapper;
23 private final Data data;
24 private final Optional<File> stateStore;
27 public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
29 this.stateStore = stateStore;
30 this.numPartitions = numPartitions;
36 if (stateStore.isPresent())
39 FileInputStream fis = new FileInputStream(stateStore.get());
40 ObjectInputStream ois = new ObjectInputStream(fis))
42 data = (Data) ois.readObject();
43 final long offsets[] = data.offsets;
44 final Map<Long, String> map[] = data.mappings;
46 .range(0, numPartitions)
47 .forEach(i -> log.info(
48 "restored locally stored state for partition {}: position={}, entries={}",
51 offsets[i] == 0 ? 0 : map[i].size()));
54 catch (IOException | ClassNotFoundException e)
56 log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
60 log.info("will restore state from Kafka");
61 data = new Data(numPartitions);
71 public void store(Transfer transfer)
75 int partition = partitionForId(transfer.getId());
76 data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
78 catch (JsonProcessingException e)
80 throw new RuntimeException(e);
85 public Optional<Transfer> get(Long id)
89 .ofNullable(data.mappings[partitionForId(id)])
90 .map(mapping -> mapping.get(id))
94 return mapper.readValue(json, Transfer.class);
96 catch (JsonProcessingException e)
98 throw new RuntimeException("Could not convert JSON: " + json, e);
104 public void remove(Long id)
106 data.mappings[partitionForId(id)].remove(id);
110 public long activatePartition(int partition)
112 return data.offsets[partition];
116 public void deactivatePartition(int partition, long offset)
118 data.offsets[partition] = offset;
122 public long storedPosition(int partition)
124 return data.offsets[partition];
128 public void storeState(Map<Integer, Long> offsets)
130 offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
131 stateStore.ifPresent(file ->
134 FileOutputStream fos = new FileOutputStream(file);
135 ObjectOutputStream oos = new ObjectOutputStream(fos))
137 oos.writeObject(data);
139 .range(0, numPartitions)
140 .forEach(i -> log.info(
141 "locally stored state for partition {}: position={}, entries={}",
144 data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
146 catch (IOException e)
148 log.error("could not write state to store {}: {}", file, e.getMessage());
154 private int partitionForId(long id)
156 String key = Long.toString(id);
157 return TransferPartitioner.computeHashForKey(key, numPartitions);
161 static class Data implements Serializable
163 final long offsets[];
164 final Map<Long, String> mappings[];
166 Data(int numPartitions)
168 offsets = new long[numPartitions];
169 mappings = new Map[numPartitions];
170 for (int i = 0; i < numPartitions; i++)
173 mappings[i] = new HashMap<>();