X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fpayment%2Ftransfer%2Fpersistence%2FInMemoryTransferRepository.java;h=2a4d7349c534f37635d2087ddee2030386dfa2c8;hb=43ea59755f9673864a3ef95250009f091e99a760;hp=c5af531c3412468f8cf23ff3d48a1de4a221153c;hpb=6191849fee717b080118717c86df79fad12bafc8;p=demos%2Fkafka%2Fdemos-kafka-payment-system-transfer diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java index c5af531..2a4d734 100644 --- a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java +++ b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java @@ -2,57 +2,55 @@ package de.juplo.kafka.payment.transfer.persistence; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner; import de.juplo.kafka.payment.transfer.domain.Transfer; import de.juplo.kafka.payment.transfer.ports.TransferRepository; -import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; import java.util.HashMap; import java.util.Map; import java.util.Optional; -@Component -@RequiredArgsConstructor @Slf4j public class InMemoryTransferRepository implements TransferRepository { - private final Map map = new HashMap<>(); + private final int numPartitions; + private final Map mappings[]; private final ObjectMapper mapper; - @Override - public synchronized void store(Transfer transfer) + public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper) { - Optional - .ofNullable(map.get(transfer.getId())) - .ifPresentOrElse( - json -> - { - throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer); - }, - () -> put(transfer)); + this.numPartitions = numPartitions; + this.mappings = new HashMap[numPartitions]; + for (int i = 0; i < numPartitions; i++) + this.mappings[i] = new HashMap<>(); + this.mapper = mapper; } - private void put(Transfer transfer) + + @Override + public void store(Transfer transfer) { try { - map.put(transfer.getId(), mapper.writeValueAsString(transfer)); + int partition = partitionForId(transfer.getId()); + mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer)); } catch (JsonProcessingException e) { - log.error("Could not convert Transfer.class: {}", transfer, e); + throw new RuntimeException(e); } } @Override - public synchronized Optional get(Long id) + public Optional get(Long id) { return Optional - .ofNullable(map.get(id)) + .ofNullable(this.mappings[partitionForId(id)]) + .map(mapping -> mapping.get(id)) .map(json -> { try { @@ -66,20 +64,24 @@ public class InMemoryTransferRepository implements TransferRepository } @Override - public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState) + public void remove(Long id) { - Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id)); - - if (transfer.getState() != oldState) - throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState)); - - transfer.setState(newState); - put(transfer); + mappings[partitionForId(id)].remove(id); } @Override - public void remove(Long id) + public void resetStorageForPartition(int partition) + { + log.info( + "resetting storage for partition {}: dropping {} entries", + partition, + mappings[partition].size()); + mappings[partition].clear(); + } + + private int partitionForId(long id) { - map.remove(id); + String key = Long.toString(id); + return TransferPartitioner.computeHashForKey(key, numPartitions); } }