The restore-process no longer happens inside onPartitionsAssigned()
authorKai Moritz <kai@juplo.de>
Sun, 27 Jun 2021 13:19:42 +0000 (15:19 +0200)
committerKai Moritz <kai@juplo.de>
Wed, 30 Jun 2021 16:57:54 +0000 (18:57 +0200)
src/main/java/de/juplo/kafka/payment/transfer/adapter/TransferConsumer.java

index 920d79a..1cae540 100644 (file)
@@ -29,7 +29,7 @@ import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 
 @RequestMapping("/consumer")
@@ -43,7 +43,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   private final AdminClient adminClient;
   private final TransferRepository repository;
   private final ObjectMapper mapper;
-  private final ConsumerUseCases productionUseCases, restoreUseCases;
+  private final ConsumerUseCases restoreUseCases;
 
   private boolean running = false;
   private boolean shutdown = false;
@@ -54,9 +54,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   private final Map<String, String> instanceIdUriMapping;
   private final String[] instanceIdByPartition;
 
+
   private Clock clock;
   private int stateStoreInterval;
 
+  private final Consumer<ConsumerRecord<String, String>> productionRecordHandler;
+  private final Consumer<ConsumerRecord<String, String>> recordHandlers[];
+
   private volatile boolean partitionOwnershipUnknown = true;
 
 
@@ -93,8 +97,10 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     this.clock = clock;
     this.stateStoreInterval = stateStoreInterval;
     this.mapper = mapper;
-    this.productionUseCases = productionUseCases;
     this.restoreUseCases = restoreUseCases;
+
+    productionRecordHandler = (record) -> handleRecord(record, productionUseCases);
+    this.recordHandlers = new Consumer[numPartitions];
   }
 
 
@@ -112,7 +118,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
           continue;
 
         log.debug("polled {} records", records.count());
-        records.forEach(record -> handleRecord(record, productionUseCases));
+        records.forEach(record -> recordHandlers[record.partition()].accept(record));
 
         Instant now = clock.instant();
         if (
@@ -258,15 +264,25 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     fetchAssignmentsAsync();
     if (partitions.size() > 0)
     {
-      for (TopicPartition topicPartition : partitions)
+      for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
       {
-        int partition = topicPartition.partition();
+        TopicPartition topicPartition = entry.getKey();
+        Integer partition = topicPartition.partition();
         long offset = repository.activatePartition(partition);
         log.info("activated partition {}, seeking to offset {}", partition, offset);
         consumer.seek(topicPartition, offset);
+        Long endOffset = entry.getValue();
+        if (offset < endOffset)
+        {
+          log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
+          recordHandlers[partition] = new RestoreRecordHandler(endOffset);
+        }
+        else
+        {
+          log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
+          recordHandlers[partition] = productionRecordHandler;
+        }
       }
-
-      restore(partitions);
     }
   }
 
@@ -308,56 +324,45 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   }
 
 
-  private void restore(Collection<TopicPartition> partitions)
+  class RestoreRecordHandler implements Consumer<ConsumerRecord<String, String>>
   {
-    log.info("--> starting restore...");
-
-    Map<Integer, Long> lastSeen =
-        consumer
-            .endOffsets(partitions)
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                entry -> entry.getKey().partition(),
-                entry -> entry.getValue() - 1));
-
-    Map<Integer, Long> positions =
-        lastSeen
-            .keySet()
-            .stream()
-            .collect(Collectors.toMap(
-                partition -> partition,
-                partition -> repository.storedPosition(partition)));
-
-    while (
-        positions
-            .entrySet()
-            .stream()
-            .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
-            .reduce(false, (a, b) -> a || b))
+    final long seen;
+
+
+    RestoreRecordHandler(Long endOffset)
     {
-      try
-      {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-        if (records.count() == 0)
-          continue;
+      this.seen = endOffset - 1;
+    }
 
-        log.debug("polled {} records", records.count());
-        records.forEach(record ->
-        {
-          handleRecord(record, restoreUseCases);
-          positions.put(record.partition(), record.offset());
-        });
+
+    @Override
+    public void accept(ConsumerRecord<String, String> record)
+    {
+      if (seen < record.offset())
+      {
+        int partition = record.partition();
+        log.info(
+            "--> restore of partition {} completed: needed={}, seen={}!",
+            partition,
+            seen,
+            record.offset());
+        recordHandlers[partition] = productionRecordHandler;
+        productionRecordHandler.accept(record);
       }
-      catch(WakeupException e)
+      else
       {
-        log.info("--> cleanly interrupted while restoring");
+        handleRecord(record, restoreUseCases);
+        if (seen == record.offset())
+        {
+          int partition = record.partition();
+          log.info( "--> restore of partition {} completed!", partition);
+          recordHandlers[partition] = productionRecordHandler;
+        }
       }
     }
-
-    log.info("--> restore completed!");
   }
 
+
   @PostMapping("start")
   public synchronized String start()
   {