The state is periodically stored in a local file, that is read on startup
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / persistence / InMemoryTransferRepository.java
index 2a4d734..c29adf7 100644 (file)
@@ -7,26 +7,63 @@ import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.*;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.IntStream;
 
 
 @Slf4j
 public class InMemoryTransferRepository implements TransferRepository
 {
   private final int numPartitions;
-  private final Map<Long, String> mappings[];
   private final ObjectMapper mapper;
 
+  private final Data data;
+  private final Optional<File> stateStore;
 
-  public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+
+  public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
   {
+    this.stateStore = stateStore;
     this.numPartitions = numPartitions;
-    this.mappings = new HashMap[numPartitions];
-    for (int i = 0; i < numPartitions; i++)
-      this.mappings[i] = new HashMap<>();
     this.mapper = mapper;
+
+    Data data = null;
+    try
+    {
+      if (stateStore.isPresent())
+      {
+        try (
+            FileInputStream fis = new FileInputStream(stateStore.get());
+            ObjectInputStream ois = new ObjectInputStream(fis))
+        {
+          data = (Data) ois.readObject();
+          final long offsets[] = data.offsets;
+          final Map<Long, String> map[] = data.mappings;
+          IntStream
+              .range(0, numPartitions)
+              .forEach(i -> log.info(
+                  "restored locally stored state for partition {}: position={}, entries={}",
+                  i,
+                  offsets[i],
+                  offsets[i] == 0 ? 0 : map[i].size()));
+          return;
+        }
+        catch (IOException | ClassNotFoundException e)
+        {
+          log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
+        }
+      }
+
+      log.info("will restore state from Kafka");
+      data = new Data(numPartitions);
+    }
+    finally
+    {
+      this.data = data;
+    }
   }
 
 
@@ -36,7 +73,7 @@ public class InMemoryTransferRepository implements TransferRepository
     try
     {
       int partition = partitionForId(transfer.getId());
-      mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
+      data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
     }
     catch (JsonProcessingException e)
     {
@@ -49,7 +86,7 @@ public class InMemoryTransferRepository implements TransferRepository
   {
     return
         Optional
-            .ofNullable(this.mappings[partitionForId(id)])
+            .ofNullable(data.mappings[partitionForId(id)])
             .map(mapping -> mapping.get(id))
             .map(json -> {
               try
@@ -66,22 +103,75 @@ public class InMemoryTransferRepository implements TransferRepository
   @Override
   public void remove(Long id)
   {
-    mappings[partitionForId(id)].remove(id);
+    data.mappings[partitionForId(id)].remove(id);
   }
 
   @Override
-  public void resetStorageForPartition(int partition)
+  public long activatePartition(int partition)
   {
-    log.info(
-        "resetting storage for partition {}: dropping {} entries",
-        partition,
-        mappings[partition].size());
-    mappings[partition].clear();
+    return data.offsets[partition];
   }
 
+  @Override
+  public void deactivatePartition(int partition, long offset)
+  {
+    data.offsets[partition] = offset;
+  }
+
+  @Override
+  public long storedPosition(int partition)
+  {
+    return data.offsets[partition];
+  }
+
+  @Override
+  public void storeState(Map<Integer, Long> offsets)
+  {
+    offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+    stateStore.ifPresent(file ->
+    {
+      try (
+          FileOutputStream fos = new FileOutputStream(file);
+          ObjectOutputStream oos = new ObjectOutputStream(fos))
+      {
+        oos.writeObject(data);
+        IntStream
+            .range(0, numPartitions)
+            .forEach(i -> log.info(
+                "locally stored state for partition {}: position={}, entries={}",
+                i,
+                data.offsets[i],
+                data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
+      }
+      catch (IOException e)
+      {
+        log.error("could not write state to store {}: {}", file, e.getMessage());
+      }
+    });
+  }
+
+
   private int partitionForId(long id)
   {
     String key = Long.toString(id);
     return TransferPartitioner.computeHashForKey(key, numPartitions);
   }
+
+
+  static class Data implements Serializable
+  {
+    final long offsets[];
+    final Map<Long, String> mappings[];
+
+    Data(int numPartitions)
+    {
+      offsets = new long[numPartitions];
+      mappings = new Map[numPartitions];
+      for (int i = 0; i < numPartitions; i++)
+      {
+        offsets[i] = 0;
+        mappings[i] = new HashMap<>();
+      }
+    }
+  }
 }