The restore-process no longer happens inside onPartitionsAssigned()
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index 1fd2689..1cae540 100644 (file)
@@ -2,11 +2,15 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -18,35 +22,93 @@ import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.time.Clock;
 import java.time.Duration;
-import java.util.List;
-import java.util.Map;
+import java.time.Instant;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
-import java.util.stream.Collectors;
+import java.util.function.Consumer;
 
 
 @RequestMapping("/consumer")
 @ResponseBody
-@RequiredArgsConstructor
 @Slf4j
-public class TransferConsumer implements Runnable
+public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
+  private final int numPartitions;
   private final KafkaConsumer<String, String> consumer;
+  private final AdminClient adminClient;
+  private final TransferRepository repository;
   private final ObjectMapper mapper;
-  private final ConsumerUseCases productionUseCases, restoreUseCases;
+  private final ConsumerUseCases restoreUseCases;
 
-  private boolean restoring = true;
   private boolean running = false;
   private boolean shutdown = false;
   private Future<?> future = null;
 
+  private final String groupId;
+  private final String groupInstanceId;
+  private final Map<String, String> instanceIdUriMapping;
+  private final String[] instanceIdByPartition;
+
+
+  private Clock clock;
+  private int stateStoreInterval;
+
+  private final Consumer<ConsumerRecord<String, String>> productionRecordHandler;
+  private final Consumer<ConsumerRecord<String, String>> recordHandlers[];
+
+  private volatile boolean partitionOwnershipUnknown = true;
+
+
+  /**
+   * Creates a consumer for the given topic, that is bound to the group and
+   * the group instance ID of the passed-in {@link KafkaConsumer}.
+   *
+   * All per-partition record handlers are left unset here; they are
+   * installed, when partitions are assigned to this instance.
+   *
+   * @param topic The name of the topic to consume.
+   * @param numPartitions The expected number of partitions of the topic;
+   * it sizes the per-partition lookup tables of this instance.
+   * @param instanceIdUriMapping Maps group instance IDs to the URIs of the
+   * instances. The mapping is copied; the entry for the local instance is
+   * stored with a {@code null} URI, because requests are not redirected for
+   * the instance itself.
+   * @param consumer The consumer, that is used to read the topic.
+   * @param adminClient The client, that is used to look up the assignments
+   * of the consumer group.
+   * @param repository The repository, that holds the locally stored state.
+   * @param clock The clock, that is used to time the storing of the state.
+   * @param stateStoreInterval The number of seconds between two runs, that
+   * store the state locally; a value that is not positive disables it.
+   * @param mapper The {@link ObjectMapper}, that is kept by this instance.
+   * @param productionUseCases The use cases, that handle records in normal operation.
+   * @param restoreUseCases The use cases, that handle records during a restore.
+   * @throws NoSuchElementException if the consumer has no group instance ID.
+   */
+  public TransferConsumer(
+      String topic,
+      int numPartitions,
+      Map<String, String> instanceIdUriMapping,
+      KafkaConsumer<String, String> consumer,
+      AdminClient adminClient,
+      TransferRepository repository,
+      Clock clock,
+      int stateStoreInterval,
+      ObjectMapper mapper,
+      ConsumerUseCases productionUseCases,
+      ConsumerUseCases restoreUseCases)
+  {
+    this.topic = topic;
+    this.numPartitions = numPartitions;
+    this.groupId = consumer.groupMetadata().groupId();
+    // Fail with a telling message, if no static membership is configured
+    this.groupInstanceId =
+        consumer
+            .groupMetadata()
+            .groupInstanceId()
+            .orElseThrow(() -> new NoSuchElementException(
+                "the consumer has no group.instance.id configured"));
+    this.instanceIdByPartition = new String[numPartitions];
+    this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+    for (Map.Entry<String, String> entry : instanceIdUriMapping.entrySet())
+    {
+      String instanceId = entry.getKey();
+      // Requests are not redirected for the instance itself
+      String uri = instanceId.equals(groupInstanceId) ? null : entry.getValue();
+      this.instanceIdUriMapping.put(instanceId, uri);
+    }
+    this.consumer = consumer;
+    this.adminClient = adminClient;
+    this.repository = repository;
+    this.clock = clock;
+    this.stateStoreInterval = stateStoreInterval;
+    this.mapper = mapper;
+    this.restoreUseCases = restoreUseCases;
+
+    productionRecordHandler = (record) -> handleRecord(record, productionUseCases);
+    // Generic arrays cannot be created directly: the raw array is only ever
+    // filled with handlers of the declared element type, hence the cast is safe
+    @SuppressWarnings("unchecked")
+    Consumer<ConsumerRecord<String, String>>[] handlers = new Consumer[numPartitions];
+    this.recordHandlers = handlers;
+  }
+
+
 
   @Override
   public void run()
   {
+    Instant stateStored = clock.instant();
+
     while (running)
     {
       try
@@ -56,7 +118,26 @@ public class TransferConsumer implements Runnable
           continue;
 
         log.debug("polled {} records", records.count());
-        records.forEach(record -> handleRecord(record, productionUseCases));
+        records.forEach(record -> recordHandlers[record.partition()].accept(record));
+
+        Instant now = clock.instant();
+        if (
+            stateStoreInterval > 0 &&
+            Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+        {
+          Map<Integer, Long> offsets = new HashMap<>();
+
+          for (TopicPartition topicPartition : consumer.assignment())
+          {
+            Integer partition = topicPartition.partition();
+            Long offset = consumer.position(topicPartition);
+            log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+            offsets.put(partition, offset);
+          }
+
+          repository.storeState(offsets);
+          stateStored = now;
+        }
       }
       catch (WakeupException e)
       {
@@ -116,115 +197,208 @@ public class TransferConsumer implements Runnable
     }
   }
 
+
+  /**
+   * Identifies the URI, at which the Group-Instance can be reached,
+   * that holds the state for a specific {@link Transfer}.
+   *
+   * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+   * {@code key} here and of type {@code String}, because this example
+   * project stores the key as a String in Kafka to simplify the listing
+   * and manual manipulation of the according topic.
+   *
+   * The call blocks, as long as the ownership of the partitions is unknown.
+   * An interrupt does not abort the waiting, but the interrupt status of
+   * the calling thread is restored, before the method returns.
+   *
+   * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+   * @return An {@link Optional}, that holds the URI at which the Group-Instance
+   * can be reached, that holds the state for the {@link Transfer}, that
+   * is identified by the key (if present), or is empty, if the {@link Transfer}
+   * would be handled by the local instance, or if no owner or no URI is
+   * known for the partition of the key.
+   */
+  public Optional<String> uriForKey(String key)
+  {
+    // Remember an interrupt instead of swallowing it, so that the caller
+    // can still detect it after the lookup has finished
+    boolean interrupted = false;
+    try
+    {
+      synchronized (this)
+      {
+        while (partitionOwnershipUnknown)
+        {
+          try
+          {
+            wait();
+          }
+          catch (InterruptedException e)
+          {
+            interrupted = true;
+          }
+        }
+
+        int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+        return
+            Optional
+                .ofNullable(instanceIdByPartition[partition])
+                .map(instanceIdUriMapping::get);
+      }
+    }
+    finally
+    {
+      if (interrupted)
+      {
+        Thread.currentThread().interrupt();
+      }
+    }
+  }
+
   @EventListener
   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
   {
-    // Needed, because this method is called synchronously during the
-    // initialization pahse of Spring. If the restoring is processed
+    // "Needed", because this method is called synchronously during the
+    // initialization phase of Spring. If the subscription happens
     // in the same thread, it would block the completion of the initialization.
     // Hence, the app would not react to any signal (CTRL-C, for example) except
     // a KILL until the restoring is finished.
-    future = CompletableFuture.runAsync(() -> restore());
+    future = CompletableFuture.runAsync(() -> start());
+    log.info("start of application completed");
   }
 
-  private void restore()
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
   {
-    log.info("--> starting restore...");
-
-    List<TopicPartition> partitions =
-        consumer
-            .partitionsFor(topic)
-            .stream()
-            .map(info -> new TopicPartition(topic, info.partition()))
-            .collect(Collectors.toList());
-
-    Map<Integer, Long> lastSeen =
-        consumer
-            .endOffsets(partitions)
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                entry -> entry.getKey().partition(),
-                entry -> entry.getValue() - 1));
-
-    Map<Integer, Long> positions =
-        lastSeen
-            .keySet()
-            .stream()
-            .collect(Collectors.toMap(
-                partition -> partition,
-                partition -> 0l));
-
-    log.info("assigning {}}", partitions);
-    consumer.assign(partitions);
-
-    while (
-        restoring &&
-        positions
-            .entrySet()
-            .stream()
-            .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
-            .reduce(false, (a, b) -> a || b))
+    partitionOwnershipUnknown = true;
+    log.info("partitions revoked: {}", partitions);
+    for (TopicPartition topicPartition : partitions)
     {
-      try
+      int partition = topicPartition.partition();
+      long offset = consumer.position(topicPartition);
+      log.info("deactivating partition {}, offset: {}", partition, offset);
+      repository.deactivatePartition(partition, offset);
+    }
+  }
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    log.info("partitions assigned: {}", partitions);
+    fetchAssignmentsAsync();
+    if (partitions.size() > 0)
+    {
+      for (Map.Entry<TopicPartition, Long> entry : consumer.endOffsets(partitions).entrySet())
       {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-        if (records.count() == 0)
-          continue;
+        TopicPartition topicPartition = entry.getKey();
+        Integer partition = topicPartition.partition();
+        long offset = repository.activatePartition(partition);
+        log.info("activated partition {}, seeking to offset {}", partition, offset);
+        consumer.seek(topicPartition, offset);
+        Long endOffset = entry.getValue();
+        if (offset < endOffset)
+        {
+          log.info("--> starting restore of partition {}: {} -> {}", partition, offset, endOffset);
+          recordHandlers[partition] = new RestoreRecordHandler(endOffset);
+        }
+        else
+        {
+          log.info("--> partition {} is up-to-date, offset: {}", partition, offset);
+          recordHandlers[partition] = productionRecordHandler;
+        }
+      }
+    }
+  }
 
