X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;h=2a4d7349c534f37635d2087ddee2030386dfa2c8;hp=ec293ad53cb65b3fdcbd7bea17a1636397339fd0;hb=43ea59755f9673864a3ef95250009f091e99a760;hpb=cbfe4b796266ff7b9689fb69c5a8efee8ebb130a diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index ec293ad..2a4d734 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -2,32 +2,41 @@ package de.juplo.kafka.payment.transfer.persistence; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner; import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.TransferRepository; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Optional; -@Component -@RequiredArgsConstructor @Slf4j public class InMemoryTransferRepository implements TransferRepository { - private final Map map = new HashMap<>(); + private final int numPartitions; + private final Map mappings[]; private final ObjectMapper mapper; + public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper) + { + this.numPartitions = numPartitions; + this.mappings = new HashMap[numPartitions]; + for (int i = 0; i < numPartitions; i++) + this.mappings[i] = new HashMap<>(); + this.mapper = mapper; + } + + @Override public void store(Transfer transfer) { try { - map.put(transfer.getId(), mapper.writeValueAsString(transfer)); + int partition = partitionForId(transfer.getId()); + mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); } catch (JsonProcessingException e) { @@ -40,7 +49,8 @@ public class InMemoryTransferRepository implements TransferRepository { return Optional - .ofNullable(map.get(id)) + .ofNullable(this.mappings[partitionForId(id)]) + .map(mapping -> mapping.get(id)) .map(json -> { try { @@ -56,6 +66,22 @@ public class InMemoryTransferRepository implements TransferRepository @Override public void remove(Long id) { - map.remove(id); + mappings[partitionForId(id)].remove(id); + } + + @Override + public void resetStorageForPartition(int partition) + { + log.info( + "resetting storage for partition {}: dropping {} entries", + partition, + mappings[partition].size()); + mappings[partition].clear(); + } + + private int partitionForId(long id) + { + String key = Long.toString(id); + return TransferPartitioner.computeHashForKey(key, numPartitions); } }