Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
index c5af531..2a4d734 100644 (file)
@@ -2,57 +2,55 @@ package de.juplo.kafka.payment.transfer.persistence;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
 
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
 
 
-@Component
-@RequiredArgsConstructor
 @Slf4j
 public class InMemoryTransferRepository implements TransferRepository
 {
-  private final Map<Long, String> map = new HashMap<>();
+  private final int numPartitions;
+  private final Map<Long, String> mappings[];
   private final ObjectMapper mapper;
 
 
-  @Override
-  public synchronized void store(Transfer transfer)
+  public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
   {
-    Optional
-        .ofNullable(map.get(transfer.getId()))
-        .ifPresentOrElse(
-            json ->
-            {
-              throw new IllegalArgumentException("Could not overwrite " + json + " with " + transfer);
-            },
-            () -> put(transfer));
+    this.numPartitions = numPartitions;
+    this.mappings = new HashMap[numPartitions];
+    for (int i = 0; i < numPartitions; i++)
+      this.mappings[i] = new HashMap<>();
+    this.mapper = mapper;
   }
 
-  private void put(Transfer transfer)
+
+  @Override
+  public void store(Transfer transfer)
   {
     try
     {
-      map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+      int partition = partitionForId(transfer.getId());
+      mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
     }
     catch (JsonProcessingException e)
     {
-      log.error("Could not convert Transfer.class: {}", transfer, e);
+      throw new RuntimeException(e);
     }
   }
 
   @Override
-  public synchronized Optional<Transfer> get(Long id)
+  public Optional<Transfer> get(Long id)
   {
     return
         Optional
-            .ofNullable(map.get(id))
+            .ofNullable(this.mappings[partitionForId(id)])
+            .map(mapping -> mapping.get(id))
             .map(json -> {
               try
               {
@@ -66,20 +64,24 @@ public class InMemoryTransferRepository implements TransferRepository
   }
 
   @Override
-  public synchronized void update(Long id, Transfer.State oldState, Transfer.State newState)
+  public void remove(Long id)
   {
-    Transfer transfer = get(id).orElseThrow(() -> new IllegalArgumentException("Could not find transfer " + id));
-
-    if (transfer.getState() != oldState)
-      throw new IllegalArgumentException(("Unexpectd state for " + transfer + ", expected: " + oldState));
-
-    transfer.setState(newState);
-    put(transfer);
+    mappings[partitionForId(id)].remove(id);
   }
 
   @Override
-  public void remove(Long id)
+  public void resetStorageForPartition(int partition)
+  {
+    log.info(
+        "resetting storage for partition {}: dropping {} entries",
+        partition,
+        mappings[partition].size());
+    mappings[partition].clear();
+  }
+
+  private int partitionForId(long id)
   {
-    map.remove(id);
+    String key = Long.toString(id);
+    return TransferPartitioner.computeHashForKey(key, numPartitions);
   }
 }