The state is periodically stored in a local file, that is read on startup
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
index ec293ad..c29adf7 100644 (file)
@@ -2,32 +2,78 @@ package de.juplo.kafka.payment.transfer.persistence;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.adapter.TransferPartitioner;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
-import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.stereotype.Component;
 
+import java.io.*;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.IntStream;
 
 
-@Component
-@RequiredArgsConstructor
 @Slf4j
 public class InMemoryTransferRepository implements TransferRepository
 {
-  private final Map<Long, String> map = new HashMap<>();
+  private final int numPartitions;
   private final ObjectMapper mapper;
 
+  private final Data data;
+  private final Optional<File> stateStore;
+
+
+  public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
+  {
+    this.stateStore = stateStore;
+    this.numPartitions = numPartitions;
+    this.mapper = mapper;
+
+    Data data = null;
+    try
+    {
+      if (stateStore.isPresent())
+      {
+        try (
+            FileInputStream fis = new FileInputStream(stateStore.get());
+            ObjectInputStream ois = new ObjectInputStream(fis))
+        {
+          data = (Data) ois.readObject();
+          final long offsets[] = data.offsets;
+          final Map<Long, String> map[] = data.mappings;
+          IntStream
+              .range(0, numPartitions)
+              .forEach(i -> log.info(
+                  "restored locally stored state for partition {}: position={}, entries={}",
+                  i,
+                  offsets[i],
+                  offsets[i] == 0 ? 0 : map[i].size()));
+          return;
+        }
+        catch (IOException | ClassNotFoundException e)
+        {
+          log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
+        }
+      }
+
+      log.info("will restore state from Kafka");
+      data = new Data(numPartitions);
+    }
+    finally
+    {
+      this.data = data;
+    }
+  }
+
 
   @Override
   public void store(Transfer transfer)
   {
     try
     {
-      map.put(transfer.getId(), mapper.writeValueAsString(transfer));
+      int partition = partitionForId(transfer.getId());
+      data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
     }
     catch (JsonProcessingException e)
     {
@@ -40,7 +86,8 @@ public class InMemoryTransferRepository implements TransferRepository
   {
     return
         Optional
-            .ofNullable(map.get(id))
+            .ofNullable(data.mappings[partitionForId(id)])
+            .map(mapping -> mapping.get(id))
             .map(json -> {
               try
               {
@@ -56,6 +103,75 @@ public class InMemoryTransferRepository implements TransferRepository
   @Override
   public void remove(Long id)
   {
-    map.remove(id);
+    data.mappings[partitionForId(id)].remove(id);
+  }
+
+  @Override
+  public long activatePartition(int partition)
+  {
+    return data.offsets[partition];
+  }
+
+  @Override
+  public void deactivatePartition(int partition, long offset)
+  {
+    data.offsets[partition] = offset;
+  }
+
+  @Override
+  public long storedPosition(int partition)
+  {
+    return data.offsets[partition];
+  }
+
+  @Override
+  public void storeState(Map<Integer, Long> offsets)
+  {
+    offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+    stateStore.ifPresent(file ->
+    {
+      try (
+          FileOutputStream fos = new FileOutputStream(file);
+          ObjectOutputStream oos = new ObjectOutputStream(fos))
+      {
+        oos.writeObject(data);
+        IntStream
+            .range(0, numPartitions)
+            .forEach(i -> log.info(
+                "locally stored state for partition {}: position={}, entries={}",
+                i,
+                data.offsets[i],
+                data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
+      }
+      catch (IOException e)
+      {
+        log.error("could not write state to store {}: {}", file, e.getMessage());
+      }
+    });
+  }
+
+
+  private int partitionForId(long id)
+  {
+    String key = Long.toString(id);
+    return TransferPartitioner.computeHashForKey(key, numPartitions);
+  }
+
+
+  static class Data implements Serializable
+  {
+    final long offsets[];
+    final Map<Long, String> mappings[];
+
+    Data(int numPartitions)
+    {
+      offsets = new long[numPartitions];
+      mappings = new Map[numPartitions];
+      for (int i = 0; i < numPartitions; i++)
+      {
+        offsets[i] = 0;
+        mappings[i] = new HashMap<>();
+      }
+    }
   }
 }