State must only be stored locally, if the corresponding offset is known
authorKai Moritz <kai@juplo.de>
Tue, 29 Jun 2021 18:56:39 +0000 (20:56 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 30 Jun 2021 16:56:04 +0000 (18:56 +0200)
src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java

index c29adf7..dd1d7ad 100644 (file)
@@ -74,6 +74,9 @@ public class InMemoryTransferRepository implements TransferRepository
     {
       int partition = partitionForId(transfer.getId());
       data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
+      // We reset the offset for the state of the modified partition,
+      // because the corresponding offset is not known (yet).
+      data.offsets[partition] = 0;
     }
     catch (JsonProcessingException e)
     {
@@ -109,6 +112,19 @@ public class InMemoryTransferRepository implements TransferRepository
   @Override
   public long activatePartition(int partition)
   {
+    if (data.offsets[partition] == 0)
+    {
+      // Initialize the state of the partition, if
+      // no corresponding offset is known.
+      if (data.mappings[partition] != null)
+        log.warn(
+            "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
+            partition,
+            data.mappings[partition].size());
+
+      data.mappings[partition] = new HashMap<>();
+    }
+
     return data.offsets[partition];
   }
 
@@ -128,6 +144,18 @@ public class InMemoryTransferRepository implements TransferRepository
   public void storeState(Map<Integer, Long> offsets)
   {
     offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+    for (int i = 0; i < numPartitions; i++)
+    {
+      if (data.offsets[i] == 0 && data.mappings[i] != null)
+      {
+        log.warn(
+            "dropping state for partition {} ({} entries), because the corresponding offset is unknown!",
+            i,
+            data.mappings[i].size());
+
+        data.mappings[i] = null;
+      }
+    }
     stateStore.ifPresent(file ->
     {
       try (
@@ -168,10 +196,7 @@ public class InMemoryTransferRepository implements TransferRepository
       offsets = new long[numPartitions];
       mappings = new Map[numPartitions];
       for (int i = 0; i < numPartitions; i++)
-      {
         offsets[i] = 0;
-        mappings[i] = new HashMap<>();
-      }
     }
   }
 }