import de.juplo.kafka.payment.transfer.ports.TransferRepository;
import lombok.extern.slf4j.Slf4j;
+import java.io.*;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
+import java.util.stream.IntStream;
@Slf4j
public class InMemoryTransferRepository implements TransferRepository
{
private final int numPartitions;
- private final Map<Long, String> mappings[];
private final ObjectMapper mapper;
+ private final Data data;
+ private final Optional<File> stateStore;
- public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+
+ public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
{
+ this.stateStore = stateStore;
this.numPartitions = numPartitions;
- this.mappings = new HashMap[numPartitions];
- for (int i = 0; i < numPartitions; i++)
- this.mappings[i] = new HashMap<>();
this.mapper = mapper;
+
+ Data data = null;
+ try
+ {
+ if (stateStore.isPresent())
+ {
+ try (
+ FileInputStream fis = new FileInputStream(stateStore.get());
+ ObjectInputStream ois = new ObjectInputStream(fis))
+ {
+ data = (Data) ois.readObject();
+ final long offsets[] = data.offsets;
+ final Map<Long, String> map[] = data.mappings;
+ IntStream
+ .range(0, numPartitions)
+ .forEach(i -> log.info(
+ "restored locally stored state for partition {}: position={}, entries={}",
+ i,
+ offsets[i],
+ offsets[i] == 0 ? 0 : map[i].size()));
+ return;
+ }
+ catch (IOException | ClassNotFoundException e)
+ {
+ log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
+ }
+ }
+
+ log.info("will restore state from Kafka");
+ data = new Data(numPartitions);
+ }
+ finally
+ {
+ this.data = data;
+ }
}
try
{
int partition = partitionForId(transfer.getId());
- mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
+ data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
+ // We reset the offset for the state of the modified partition,
+ // because the corresponding offset is not known (yet).
+ data.offsets[partition] = 0;
}
catch (JsonProcessingException e)
{
{
return
Optional
- .ofNullable(this.mappings[partitionForId(id)])
+ .ofNullable(data.mappings[partitionForId(id)])
.map(mapping -> mapping.get(id))
.map(json -> {
try
@Override
public void remove(Long id)
{
- mappings[partitionForId(id)].remove(id);
+ data.mappings[partitionForId(id)].remove(id);
+ }
+
+ @Override
+ public long activatePartition(int partition)
+ {
+ if (data.offsets[partition] == 0)
+ {
+ // Initialize the state of the partition, if
+ // no corresponding offset is known.
+ if (data.mappings[partition] != null)
+ log.warn(
+ "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
+ partition,
+ data.mappings[partition].size());
+
+ data.mappings[partition] = new HashMap<>();
+ }
+
+ return data.offsets[partition];
+ }
+
+ @Override
+ public void deactivatePartition(int partition, long offset)
+ {
+ data.offsets[partition] = offset;
}
@Override
- public void resetStorageForPartition(int partition)
+ public long storedPosition(int partition)
{
- log.info(
- "resetting storage for partition {}: dropping {} entries",
- partition,
- mappings[partition].size());
- mappings[partition].clear();
+ return data.offsets[partition];
}
+ @Override
+ public void storeState(Map<Integer, Long> offsets)
+ {
+ offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+ for (int i = 0; i < numPartitions; i++)
+ {
+ if (data.offsets[i] == 0 && data.mappings[i] != null)
+ {
+ log.warn(
+ "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
+ i,
+ data.mappings[i].size());
+
+ data.mappings[i] = null;
+ }
+ }
+ stateStore.ifPresent(file ->
+ {
+ try (
+ FileOutputStream fos = new FileOutputStream(file);
+ ObjectOutputStream oos = new ObjectOutputStream(fos))
+ {
+ oos.writeObject(data);
+ IntStream
+ .range(0, numPartitions)
+ .forEach(i -> log.info(
+ "locally stored state for partition {}: position={}, entries={}",
+ i,
+ data.offsets[i],
+ data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
+ }
+ catch (IOException e)
+ {
+ log.error("could not write state to store {}: {}", file, e.getMessage());
+ }
+ });
+ }
+
+
private int partitionForId(long id)
{
String key = Long.toString(id);
return TransferPartitioner.computeHashForKey(key, numPartitions);
}
+
+
+ static class Data implements Serializable
+ {
+ final long offsets[];
+ final Map<Long, String> mappings[];
+
+ Data(int numPartitions)
+ {
+ offsets = new long[numPartitions];
+ mappings = new Map[numPartitions];
+ for (int i = 0; i < numPartitions; i++)
+ offsets[i] = 0;
+ }
+ }
}