import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-@Component
-@RequiredArgsConstructor
@Slf4j
public class InMemoryTransferRepository implements TransferRepository
{
- private final Map<Long, String> map = new HashMap<>();
+ private final int numPartitions;
+ private final Map<Long, String> mappings[];
private final ObjectMapper mapper;
+ public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+ {
+ this.numPartitions = numPartitions;
+ this.mappings = new HashMap[numPartitions];
+ for (int i = 0; i < numPartitions; i++)
+ this.mappings[i] = new HashMap<>();
+ this.mapper = mapper;
+ }
+
+
@Override
public void store(Transfer transfer)
{
try
{
- map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+ int partition = partitionForId(transfer.getId());
+ mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
}
catch (JsonProcessingException e)
{
{
return
Optional
- .ofNullable(map.get(id))
+ .ofNullable(this.mappings[partitionForId(id)])
+ .map(mapping -> mapping.get(id))
.map(json -> {
try
{
@Override
public void remove(Long id)
{
- map.remove(id);
+ mappings[partitionForId(id)].remove(id);
+ }
+
+ @Override
+ public void resetStorageForPartition(int partition)
+ {
+ log.info(
+ "resetting storage for partition {}: dropping {} entries",
+ partition,
+ mappings[partition].size());
+ mappings[partition].clear();
+ }
+
+ private int partitionForId(long id)
+ {
+ String key = Long.toString(id);
+ return TransferPartitioner.computeHashForKey(key, numPartitions);
}
}