-        log.debug("polled {} records", records.count());
-        records.forEach(record ->
+  private void fetchAssignmentsAsync()
+  {
+    adminClient
+        .describeConsumerGroups(List.of(groupId))
+        .describedGroups()
+        .get(groupId)
+        .whenComplete((descriptions, e) ->
         {
-          handleRecord(record, restoreUseCases);
-          positions.put(record.partition(), record.offset());
+          if (e != null)
+          {
+            log.error("could not fetch group data: {}", e.getMessage());
+          }
+          else
+          {
+            synchronized (this)
+            {
+              for (MemberDescription description : descriptions.members())
+              {
+                description
+                    .assignment()
+                    .topicPartitions()
+                    .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+              }
+              partitionOwnershipUnknown = false;
+              notifyAll();
+            }
+          }
         });
+  }
+
+  /**
+   * Marks the ownership of the partitions as unknown and logs the
+   * partitions, that were reported as lost.
+   *
+   * As long as the ownership is unknown, calls to
+   * {@link #uriForKey(String)} block.
+   *
+   * @param partitions The partitions, that were lost.
+   */
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    // NOTE(review): unlike onPartitionsRevoked(), the lost partitions are
+    // not deactivated in the repository here -- confirm that this is intended.
+    partitionOwnershipUnknown = true;
+    log.info("partitions lost: {}", partitions);
+  }
+
+
+  class RestoreRecordHandler implements Consumer<ConsumerRecord<String, String>>
+  {
+    final long seen;
+
+
+    RestoreRecordHandler(Long endOffset)
+    {
+      this.seen = endOffset - 1;
+    }
+
+
+    @Override
+    public void accept(ConsumerRecord<String, String> record)
+    {
+      if (seen < record.offset())
+      {
+        int partition = record.partition();
+        log.info(
+            "--> restore of partition {} completed: needed={}, seen={}!",
+            partition,
+            seen,
+            record.offset());
+        recordHandlers[partition] = productionRecordHandler;
+        productionRecordHandler.accept(record);
       }
-      catch(WakeupException e)
+      else
       {
-        log.info("--> cleanly interrupted while restoring");
-        return;
+        handleRecord(record, restoreUseCases);
+        if (seen == record.offset())
+        {
+          int partition = record.partition();
+          log.info( "--> restore of partition {} completed!", partition);
+          recordHandlers[partition] = productionRecordHandler;
+        }
       }
     }
-
-    log.info("--> restore completed!");
-    restoring = false;
-
-    // We are intentionally _not_ unsubscribing here, since that would
-    // reset the offset to _earliest_, because we disabled offset-commits.
-
-    start();
   }
 
