X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;h=c29adf7657a6015f315b64f0281e6df235517657;hp=2a4d7349c534f37635d2087ddee2030386dfa2c8;hb=2432aeedb30ac4c1405045514d8eacb791a4d352;hpb=c64f93de3e59af674885fdad08c521d82f4802d1 diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index 2a4d734..c29adf7 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -7,26 +7,63 @@ import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.extern.slf4j.Slf4j; +import java.io.*; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.stream.IntStream; @Slf4j public class InMemoryTransferRepository implements TransferRepository { private final int numPartitions; - private final Map mappings[]; private final ObjectMapper mapper; + private final Data data; + private final Optional stateStore; - public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper) + + public InMemoryTransferRepository(Optional stateStore, int numPartitions, ObjectMapper mapper) { + this.stateStore = stateStore; this.numPartitions = numPartitions; - this.mappings = new HashMap[numPartitions]; - for (int i = 0; i < numPartitions; i++) - this.mappings[i] = new HashMap<>(); this.mapper = mapper; + + Data data = null; + try + { + if (stateStore.isPresent()) + { + try ( + FileInputStream fis = new FileInputStream(stateStore.get()); + ObjectInputStream ois = new ObjectInputStream(fis)) + { + data = (Data) ois.readObject(); + final long offsets[] = data.offsets; + final Map map[] = data.mappings; + IntStream + .range(0, numPartitions) + .forEach(i -> log.info( + "restored locally stored state for partition {}: position={}, entries={}", + i, + offsets[i], + offsets[i] == 0 ? 0 : map[i].size())); + return; + } + catch (IOException | ClassNotFoundException e) + { + log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage()); + } + } + + log.info("will restore state from Kafka"); + data = new Data(numPartitions); + } + finally + { + this.data = data; + } } @@ -36,7 +73,7 @@ public class InMemoryTransferRepository implements TransferRepository try { int partition = partitionForId(transfer.getId()); - mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); + data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); } catch (JsonProcessingException e) { @@ -49,7 +86,7 @@ public class InMemoryTransferRepository implements TransferRepository { return Optional - .ofNullable(this.mappings[partitionForId(id)]) + .ofNullable(data.mappings[partitionForId(id)]) .map(mapping -> mapping.get(id)) .map(json -> { try @@ -66,22 +103,75 @@ public class InMemoryTransferRepository implements TransferRepository @Override public void remove(Long id) { - mappings[partitionForId(id)].remove(id); + data.mappings[partitionForId(id)].remove(id); } @Override - public void resetStorageForPartition(int partition) + public long activatePartition(int partition) { - log.info( - "resetting storage for partition {}: dropping {} entries", - partition, - mappings[partition].size()); - mappings[partition].clear(); + return data.offsets[partition]; } + @Override + public void deactivatePartition(int partition, long offset) + { + data.offsets[partition] = offset; + } + + @Override + public long storedPosition(int partition) + { + return data.offsets[partition]; + } + + @Override + public void storeState(Map offsets) + { + offsets.forEach((partition, offset) -> data.offsets[partition] = offset); + stateStore.ifPresent(file -> + { + try ( + FileOutputStream fos = new FileOutputStream(file); + ObjectOutputStream oos = new ObjectOutputStream(fos)) + { + oos.writeObject(data); + IntStream + .range(0, numPartitions) + .forEach(i -> log.info( + "locally stored state for partition {}: position={}, entries={}", + i, + data.offsets[i], + data.offsets[i] == 0 ? 0 : data.mappings[i].size())); + } + catch (IOException e) + { + log.error("could not write state to store {}: {}", file, e.getMessage()); + } + }); + } + + private int partitionForId(long id) { String key = Long.toString(id); return TransferPartitioner.computeHashForKey(key, numPartitions); } + + + static class Data implements Serializable + { + final long offsets[]; + final Map mappings[]; + + Data(int numPartitions) + { + offsets = new long[numPartitions]; + mappings = new Map[numPartitions]; + for (int i = 0; i < numPartitions; i++) + { + offsets[i] = 0; + mappings[i] = new HashMap<>(); + } + } + } }