1 package de.juplo.kafka.payment.transfer.persistence;
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
6 import de.juplo.kafka.payment.transfer.domain.Transfer;
7 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
8 import lombok.RequiredArgsConstructor;
9 import lombok.extern.slf4j.Slf4j;
10 import org.springframework.stereotype.Component;
12 import java.util.HashMap;
14 import java.util.Optional;
18 public class InMemoryTransferRepository implements TransferRepository
20 private final int numPartitions;
21 private final Map<Long, String> map[];
22 private final ObjectMapper mapper;
25 public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
27 this.numPartitions = numPartitions;
28 this.map = new HashMap[numPartitions];
29 for (int i = 0; i < numPartitions; i++)
30 this.map[i] = new HashMap<>();
36 public void store(Transfer transfer)
40 int partition = partitionForId(transfer.getId());
41 map[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
43 catch (JsonProcessingException e)
45 throw new RuntimeException(e);
50 public Optional<Transfer> get(Long id)
54 .ofNullable(map[partitionForId(id)].get(id))
58 return mapper.readValue(json, Transfer.class);
60 catch (JsonProcessingException e)
62 throw new RuntimeException("Could not convert JSON: " + json, e);
68 public void remove(Long id)
70 map[partitionForId(id)].remove(id);
74 public void resetStorageForPartition(int partition)
77 "reseting storage for partition {}: dropping {} entries",
79 map[partition].size());
80 map[partition].clear();
83 private int partitionForId(long id)
85 String key = Long.toString(id);
86 return TransferPartitioner.computeHashForKey(key, numPartitions);