1 package de.juplo.kafka.payment.transfer.persistence;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.extern.slf4j.Slf4j;
10 import java.util.HashMap;
12 import java.util.Optional;
16 public class InMemoryTransferRepository implements TransferRepository
18 private final int numPartitions;
19 private final Map<Long, String> mappings[];
20 private final ObjectMapper mapper;
23 public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
25 this.numPartitions = numPartitions;
26 this.mappings = new HashMap[numPartitions];
27 for (int i = 0; i < numPartitions; i++)
28 this.mappings[i] = new HashMap<>();
34 public void store(Transfer transfer)
38 int partition = partitionForId(transfer.getId());
39 mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
41 catch (JsonProcessingException e)
43 throw new RuntimeException(e);
48 public Optional<Transfer> get(Long id)
52 .ofNullable(this.mappings[partitionForId(id)])
53 .map(mapping -> mapping.get(id))
57 return mapper.readValue(json, Transfer.class);
59 catch (JsonProcessingException e)
61 throw new RuntimeException("Could not convert JSON: " + json, e);
67 public void remove(Long id)
69 mappings[partitionForId(id)].remove(id);
73 public void resetStorageForPartition(int partition)
76 "resetting storage for partition {}: dropping {} entries",
78 mappings[partition].size());
79 mappings[partition].clear();
82 private int partitionForId(long id)
84 String key = Long.toString(id);
85 return TransferPartitioner.computeHashForKey(key, numPartitions);