X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;h=dd1d7add5e13f9b8cc4968df8e869ebfb7ab6adc;hb=a9a2158de30cb47c96cfabf06c797d81a352a1e1;hp=c5af531c3412468f8cf23ff3d48a1de4a221153c;hpb=6191849fee717b080118717c86df79fad12bafc8;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index c5af531..dd1d7ad 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -2,57 +2,95 @@ package de.juplo.kafka.payment.transfer.persistence; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner; import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.TransferRepository; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; +import java.io.*; import java.util.HashMap; import java.util.Map; import java.util.Optional; +import java.util.stream.IntStream; -@Component -@RequiredArgsConstructor @Slf4j public class InMemoryTransferRepository implements TransferRepository { - private final Map map = new HashMap<>(); + private final int numPartitions; private final ObjectMapper mapper; + private final Data data; + private final Optional stateStore; - @Override - public synchronized void store(Transfer transfer) + + public InMemoryTransferRepository(Optional stateStore, int numPartitions, ObjectMapper mapper) { - Optional - .ofNullable(map.get(transfer.getId())) - .ifPresentOrElse( - json -> - { - throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer); - }, - () -> put(transfer)); + this.stateStore = stateStore; + this.numPartitions = numPartitions; + this.mapper = mapper; + + Data data = null; + try + { + if (stateStore.isPresent()) + { + try ( + FileInputStream fis = new FileInputStream(stateStore.get()); + ObjectInputStream ois = new ObjectInputStream(fis)) + { + data = (Data) ois.readObject(); + final long offsets[] = data.offsets; + final Map map[] = data.mappings; + IntStream + .range(0, numPartitions) + .forEach(i -> log.info( + "restored locally stored state for partition {}: position={}, entries={}", + i, + offsets[i], + offsets[i] == 0 ? 0 : map[i].size())); + return; + } + catch (IOException | ClassNotFoundException e) + { + log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage()); + } + } + + log.info("will restore state from Kafka"); + data = new Data(numPartitions); + } + finally + { + this.data = data; + } } - private void put(Transfer transfer) + + @Override + public void store(Transfer transfer) { try { - map.put(transfer.getId(), mapper.writeValueAsString(transfer)); + int partition = partitionForId(transfer.getId()); + data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); + // We reset the offset for the state of the modified partition, + // because the corresponding offset is not known (yet). + data.offsets[partition] = 0; } catch (JsonProcessingException e) { - log.error("Could not convert Transfer.class: {}", transfer, e); + throw new RuntimeException(e); } } @Override - public synchronized Optional get(Long id) + public Optional get(Long id) { return Optional - .ofNullable(map.get(id)) + .ofNullable(data.mappings[partitionForId(id)]) + .map(mapping -> mapping.get(id)) .map(json -> { try { @@ -66,20 +104,99 @@ public class InMemoryTransferRepository implements TransferRepository } @Override - public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState) + public void remove(Long id) + { + data.mappings[partitionForId(id)].remove(id); + } + + @Override + public long activatePartition(int partition) { - Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id)); + if (data.offsets[partition] == 0) + { + // Initialize the state of the partition, if + // no corresponding offset is known. + if (data.mappings[partition] != null) + log.warn( + "dropping state for partition {} ({} entries), because the corresponding offset is unknown!", + partition, + data.mappings[partition].size()); - if (transfer.getState() != oldState) - throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState)); + data.mappings[partition] = new HashMap<>(); + } - transfer.setState(newState); - put(transfer); + return data.offsets[partition]; } @Override - public void remove(Long id) + public void deactivatePartition(int partition, long offset) + { + data.offsets[partition] = offset; + } + + @Override + public long storedPosition(int partition) + { + return data.offsets[partition]; + } + + @Override + public void storeState(Map offsets) { - map.remove(id); + offsets.forEach((partition, offset) -> data.offsets[partition] = offset); + for (int i = 0; i < numPartitions; i++) + { + if (data.offsets[i] == 0 && data.mappings[i] != null) + { + log.warn( + "dropping state for partition {} ({} entries), because the corresponding offset is unknown!", + i, + data.mappings[i].size()); + + data.mappings[i] = null; + } + } + stateStore.ifPresent(file -> + { + try ( + FileOutputStream fos = new FileOutputStream(file); + ObjectOutputStream oos = new ObjectOutputStream(fos)) + { + oos.writeObject(data); + IntStream + .range(0, numPartitions) + .forEach(i -> log.info( + "locally stored state for partition {}: position={}, entries={}", + i, + data.offsets[i], + data.offsets[i] == 0 ? 0 : data.mappings[i].size())); + } + catch (IOException e) + { + log.error("could not write state to store {}: {}", file, e.getMessage()); + } + }); + } + + + private int partitionForId(long id) + { + String key = Long.toString(id); + return TransferPartitioner.computeHashForKey(key, numPartitions); + } + + + static class Data implements Serializable + { + final long offsets[]; + final Map mappings[]; + + Data(int numPartitions) + { + offsets = new long[numPartitions]; + mappings = new Map[numPartitions]; + for (int i = 0; i < numPartitions; i++) + offsets[i] = 0; + } } }