import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
import de.juplo.kafka.payment.transfer.domain.Transfer;
import de.juplo.kafka.payment.transfer.ports.TransferRepository;
-import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
-@Component
-@RequiredArgsConstructor
@Slf4j
public class InMemoryTransferRepository implements TransferRepository
{
- private final Map<Long, String> map = new HashMap<>();
+ private final int numPartitions;
+ private final Map<Long, String> mappings[];
private final ObjectMapper mapper;
- @Override
- public synchronized void store(Transfer transfer)
+ public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
{
- Optional
- .ofNullable(map.get(transfer.getId()))
- .ifPresentOrElse(
- json ->
- {
- throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
- },
- () -> put(transfer));
+ this.numPartitions = numPartitions;
+ this.mappings = new HashMap[numPartitions];
+ for (int i = 0; i < numPartitions; i++)
+ this.mappings[i] = new HashMap<>();
+ this.mapper = mapper;
}
- private void put(Transfer transfer)
+
+ @Override
+ public void store(Transfer transfer)
{
try
{
- map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+ int partition = partitionForId(transfer.getId());
+ mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
}
catch (JsonProcessingException e)
{
- log.error("Could not convert Transfer.class: {}", transfer, e);
+ throw new RuntimeException(e);
}
}
@Override
- public synchronized Optional<Transfer> get(Long id)
+ public Optional<Transfer> get(Long id)
{
return
Optional
- .ofNullable(map.get(id))
+ .ofNullable(this.mappings[partitionForId(id)])
+ .map(mapping -> mapping.get(id))
.map(json -> {
try
{
}
@Override
- public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
+ public void remove(Long id)
{
- Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
-
- if (transfer.getState() != oldState)
- throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
-
- transfer.setState(newState);
- put(transfer);
+ mappings[partitionForId(id)].remove(id);
}
@Override
- public void remove(Long id)
+ public void resetStorageForPartition(int partition)
+ {
+ log.info(
+ "resetting storage for partition {}: dropping {} entries",
+ partition,
+ mappings[partition].size());
+ mappings[partition].clear();
+ }
+
+ private int partitionForId(long id)
{
- map.remove(id);
+ String key = Long.toString(id);
+ return TransferPartitioner.computeHashForKey(key, numPartitions);
}
}