Bugfix: Check for existence of a new transfer requires a remote-call
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index e7c2430..920d79a 100644 (file)
@@ -3,95 +3,389 @@ package de.juplo.kafka.payment.transfer.adapter;
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import de.juplo.kafka.payment.transfer.domain.Transfer;
-import de.juplo.kafka.payment.transfer.ports.HandleTransferUseCase;
-import lombok.RequiredArgsConstructor;
+import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
+import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
+import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.event.ContextRefreshedEvent;
+import org.springframework.context.event.EventListener;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.time.Clock;
 import java.time.Duration;
-import java.util.Set;
+import java.time.Instant;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.stream.Collectors;
 
 
 @RequestMapping("/consumer")
 @ResponseBody
-@RequiredArgsConstructor
 @Slf4j
-public class TransferConsumer implements Runnable
+public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 {
   private final String topic;
+  private final int numPartitions;
   private final KafkaConsumer<String, String> consumer;
-  private final ExecutorService executorService;
+  private final AdminClient adminClient;
+  private final TransferRepository repository;
   private final ObjectMapper mapper;
-  private final HandleTransferUseCase handleTransferUseCase;
+  private final ConsumerUseCases productionUseCases, restoreUseCases;
 
   private boolean running = false;
+  private boolean shutdown = false;
   private Future<?> future = null;
 
+  private final String groupId;
+  private final String groupInstanceId;
+  private final Map<String, String> instanceIdUriMapping;
+  private final String[] instanceIdByPartition;
+
+  private Clock clock;
+  private int stateStoreInterval;
+
+  private volatile boolean partitionOwnershipUnknown = true;
+
+
+  public TransferConsumer(
+      String topic,
+      int numPartitions,
+      Map<String, String> instanceIdUriMapping,
+      KafkaConsumer<String, String> consumer,
+      AdminClient adminClient,
+      TransferRepository repository,
+      Clock clock,
+      int stateStoreInterval,
+      ObjectMapper mapper,
+      ConsumerUseCases productionUseCases,
+      ConsumerUseCases restoreUseCases)
+  {
+    this.topic = topic;
+    this.numPartitions = numPartitions;
+    this.groupId = consumer.groupMetadata().groupId();
+    this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
+    this.instanceIdByPartition = new String[numPartitions];
+    this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
+    for (String instanceId : instanceIdUriMapping.keySet())
+    {
+      // Requests are not redirected for the instance itself
+      String uri = instanceId.equals(groupInstanceId)
+          ? null
+          : instanceIdUriMapping.get(instanceId);
+      this.instanceIdUriMapping.put(instanceId, uri);
+    }
+    this.consumer = consumer;
+    this.adminClient = adminClient;
+    this.repository = repository;
+    this.clock = clock;
+    this.stateStoreInterval = stateStoreInterval;
+    this.mapper = mapper;
+    this.productionUseCases = productionUseCases;
+    this.restoreUseCases = restoreUseCases;
+  }
+
 
   @Override
   public void run()
   {
+    Instant stateStored = clock.instant();
+
     while (running)
     {
       try
       {
         ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
-        if (records.count() > 0)
-          log.debug("polled {} records", records.count());
+        if (records.count() == 0)
+          continue;
 
-        records.forEach(record ->
+        log.debug("polled {} records", records.count());
+        records.forEach(record -> handleRecord(record, productionUseCases));
+
+        Instant now = clock.instant();
+        if (
+            stateStoreInterval > 0 &&
+            Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
         {
-          try
-          {
-            Transfer transfer = mapper.readValue(record.value(), Transfer.class);
-            handleTransferUseCase.handle(transfer);
-          }
-          catch (JsonProcessingException e)
+          Map<Integer, Long> offsets = new HashMap<>();
+
+          for (TopicPartition topicPartition : consumer.assignment())
           {
-            log.error(
-                "ignoring invalid json in message #{} on {}/{}: {}",
-                record.offset(),
-                record.topic(),
-                record.partition(),
-                record.value());
+            Integer partition = topicPartition.partition();
+            Long offset = consumer.position(topicPartition);
+            log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+            offsets.put(partition, offset);
           }
-        });
+
+          repository.storeState(offsets);
+          stateStored = now;
+        }
       }
       catch (WakeupException e)
       {
-        log.info("polling aborted!");
+        log.info("cleanly interrupted while polling");
       }
     }
 
     log.info("polling stopped");
   }
 
+  private void handleRecord(ConsumerRecord<String, String> record, ConsumerUseCases useCases)
+  {
+    try
+    {
+      byte eventType = record.headers().lastHeader(EventType.HEADER).value()[0];
+
+      switch (eventType)
+      {
+        case EventType.NEW_TRANSFER:
+
+          NewTransferEvent newTransferEvent =
+              mapper.readValue(record.value(), NewTransferEvent.class);
+          useCases
+              .create(
+                  newTransferEvent.getId(),
+                  newTransferEvent.getPayer(),
+                  newTransferEvent.getPayee(),
+                  newTransferEvent.getAmount());
+          break;
+
+        case EventType.TRANSFER_STATE_CHANGED:
+
+          TransferStateChangedEvent stateChangedEvent =
+              mapper.readValue(record.value(), TransferStateChangedEvent.class);
+          useCases.handleStateChange(stateChangedEvent.getId(), stateChangedEvent.getState());
+          break;
+      }
+    }
+    catch (JsonProcessingException e)
+    {
+      log.error(
+          "ignoring invalid json in message #{} on {}/{}: {}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          record.value());
+    }
+    catch (IllegalArgumentException e)
+    {
+      log.error(
+          "ignoring invalid message #{} on {}/{}: {}, message={}",
+          record.offset(),
+          record.topic(),
+          record.partition(),
+          e.getMessage(),
+          record.value());
+    }
+  }
+
+
+  /**
+   * Identifies the URI, at which the Group-Instance can be reached,
+   * that holds the state for a specific {@link Transfer}.
+   *
+   * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+   * {@code key} here and of type {@code String}, because this example
+   * project stores the key as a String in Kafka to simplify the listing
+   * and manual manipulation of the according topic.
+   *
+   * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+   * @return An {@link Optional}, that holds the URI at which the Group-Instance
+   * can be reached, that holds the state for the {@link Transfer}, that
+   * is identified by the key (if present), or is empty, if the {@link Transfer}
+   * would be handled by the local instance.
+   */
+  public Optional<String> uriForKey(String key)
+  {
+    synchronized (this)
+    {
+      while (partitionOwnershipUnknown)
+      {
+        try { wait(); } catch (InterruptedException e) {}
+      }
+
+      int partition = TransferPartitioner.computeHashForKey(key, numPartitions);
+      return
+          Optional
+              .ofNullable(instanceIdByPartition[partition])
+              .map(id -> instanceIdUriMapping.get(id));
+    }
+  }
+
+  @EventListener
+  public synchronized void onApplicationEvent(ContextRefreshedEvent event)
+  {
+    // "Needed", because this method is called synchronously during the
+    // initialization phase of Spring. If the subscription happens
+    // in the same thread, it would block the completion of the initialization.
+    // Hence, the app would not react to any signal (CTRL-C, for example) except
+    // a KILL until the restoring is finished.
+    future = CompletableFuture.runAsync(() -> start());
+    log.info("start of application completed");
+  }
+
+
+  @Override
+  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partitions revoked: {}", partitions);
+    for (TopicPartition topicPartition : partitions)
+    {
+      int partition = topicPartition.partition();
+      long offset = consumer.position(topicPartition);
+      log.info("deactivating partition {}, offset: {}", partition, offset);
+      repository.deactivatePartition(partition, offset);
+    }
+  }
+
+  @Override
+  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
+  {
+    log.info("partitions assigned: {}", partitions);
+    fetchAssignmentsAsync();
+    if (partitions.size() > 0)
+    {
+      for (TopicPartition topicPartition : partitions)
+      {
+        int partition = topicPartition.partition();
+        long offset = repository.activatePartition(partition);
+        log.info("activated partition {}, seeking to offset {}", partition, offset);
+        consumer.seek(topicPartition, offset);
+      }
+
+      restore(partitions);
+    }
+  }
+
+  private void fetchAssignmentsAsync()
+  {
+    adminClient
+        .describeConsumerGroups(List.of(groupId))
+        .describedGroups()
+        .get(groupId)
+        .whenComplete((descriptions, e) ->
+        {
+          if (e != null)
+          {
+            log.error("could not fetch group data: {}", e.getMessage());
+          }
+          else
+          {
+            synchronized (this)
+            {
+              for (MemberDescription description : descriptions.members())
+              {
+                description
+                    .assignment()
+                    .topicPartitions()
+                    .forEach(tp -> instanceIdByPartition[tp.partition()] = description.groupInstanceId().get());
+              }
+              partitionOwnershipUnknown = false;
+              notifyAll();
+            }
+          }
+        });
+  }
+
+  @Override
+  public void onPartitionsLost(Collection<TopicPartition> partitions)
+  {
+    partitionOwnershipUnknown = true;
+    log.info("partitions lost: {}", partitions);
+  }
+
+
+  private void restore(Collection<TopicPartition> partitions)
+  {
+    log.info("--> starting restore...");
+
+    Map<Integer, Long> lastSeen =
+        consumer
+            .endOffsets(partitions)
+            .entrySet()
+            .stream()
+            .collect(Collectors.toMap(
+                entry -> entry.getKey().partition(),
+                entry -> entry.getValue() - 1));
+
+    Map<Integer, Long> positions =
+        lastSeen
+            .keySet()
+            .stream()
+            .collect(Collectors.toMap(
+                partition -> partition,
+                partition -> repository.storedPosition(partition)));
+
+    while (
+        positions
+            .entrySet()
+            .stream()
+            .map(entry -> entry.getValue() < lastSeen.get(entry.getKey()))
+            .reduce(false, (a, b) -> a || b))
+    {
+      try
+      {
+        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
+        if (records.count() == 0)
+          continue;
+
+        log.debug("polled {} records", records.count());
+        records.forEach(record ->
+        {
+          handleRecord(record, restoreUseCases);
+          positions.put(record.partition(), record.offset());
+        });
+      }
+      catch(WakeupException e)
+      {
+        log.info("--> cleanly interrupted while restoring");
+      }
+    }
+
+    log.info("--> restore completed!");
+  }
 
   @PostMapping("start")
   public synchronized String start()
   {
-    String result = "Started";
-
     if (running)
     {
-      stop();
-      result = "Restarted";
+      log.info("consumer already running!");
+      return "Already running!";
+    }
+
+    int foundNumPartitions = consumer.partitionsFor(topic).size();
+    if (foundNumPartitions != numPartitions)
+    {
+      log.error(
+          "unexpected number of partitions for topic {}: expected={}, found={}",
+          topic,
+          numPartitions,
+          foundNumPartitions
+          );
+      return "Wrong number of partitions for topic " + topic + ": " + foundNumPartitions;
     }
 
-    log.info("subscribing to topic {}", topic);
-    consumer.subscribe(Set.of(topic));
+    consumer.subscribe(List.of(topic), this);
+
     running = true;
-    future = executorService.submit(this);
+    future = CompletableFuture.runAsync(this);
 
-    return result;
+    log.info("consumer started");
+    return "Started";
   }
 
   @PostMapping("stop")
@@ -99,14 +393,16 @@ public class TransferConsumer implements Runnable
   {
     if (!running)
     {
-      log.info("not running!");
+      log.info("consumer not running!");
       return "Not running";
     }
 
     running = false;
+
     if (!future.isDone())
       consumer.wakeup();
-    log.info("waiting for the polling-loop to finish...");
+
+    log.info("waiting for the consumer...");
     try
     {
       future.get();
@@ -119,18 +415,27 @@ public class TransferConsumer implements Runnable
     finally
     {
       future = null;
-      log.info("unsubscribing");
       consumer.unsubscribe();
     }
 
-    return "Stoped";
+    log.info("consumer stopped");
+    return "Stopped";
   }
 
   public synchronized void shutdown()
   {
     log.info("shutdown initiated!");
+    shutdown = true;
     stop();
     log.info("closing consumer");
     consumer.close();
   }
+
+
+
+  public interface ConsumerUseCases
+      extends
+        GetTransferUseCase,
+        CreateTransferUseCase,
+        HandleStateChangeUseCase {};
 }