Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
index ec293ad..eaad3ee 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.persistence;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.RequiredArgsConstructor;
@@ -13,21 +14,31 @@ import java.util.Map;
 import java.util.Optional;
 
 
-@Component
-@RequiredArgsConstructor
 @Slf4j
 public class InMemoryTransferRepository implements TransferRepository
 {
-  private final Map<Long, String> map = new HashMap<>();
+  private final int numPartitions;
+  private final Map<Long, String> map[];
   private final ObjectMapper mapper;
 
 
+  public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+  {
+    this.numPartitions = numPartitions;
+    this.map = new HashMap[numPartitions];
+    for (int i = 0; i < numPartitions; i++)
+      this.map[i] = new HashMap<>();
+    this.mapper = mapper;
+  }
+
+
   @Override
   public void store(Transfer transfer)
   {
     try
     {
-      map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+      int partition = partitionForId(transfer.getId());
+      map[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
     }
     catch (JsonProcessingException e)
     {
@@ -40,7 +51,7 @@ public class InMemoryTransferRepository implements TransferRepository
   {
     return
         Optional
-            .ofNullable(map.get(id))
+            .ofNullable(map[partitionForId(id)].get(id))
             .map(json -> {
               try
               {
@@ -56,6 +67,22 @@ public class InMemoryTransferRepository implements TransferRepository
   @Override
   public void remove(Long id)
   {
-    map.remove(id);
+    map[partitionForId(id)].remove(id);
+  }
+
+  @Override
+  public void resetStorageForPartition(int partition)
+  {
+    log.info(
+        "reseting storage for partition {}: dropping {} entries",
+        partition,
+        map[partition].size());
+    map[partition].clear();
+  }
+
+  private int partitionForId(long id)
+  {
+    String key = Long.toString(id);
+    return TransferPartitioner.computeHashForKey(key, numPartitions);
   }
 }