Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
index ec293ad..2a4d734 100644 (file)
@@ -2,32 +2,41 @@ package de.juplo.kafka.payment.transfer.persistence;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 
-@Component
-@RequiredArgsConstructor
 @Slf4j
 public class InMemoryTransferRepository implements TransferRepository
 {
 @Slf4j
 public class InMemoryTransferRepository implements TransferRepository
 {
-  private final Map<Long, String> map = new HashMap<>();
+  private final int numPartitions;
+  private final Map<Long, String> mappings[];
   private final ObjectMapper mapper;
 
 
   private final ObjectMapper mapper;
 
 
+  public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+  {
+    this.numPartitions = numPartitions;
+    this.mappings = new HashMap[numPartitions];
+    for (int i = 0; i < numPartitions; i++)
+      this.mappings[i] = new HashMap<>();
+    this.mapper = mapper;
+  }
+
+
   @Override
   public void store(Transfer transfer)
   {
     try
     {
   @Override
   public void store(Transfer transfer)
   {
     try
     {
-      map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+      int partition = partitionForId(transfer.getId());
+      mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
     }
     catch (JsonProcessingException e)
     {
     }
     catch (JsonProcessingException e)
     {
@@ -40,7 +49,8 @@ public class InMemoryTransferRepository implements TransferRepository
   {
     return
         Optional
   {
     return
         Optional
-            .ofNullable(map.get(id))
+            .ofNullable(this.mappings[partitionForId(id)])
+            .map(mapping -> mapping.get(id))
             .map(json -> {
               try
               {
             .map(json -> {
               try
               {
@@ -56,6 +66,22 @@ public class InMemoryTransferRepository implements TransferRepository
   @Override
   public void remove(Long id)
   {
   @Override
   public void remove(Long id)
   {
-    map.remove(id);
+    mappings[partitionForId(id)].remove(id);
+  }
+
+  @Override
+  public void resetStorageForPartition(int partition)
+  {
+    log.info(
+        "resetting storage for partition {}: dropping {} entries",
+        partition,
+        mappings[partition].size());
+    mappings[partition].clear();
+  }
+
+  private int partitionForId(long id)
+  {
+    String key = Long.toString(id);
+    return TransferPartitioner.computeHashForKey(key, numPartitions);
   }
 }
   }
 }