The restore-process no longer happens inside onPartitionsAssigned()
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index 501bfd0..1cae540 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
@@ -21,12 +22,14 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 
 @RequestMapping("/consumer")
@@ -40,7 +43,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   private final AdminClient adminClient;
   private final TransferRepository repository;
   private final ObjectMapper mapper;
-  private final ConsumerUseCases productionUseCases, restoreUseCases;
+  private final ConsumerUseCases restoreUseCases;
 
   private boolean running = false;
   private boolean shutdown = false;
@@ -51,6 +54,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   private final Map<String, String> instanceIdUriMapping;
   private final String[] instanceIdByPartition;
 
+
+  private Clock clock;
+  private int stateStoreInterval;
+
+  private final Consumer<ConsumerRecord<String, String>> productionRecordHandler;
+  private final Consumer<ConsumerRecord<String, String>> recordHandlers[];
+
   private volatile boolean partitionOwnershipUnknown = true;
 
 
@@ -61,6 +71,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
       KafkaConsumer<String, String> consumer,
       AdminClient adminClient,
       TransferRepository repository,
+      Clock clock,
+      int stateStoreInterval,
       ObjectMapper mapper,
       ConsumerUseCases productionUseCases,
       ConsumerUseCases restoreUseCases)
@@ -82,15 +94,21 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     this.consumer = consumer;
     this.adminClient = adminClient;
     this.repository = repository;
+    this.clock = clock;
+    this.stateStoreInterval = stateStoreInterval;
     this.mapper = mapper;
-    this.productionUseCases = productionUseCases;
     this.restoreUseCases = restoreUseCases;
