The state is periodically stored in a local file that is read on startup
author    Kai Moritz <kai@juplo.de>
          Tue, 29 Jun 2021 18:51:54 +0000 (20:51 +0200)
committer Kai Moritz <kai@juplo.de>
          Wed, 30 Jun 2021 16:46:26 +0000 (18:46 +0200)
application.yml
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java
src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java
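In short: the in-memory repository state (per-partition consumer offsets plus the JSON-serialized transfer mappings) is periodically written to a local file with Java serialization and read back on startup, so a restart only has to replay records newer than the stored offsets. A minimal sketch of that round trip, modelled on the Data class added to InMemoryTransferRepository below (file name, partition count and values are made up here):

import java.io.*;
import java.util.HashMap;
import java.util.Map;

public class StateSnapshotSketch
{
  // mirrors the Data class added to InMemoryTransferRepository:
  // one consumer offset and one id->JSON mapping per partition
  static class Data implements Serializable
  {
    final long[] offsets;
    final Map<Long, String>[] mappings;

    @SuppressWarnings("unchecked")
    Data(int numPartitions)
    {
      offsets = new long[numPartitions];
      mappings = new Map[numPartitions];
      for (int i = 0; i < numPartitions; i++)
        mappings[i] = new HashMap<>();
    }
  }

  public static void main(String[] args) throws Exception
  {
    File stateStore = new File("state.bin"); // same default as in application.yml
    Data data = new Data(2);
    data.offsets[0] = 42L;
    data.mappings[0].put(1L, "{\"id\":1,\"amount\":100}");

    // periodic store: overwrite the file with the current snapshot
    try (ObjectOutputStream oos = new ObjectOutputStream(new FileOutputStream(stateStore)))
    {
      oos.writeObject(data);
    }

    // startup: read the snapshot back; fall back to an empty state on failure
    Data restored;
    try (ObjectInputStream ois = new ObjectInputStream(new FileInputStream(stateStore)))
    {
      restored = (Data) ois.readObject();
    }
    catch (IOException | ClassNotFoundException e)
    {
      restored = new Data(2);
    }

    System.out.println("restored position for partition 0: " + restored.offsets[0]);
  }
}

If reading the file fails, the constructor in the diff below logs the error and falls back to an empty state, so the full state is then restored from Kafka as before.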

diff --git a/application.yml b/application.yml
index 818a596..daac570 100644
@@ -1,3 +1,5 @@
 juplo:
   transfer:
     group-instance-id: peter
+    state-store-interval: 15
+    local-state-store-path: state.bin
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceApplication.java
index a82f8b1..5114a1c 100644
@@ -7,6 +7,7 @@ import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.persistence.InMemoryTransferRepository;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import de.juplo.kafka.payment.transfer.ports.TransferService;
+import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.AdminClientConfig;
@@ -22,7 +23,13 @@ import org.springframework.boot.autoconfigure.SpringBootApplication;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.util.Assert;
+import org.springframework.util.StringUtils;
 
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.time.Clock;
 import java.util.Optional;
 import java.util.Properties;
 
@@ -87,6 +94,7 @@ public class TransferServiceApplication
       KafkaConsumer<String, String> consumer,
       AdminClient adminClient,
       TransferRepository repository,
+      LocalStateStoreSettings localStateStoreSettings,
       ObjectMapper mapper,
       TransferService productionTransferService,
       TransferService restoreTransferService)
@@ -99,6 +107,8 @@ public class TransferServiceApplication
             consumer,
             adminClient,
             repository,
+            Clock.systemDefaultZone(),
+            localStateStoreSettings.interval,
             mapper,
             new TransferConsumer.ConsumerUseCases() {
               @Override
@@ -149,12 +159,58 @@ public class TransferServiceApplication
     return new KafkaMessagingService(producer, mapper, properties.getTopic());
   }
 
+  @RequiredArgsConstructor
+  static class LocalStateStoreSettings
+  {
+    final Optional<File> file;
+    final int interval;
+  }
+
+  @Bean
+  LocalStateStoreSettings localStateStoreSettings(TransferServiceProperties properties)
+  {
+    if (properties.getStateStoreInterval() < 1)
+    {
+      log.info("juplo.transfer.state-store-interval is < 1: local storage of state is deactivated");
+      return new LocalStateStoreSettings(Optional.empty(), 0);
+    }
+
+    if (!StringUtils.hasText(properties.getLocalStateStorePath()))
+    {
+      log.info("juplo.transfer.local-state-store-path is not set: local storage of state is deactivated!");
+      return new LocalStateStoreSettings(Optional.empty(), 0);
+    }
+
+    Path path = Path.of(properties.getLocalStateStorePath());
+    log.info("using {} as local state store", path.toAbsolutePath());
+
+    if (Files.notExists(path))
+    {
+      try
+      {
+        Files.createFile(path);
+      }
+      catch (IOException e)
+      {
+        throw new IllegalArgumentException("Could not create local state store: " + path.toAbsolutePath());
+      }
+    }
+
+    if (!(Files.isReadable(path) && Files.isWritable(path)))
+    {
+      throw new IllegalArgumentException("No R/W-access on local state store: " + path.toAbsolutePath());
+    }
+
+    return new LocalStateStoreSettings(Optional.of(path.toFile()), properties.getStateStoreInterval());
+  }
+
   @Bean
   InMemoryTransferRepository inMemoryTransferRepository(
+      LocalStateStoreSettings localStateStoreSettings,
       TransferServiceProperties properties,
       ObjectMapper mapper)
   {
-    return new InMemoryTransferRepository(properties.getNumPartitions(), mapper);
+    return new InMemoryTransferRepository(localStateStoreSettings.file, properties.getNumPartitions(), mapper);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java b/src/main/java/de/juplo/kafka/payment/transfer/TransferServiceProperties.java
index e001748..907a8b9 100644
@@ -20,6 +20,8 @@ public class TransferServiceProperties
   private String groupId = "transfers";
   private String groupInstanceId;
   private Map<String, String> instanceIdUriMapping;
+  private String localStateStorePath;
+  private int stateStoreInterval = 60;
 
   public Map<String, String> getInstanceIdUriMapping()
   {
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java b/src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java
index 2ef7ee3..aa00737 100644
@@ -21,7 +21,9 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -51,6 +53,9 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   private final Map<String, String> instanceIdUriMapping;
   private final String[] instanceIdByPartition;
 
+  private Clock clock;
+  private int stateStoreInterval;
+
   private volatile boolean partitionOwnershipUnknown = true;
 
 
@@ -61,6 +66,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
       KafkaConsumer<String, String> consumer,
       AdminClient adminClient,
       TransferRepository repository,
+      Clock clock,
+      int stateStoreInterval,
       ObjectMapper mapper,
       ConsumerUseCases productionUseCases,
       ConsumerUseCases restoreUseCases)
@@ -82,6 +89,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     this.consumer = consumer;
     this.adminClient = adminClient;
     this.repository = repository;
+    this.clock = clock;
+    this.stateStoreInterval = stateStoreInterval;
     this.mapper = mapper;
     this.productionUseCases = productionUseCases;
     this.restoreUseCases = restoreUseCases;
@@ -91,6 +100,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void run()
   {
+    Instant stateStored = clock.instant();
+
     while (running)
     {
       try
@@ -101,6 +112,25 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 
         log.debug("polled {} records", records.count());
         records.forEach(record -> handleRecord(record, productionUseCases));
+
+        Instant now = clock.instant();
+        if (
+            stateStoreInterval > 0 &&
+            Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+        {
+          Map<Integer, Long> offsets = new HashMap<>();
+
+          for (TopicPartition topicPartition : consumer.assignment())
+          {
+            Integer partition = topicPartition.partition();
+            Long offset = consumer.position(topicPartition);
+            log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+            offsets.put(partition, offset);
+          }
+
+          repository.storeState(offsets);
+          stateStored = now;
+        }
       }
       catch (WakeupException e)
       {
@@ -196,6 +226,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   {
     partitionOwnershipUnknown = true;
     log.info("partitions revoked: {}", partitions);
+    for (TopicPartition topicPartition : partitions)
+    {
+      int partition = topicPartition.partition();
+      long offset = consumer.position(topicPartition);
+      log.info("deactivating partition {}, offset: {}", partition, offset);
+      repository.deactivatePartition(partition, offset);
+    }
   }
 
   @Override
@@ -204,7 +241,17 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     log.info("partitions assigned: {}", partitions);
     fetchAssignmentsAsync();
     if (partitions.size() > 0)
+    {
+      for (TopicPartition topicPartition : partitions)
+      {
+        int partition = topicPartition.partition();
+        long offset = repository.activatePartition(partition);
+        log.info("activated partition {}, seeking to offset {}", partition, offset);
+        consumer.seek(topicPartition, offset);
+      }
+
       restore(partitions);
+    }
   }
 
   private void fetchAssignmentsAsync()
@@ -249,11 +296,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   {
     log.info("--> starting restore...");
 
-    partitions
-        .stream()
-        .map(topicPartition -> topicPartition.partition())
-        .forEach(partition -> repository.resetStorageForPartition(partition));
-
     Map<Integer, Long> lastSeen =
         consumer
             .endOffsets(partitions)
@@ -269,7 +311,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
             .stream()
             .collect(Collectors.toMap(
                 partition -> partition,
-                partition -> 0l));
+                partition -> repository.storedPosition(partition)));
 
     while (
         positions
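
A side note on the Clock parameter: it is injected (instead of calling Instant.now() directly) so the periodic trigger in run() can be tested with a fixed clock. Stripped of the Kafka plumbing, the gate looks roughly like this (names and values are illustrative):

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneOffset;

public class StoreIntervalSketch
{
  public static void main(String[] args)
  {
    int stateStoreInterval = 15; // seconds, as set in application.yml
    Clock clock = Clock.fixed(Instant.parse("2021-06-29T18:51:54Z"), ZoneOffset.UTC);

    Instant stateStored = clock.instant();

    // ...inside the poll loop: pretend 20 seconds have passed since the last snapshot
    Instant now = stateStored.plusSeconds(20);

    if (stateStoreInterval > 0 &&
        Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
    {
      // here run() collects consumer.position() for each assigned partition
      // and hands the map to repository.storeState(offsets)
      System.out.println("interval elapsed: writing local snapshot");
      stateStored = now; // restart the interval
    }
  }
}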
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/persistence/InMemoryTransferRepository.java
index 2a4d734..c29adf7 100644
@@ -7,26 +7,63 @@ import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
 
+import java.io.*;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
+import java.util.stream.IntStream;
 
 
 @Slf4j
 public class InMemoryTransferRepository implements TransferRepository
 {
   private final int numPartitions;
-  private final Map<Long, String> mappings[];
   private final ObjectMapper mapper;
 
+  private final Data data;
+  private final Optional<File> stateStore;
 
-  public InMemoryTransferRepository(int numPartitions, ObjectMapper mapper)
+
+  public InMemoryTransferRepository(Optional<File> stateStore, int numPartitions, ObjectMapper mapper)
   {
+    this.stateStore = stateStore;
     this.numPartitions = numPartitions;
-    this.mappings = new HashMap[numPartitions];
-    for (int i = 0; i < numPartitions; i++)
-      this.mappings[i] = new HashMap<>();
     this.mapper = mapper;
+
+    Data data = null;
+    try
+    {
+      if (stateStore.isPresent())
+      {
+        try (
+            FileInputStream fis = new FileInputStream(stateStore.get());
+            ObjectInputStream ois = new ObjectInputStream(fis))
+        {
+          data = (Data) ois.readObject();
+          final long offsets[] = data.offsets;
+          final Map<Long, String> map[] = data.mappings;
+          IntStream
+              .range(0, numPartitions)
+              .forEach(i -> log.info(
+                  "restored locally stored state for partition {}: position={}, entries={}",
+                  i,
+                  offsets[i],
+                  offsets[i] == 0 ? 0 : map[i].size()));
+          return;
+        }
+        catch (IOException | ClassNotFoundException e)
+        {
+          log.error("could not read state from local store {}: {}", stateStore.get(), e.getMessage());
+        }
+      }
+
+      log.info("will restore state from Kafka");
+      data = new Data(numPartitions);
+    }
+    finally
+    {
+      this.data = data;
+    }
   }
 
 
@@ -36,7 +73,7 @@ public class InMemoryTransferRepository implements TransferRepository
     try
     {
       int partition = partitionForId(transfer.getId());
-      mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
+      data.mappings[partition].put(transfer.getId(), mapper.writeValueAsString(transfer));
     }
     catch (JsonProcessingException e)
     {
@@ -49,7 +86,7 @@ public class InMemoryTransferRepository implements TransferRepository
   {
     return
         Optional
-            .ofNullable(this.mappings[partitionForId(id)])
+            .ofNullable(data.mappings[partitionForId(id)])
             .map(mapping -> mapping.get(id))
             .map(json -> {
               try
@@ -66,22 +103,75 @@ public class InMemoryTransferRepository implements TransferRepository
   @Override
   public void remove(Long id)
   {
-    mappings[partitionForId(id)].remove(id);
+    data.mappings[partitionForId(id)].remove(id);
   }
 
   @Override
-  public void resetStorageForPartition(int partition)
+  public long activatePartition(int partition)
   {
-    log.info(
-        "resetting storage for partition {}: dropping {} entries",
-        partition,
-        mappings[partition].size());
-    mappings[partition].clear();
+    return data.offsets[partition];
   }
 
+  @Override
+  public void deactivatePartition(int partition, long offset)
+  {
+    data.offsets[partition] = offset;
+  }
+
+  @Override
+  public long storedPosition(int partition)
+  {
+    return data.offsets[partition];
+  }
+
+  @Override
+  public void storeState(Map<Integer, Long> offsets)
+  {
+    offsets.forEach((partition, offset) -> data.offsets[partition] = offset);
+    stateStore.ifPresent(file ->
+    {
+      try (
+          FileOutputStream fos = new FileOutputStream(file);
+          ObjectOutputStream oos = new ObjectOutputStream(fos))
+      {
+        oos.writeObject(data);
+        IntStream
+            .range(0, numPartitions)
+            .forEach(i -> log.info(
+                "locally stored state for partition {}: position={}, entries={}",
+                i,
+                data.offsets[i],
+                data.offsets[i] == 0 ? 0 : data.mappings[i].size()));
+      }
+      catch (IOException e)
+      {
+        log.error("could not write state to store {}: {}", file, e.getMessage());
+      }
+    });
+  }
+
+
   private int partitionForId(long id)
   {
     String key = Long.toString(id);
     return TransferPartitioner.computeHashForKey(key, numPartitions);
   }
+
+
+  static class Data implements Serializable
+  {
+    final long offsets[];
+    final Map<Long, String> mappings[];
+
+    Data(int numPartitions)
+    {
+      offsets = new long[numPartitions];
+      mappings = new Map[numPartitions];
+      for (int i = 0; i < numPartitions; i++)
+      {
+        offsets[i] = 0;
+        mappings[i] = new HashMap<>();
+      }
+    }
+  }
 }
diff --git a/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java b/src/main/java/de/juplo/kafka/payment/transfer/ports/TransferRepository.java
index 0629eab..f21604d 100644
@@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.ports;
 
 import de.juplo.kafka.payment.transfer.domain.Transfer;
 
+import java.util.Map;
 import java.util.Optional;
 
 
@@ -13,5 +14,11 @@ public interface TransferRepository
 
   void remove(Long id);
 
-  void resetStorageForPartition(int partition);
+  long activatePartition(int partition);
+
+  void deactivatePartition(int partition, long offset);
+
+  long storedPosition(int partition);
+
+  void storeState(Map<Integer, Long> offsets);
 }
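
For orientation, the life cycle of the new repository methods as TransferConsumer drives them: activatePartition() and storedPosition() provide the offset to seek to and to restore from when a partition is assigned, storeState() persists all positions periodically from the poll loop, and deactivatePartition() remembers the position when a partition is revoked. A condensed, self-contained sketch with a hypothetical stand-in interface (not the real TransferRepository):

import java.util.HashMap;
import java.util.Map;

public class RepositoryLifecycleSketch
{
  // stand-in for the methods this commit adds to TransferRepository
  interface PartitionStateStore
  {
    long activatePartition(int partition);
    void deactivatePartition(int partition, long offset);
    long storedPosition(int partition);
    void storeState(Map<Integer, Long> offsets);
  }

  static class InMemoryStore implements PartitionStateStore
  {
    private final long[] offsets = new long[2];

    public long activatePartition(int partition) { return offsets[partition]; }
    public void deactivatePartition(int partition, long offset) { offsets[partition] = offset; }
    public long storedPosition(int partition) { return offsets[partition]; }
    public void storeState(Map<Integer, Long> o) { o.forEach((p, off) -> offsets[p] = off); }
  }

  public static void main(String[] args)
  {
    PartitionStateStore repository = new InMemoryStore();

    // onPartitionsAssigned: seek each assigned partition to the locally known position
    long seekTo = repository.activatePartition(0);
    System.out.println("seek partition 0 to offset " + seekTo);

    // restore(): replay from the stored position instead of from offset 0
    System.out.println("restore partition 0 from offset " + repository.storedPosition(0));

    // poll loop, every state-store-interval seconds: persist the current positions
    Map<Integer, Long> positions = new HashMap<>();
    positions.put(0, 42L);
    repository.storeState(positions);

    // onPartitionsRevoked: remember where this instance stopped reading
    repository.deactivatePartition(0, 57L);
  }
}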