Switched from single-node (assign) to multi-instance (subscribe)
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index 1fd2689..501bfd0 100644 (file)
@@ -5,8 +5,11 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
@@ -19,8 +22,7 @@ import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
 import java.time.Duration;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
@@ -29,20 +31,62 @@ import java.util.stream.Collectors;
 
 @RequestMapping("/consumer")
 @ResponseBody
-@RequiredArgsConstructor
 @Slf4j
-public class TransferConsumer implements Runnable
+public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
+  private final int numPartitions;
   private final KafkaConsumer<String, String> consumer;
+  private final AdminClient adminClient;
+  private final TransferRepository repository;
   private final ObjectMapper mapper;
   private final ConsumerUseCases productionUseCases, restoreUseCases;
 
-  private boolean restoring = true;
   private boolean running = false;
   private boolean shutdown = false;
   private Future<?> future = null;
 
+  private final String groupId;
+  private final String groupInstanceId;
+  private final Map<String, String> instanceIdUriMapping;
+  private final String[] instanceIdByPartition;
+
+  private volatile boolean partitionOwnershipUnknown = true;
+
+
+  public TransferConsumer(
+      String topic,
+      int numPartitions,
+      Map<String, String> instanceIdUriMapping,
+      KafkaConsumer<String, String> consumer,
+      AdminClient adminClient,
+      TransferRepository repository,
+      ObjectMapper mapper,
+      ConsumerUseCases productionUseCases,
+      ConsumerUseCases restoreUseCases)
+  {
+    this.topic = topic;
+    this.numPartitions = numPartitions;
+    this.groupId = consumer.groupMetadata().groupId();
+    this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
+    this.instanceIdByPartition = new String[numPartitions];
+    this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+    for (String instanceId : instanceIdUriMapping.keySet())
+    {
+      // Requests are not redirected for the instance itself
+      String uri = instanceId.equals(groupInstanceId)
+          ? null
+          : instanceIdUriMapping.get(instanceId);
+      this.instanceIdUriMapping.put(instanceId, uri);
+    }
+    this.consumer = consumer;
+    this.adminClient = adminClient;
+    this.repository = repository;
+    this.mapper = mapper;
+    this.productionUseCases = productionUseCases;
+    this.restoreUseCases = restoreUseCases;
+  }
+
 
   @Override
   public void run()
@@ -116,27 +160,98 @@ public class TransferConsumer implements Runnable
     }
   }
 
+
+  public Optional<String> uriForKey(String key)
+  {
+    synchronized (this)
+    {
+      while (partitionOwnershipUnknown)
+      {
+        try { wait(); } catch (InterruptedException e) {}
+      }
+
+      int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+      return
+          Optional
+              .ofNullable(instanceIdByPartition[partition])
+              .map(id -> instanceIdUriMapping.get(id));
+    }
+  }
+
   @EventListener
   public synchronized void onApplicationEvent(ContextRefreshedEvent event)
   {
-    // Needed, because this method is called synchronously during the
-    // initialization pahse of Spring. If the restoring is processed
+    // "Needed", because this method is called synchronously during the
+    // initialization pahse of Spring. If the subscription happens
     // in the same thread, it would block the completion of the initialization.
     // Hence, the app would not react to any signal (CTRL-C, for example) except
     // a KILL until the restoring is finished.
-    future = CompletableFuture.runAsync(() -> restore());
+    future = CompletableFuture.runAsync(() -> start());
+  }
+
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partitions revoked: {}", partitions);
   }
 
-  private void restore()
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    log.info("partitions assigned: {}", partitions);
+    fetchAssignmentsAsync();
+    if (partitions.size() > 0)
+      restore(partitions);
+  }
+
+  private void fetchAssignmentsAsync()
+  {
+    adminClient
+        .describeConsumerGroups(List.of(groupId))
+        .describedGroups()
+        .get(groupId)
+        .whenComplete((descriptions, e) ->
+        {
+          if (e != null)
+          {
+            log.error("could not fetch group data: {}", e.getMessage());
+          }
+          else
+          {
+            synchronized (this)
+            {
+              for (MemberDescription description : descriptions.members())
+              {
+                description
+                    .assignment()
+                    .topicPartitions()
+                    .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+              }
+              partitionOwnershipUnknown = false;
+              notifyAll();
+            }
+          }
+        });
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partiotions lost: {}", partitions);
+  }
+
+
+  private void restore(Collection<TopicPartition> partitions)
   {
     log.info("--> starting restore...");
 
-    List<TopicPartition> partitions =
-        consumer
-            .partitionsFor(topic)
-            .stream()
-            .map(info -> new TopicPartition(topic, info.partition()))
-            .collect(Collectors.toList());
+    partitions
+        .stream()
+        .map(topicPartition -> topicPartition.partition())
+        .forEach(partition -> repository.resetStorageForPartition(partition));
 
     Map<Integer, Long> lastSeen =
         consumer
@@ -155,11 +270,7 @@ public class TransferConsumer implements Runnable
                 partition -> partition,
                 partition -> 0l));
 
-    log.info("assigning {}}", partitions);
-    consumer.assign(partitions);
-
     while (
-        restoring &&
         positions
             .entrySet()
             .stream()
@@ -182,47 +293,46 @@ public class TransferConsumer implements Runnable
       catch(WakeupException e)
       {
         log.info("--> cleanly interrupted while restoring");
-        return;
       }
     }
 
     log.info("--> restore completed!");
-    restoring = false;
-
-    // We are intentionally _not_ unsubscribing here, since that would
-    // reset the offset to _earliest_, because we disabled offset-commits.
-
-    start();
   }
 
   @PostMapping("start")
   public synchronized String start()
   {
-    if (restoring)
+    if (running)
     {
-      log.error("cannot start while restoring");
-      return "Denied: Restoring!";
+      log.info("already running!");
+      return "Already running!";
     }
 
-    String result = "Started";
-
-    if (running)
+    int foundNumPartitions = consumer.partitionsFor(topic).size();
+    if (foundNumPartitions != numPartitions)
     {
-      stop();
-      result = "Restarted";
+      log.error(
+          "unexpected number of partitions for topic {}: expected={}, found={}",
+          topic,
+          numPartitions,
+          foundNumPartitions
+          );
+      return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
     }
 
+    consumer.subscribe(List.of(topic), this);
+
     running = true;
     future = CompletableFuture.runAsync(this);
 
     log.info("started");
-    return result;
+    return "Started";
   }
 
   @PostMapping("stop")
   public synchronized String stop()
   {
-    if (!(running || restoring))
+    if (!running)
     {
       log.info("not running!");
       return "Not running";
@@ -246,6 +356,7 @@ public class TransferConsumer implements Runnable
     finally
     {
       future = null;
+      consumer.unsubscribe();
     }
 
     log.info("stopped");
@@ -262,6 +373,7 @@ public class TransferConsumer implements Runnable
   }
 
 
+
   public interface ConsumerUseCases
       extends
         GetTransferUseCase,