+
   @PostMapping("start")
   public synchronized String start()
   {
-    if (restoring)
+    if (running)
     {
-      log.error("cannot start while restoring");
-      return "Denied: Restoring!";
+      log.info("consumer already running!");
+      return "Already running!";
     }
 
-    String result = "Started";
-
-    if (running)
+    int foundNumPartitions = consumer.partitionsFor(topic).size();
+    if (foundNumPartitions != numPartitions)
     {
-      stop();
-      result = "Restarted";
+      log.error(
+          "unexpected number of partitions for topic {}: expected={}, found={}",
+          topic,
+          numPartitions,
+          foundNumPartitions
+          );
+      return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
     }
 
+    consumer.subscribe(List.of(topic), this);
+
     running = true;
     future = CompletableFuture.runAsync(this);
 
-    log.info("started");
-    return result;
+    log.info("consumer started");
+    return "Started";
   }
 
   @PostMapping("stop")
   public synchronized String stop()
   {
-    if (!(running || restoring))
+    if (!running)
     {
-      log.info("not running!");
+      log.info("consumer not running!");
       return "Not running";
     }
 
@@ -246,9 +420,10 @@ public class TransferConsumer implements Runnable
     finally
     {
       future = null;
+      consumer.unsubscribe();
     }
 
-    log.info("stopped");
+    log.info("consumer stopped");
     return "Stopped";
   }
 
@@ -262,6 +437,7 @@ public class TransferConsumer implements Runnable
   }
 
 
+
   public interface ConsumerUseCases
       extends
         GetTransferUseCase,