X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;h=eaad3ee25cccf56d9fca2eeb43ba81f5252606c6;hb=225fdb5fe597b10ea29ee56895aa2b0df98e0604;hp=ec293ad53cb65b3fdcbd7bea17a1636397339fd0;hpb=cbfe4b796266ff7b9689fb69c5a8efee8ebb130a;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index ec293ad..eaad3ee 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.persistence; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner; import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.TransferRepository; import lombok.RequiredArgsConstructor; @@ -13,21 +14,31 @@ import java.util.Map; import java.util.Optional; -@Component -@RequiredArgsConstructor @Slf4j public class InMemoryTransferRepository implements TransferRepository { - private final Map map = new HashMap<>(); + private final int numPartitions; + private final Map map[]; private final ObjectMapper mapper; + public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper) + { + this.numPartitions = numPartitions; + this.map = new HashMap[numPartitions]; + for (int i = 0; i < numPartitions; i++) + this.map[i] = new HashMap<>(); + this.mapper = mapper; + } + + @Override public void store(Transfer transfer) { try { - map.put(transfer.getId(), mapper.writeValueAsString(transfer)); + int partition = partitionForId(transfer.getId()); + map[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); } catch (JsonProcessingException e) { @@ -40,7 +51,7 @@ public class InMemoryTransferRepository implements TransferRepository { return Optional - .ofNullable(map.get(id)) + .ofNullable(map[partitionForId(id)].get(id)) .map(json -> { try { @@ -56,6 +67,22 @@ public class InMemoryTransferRepository implements TransferRepository @Override public void remove(Long id) { - map.remove(id); + map[partitionForId(id)].remove(id); + } + + @Override + public void resetStorageForPartition(int partition) + { + log.info( + "reseting storage for partition {}: dropping {} entries", + partition, + map[partition].size()); + map[partition].clear(); + } + + private int partitionForId(long id) + { + String key = Long.toString(id); + return TransferPartitioner.computeHashForKey(key, numPartitions); } }