+
+    productionRecordHandler = (record) -> handleRecord(record, productionUseCases);
+    this.recordHandlers = new Consumer[numPartitions];
   }
 
 
   @Override
   public void run()
   {
+    Instant stateStored = clock.instant();
+
     while (running)
     {
       try
@@ -100,7 +118,26 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
           continue;
 
         log.debug("polled {} records", records.count());
-        records.forEach(record -> handleRecord(record, productionUseCases));
+        records.forEach(record -> recordHandlers[record.partition()].accept(record));
+
+        Instant now = clock.instant();
+        if (
+            stateStoreInterval > 0 &&
+            Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+        {
+          Map<Integer, Long> offsets = new HashMap<>();
+
+          for (TopicPartition topicPartition : consumer.assignment())
+          {
+            Integer partition = topicPartition.partition();
+            Long offset = consumer.position(topicPartition);
+            log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+            offsets.put(partition, offset);
+          }
+
+          repository.storeState(offsets);
+          stateStored = now;
+        }
       }
       catch (WakeupException e)
       {
@@ -161,6 +198,21 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   }
 
 
+  /**
+   * Identifies the URI at which the Group-Instance that holds the state
+   * for a specific {@link Transfer} can be reached.
+   *
+   * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+   * {@code key} here and is of type {@code String}, because this example
+   * project stores the key as a String in Kafka to simplify listing and
+   * manually manipulating the corresponding topic.
+   *
+   * @param key A {@code String} that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+   * @return An {@link Optional} that holds the URI of the Group-Instance
+   * holding the state for the {@link Transfer} identified by the key,
+   * or that is empty if the {@link Transfer} would be handled by the
+   * local instance.
+   */
   public Optional<String> uriForKey(String key)
   {
     synchronized (this)
@@ -187,6 +239,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     // Hence, the app would not react to any signal (CTRL-C, for example) except
     // a KILL until the restoring is finished.
     future = CompletableFuture.runAsync(() -> start());
+    log.info("start of application completed");
   }
 
 
@@ -195,6 +248,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   {
     partitionOwnershipUnknown = true;
     log.info("partitions revoked: {}", partitions);
+    for (TopicPartition topicPartition : partitions)
+    {
+      int partition = topicPartition.partition();
+      long offset = consumer.position(topicPartition);
+      log.info("deactivating partition {}, offset: {}", partition, offset);
+      repository.deactivatePartition(partition, offset);
+    }
   }
 
   @Override
@@ -203,7 +263,27 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     log.info("partitions assigned: {}", partitions);
     fetchAssignmentsAsync();
     if (partitions.size() > 0)
-      restore(partitions);
+    {
+      for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
+      {
+        TopicPartition topicPartition = entry.getKey();
+        Integer partition = topicPartition.partition();
+        long offset = repository.activatePartition(partition);
+        log.info("activated partition {}, seeking to offset {}", partition, offset);
+        consumer.seek(topicPartition, offset);
+        Long endOffset = entry.getValue();
+        if (offset < endOffset)
+        {
+          log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
+          recordHandlers[partition] = new RestoreRecordHandler(endOffset);
+        }
+        else
+        {
+          log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
+          recordHandlers[partition] = productionRecordHandler;
+        }
+      }
+    }
   }
 
   private void fetchAssignmentsAsync()
@@ -244,67 +324,51 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   }
 
 
-  private void restore(Collection<TopicPartition> partitions)
+  class RestoreRecordHandler implements Consumer<ConsumerRecord<String, String>>
   {
-    log.info("--> starting restore...");
-
-    partitions
-        .stream()
-        .map(topicPartition -> topicPartition.partition())
-        .forEach(partition -> repository.resetStorageForPartition(partition));
-
-    Map<Integer, Long> lastSeen =
-        consumer
-            .endOffsets(partitions)
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                entry -> entry.getKey().partition(),
-                entry -> entry.getValue() - 1));
-
-    Map<Integer, Long> positions =
-        lastSeen
-            .keySet()
-            .stream()
-            .collect(Collectors.toMap(
-                partition -> partition,
-                partition -> 0l));
-
-    while (
-        positions
-            .entrySet()
-            .stream()
-            .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
-            .reduce(false, (a, b) -> a || b))
+    final long seen;
+
+
+    RestoreRecordHandler(Long endOffset)
     {
-      try
-      {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-        if (records.count() == 0)
-          continue;
+      this.seen = endOffset - 1;
+    }
 
-        log.debug("polled {} records", records.count());
-        records.forEach(record ->
-        {
-          handleRecord(record, restoreUseCases);
-          positions.put(record.partition(), record.offset());
-        });
+
+    @Override
+    public void accept(ConsumerRecord<String, String> record)
+    {
+      if (seen < record.offset())
+      {
+        int partition = record.partition();
+        log.info(
+            "--> restore of partition {} completed: needed={}, seen={}!",
+            partition,
+            seen,
+            record.offset());
+        recordHandlers[partition] = productionRecordHandler;
+        productionRecordHandler.accept(record);
       }
-      catch(WakeupException e)
+      else
       {
-        log.info("--> cleanly interrupted while restoring");
+        handleRecord(record, restoreUseCases);
+        if (seen == record.offset())
+        {
+          int partition = record.partition();
+          log.info( "--> restore of partition {} completed!", partition);
+          recordHandlers[partition] = productionRecordHandler;
+        }
       }
     }
-
-    log.info("--> restore completed!");
   }
 
+
   @PostMapping("start")
   public synchronized String start()
   {
     if (running)
     {
-      log.info("already running!");
+      log.info("consumer already running!");
       return "Already running!";
     }
 
@@ -325,7 +389,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     running = true;
     future = CompletableFuture.runAsync(this);
 
-    log.info("started");
+    log.info("consumer started");
     return "Started";
   }
 
@@ -334,7 +398,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   {
     if (!running)
     {
-      log.info("not running!");
+      log.info("consumer not running!");
       return "Not running";
     }
 
@@ -359,7 +423,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
       consumer.unsubscribe();
     }
 
-    log.info("stopped");
+    log.info("consumer stopped");
     return "Stopped";
   }