WIP: instance-mapping from assignor
[demos/kafka/demos-kafka-payment-system-transfer] / src / main / java / de / juplo / kafka / payment / transfer / adapter / TransferConsumer.java
index 2ef7ee3..c312951 100644 (file)
@@ -2,6 +2,7 @@ package de.juplo.kafka.payment.transfer.adapter;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
+import de.juplo.kafka.payment.transfer.domain.Transfer;
 import de.juplo.kafka.payment.transfer.ports.CreateTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.GetTransferUseCase;
 import de.juplo.kafka.payment.transfer.ports.HandleStateChangeUseCase;
@@ -9,19 +10,21 @@ import de.juplo.kafka.payment.transfer.ports.TransferRepository;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.MemberDescription;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.context.event.ContextRefreshedEvent;
 import org.springframework.context.event.EventListener;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestMapping;
 import org.springframework.web.bind.annotation.ResponseBody;
 
+import java.nio.ByteBuffer;
+import java.time.Clock;
 import java.time.Duration;
+import java.time.Instant;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -51,24 +54,43 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   private final Map<String, String> instanceIdUriMapping;
   private final String[] instanceIdByPartition;
 
+  private Clock clock;
+  private int stateStoreInterval;
+
   private volatile boolean partitionOwnershipUnknown = true;
 
 
   public TransferConsumer(
+      String bootstrapServers,
+      String groupId,
+      String groupInstanceId,
       String topic,
       int numPartitions,
       Map<String, String> instanceIdUriMapping,
-      KafkaConsumer<String, String> consumer,
       AdminClient adminClient,
       TransferRepository repository,
+      Clock clock,
+      int stateStoreInterval,
       ObjectMapper mapper,
       ConsumerUseCases productionUseCases,
       ConsumerUseCases restoreUseCases)
   {
+    Properties props = new Properties();
+    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+    props.put(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG, groupInstanceId);
+    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, new Assignor());
+    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+    this.consumer = new KafkaConsumer<>(props).;
+
+    this.groupId = groupId;
+    this.groupInstanceId = groupInstanceId;
     this.topic = topic;
     this.numPartitions = numPartitions;
-    this.groupId = consumer.groupMetadata().groupId();
-    this.groupInstanceId = consumer.groupMetadata().groupInstanceId().get();
     this.instanceIdByPartition = new String[numPartitions];
     this.instanceIdUriMapping = new HashMap<>(instanceIdUriMapping.size());
     for (String instanceId : instanceIdUriMapping.keySet())
@@ -79,9 +101,10 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
           : instanceIdUriMapping.get(instanceId);
       this.instanceIdUriMapping.put(instanceId, uri);
     }
-    this.consumer = consumer;
     this.adminClient = adminClient;
     this.repository = repository;
+    this.clock = clock;
+    this.stateStoreInterval = stateStoreInterval;
     this.mapper = mapper;
     this.productionUseCases = productionUseCases;
     this.restoreUseCases = restoreUseCases;
@@ -91,6 +114,8 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void run()
   {
+    Instant stateStored = clock.instant();
+
     while (running)
     {
       try
@@ -101,6 +126,25 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
 
         log.debug("polled {} records", records.count());
         records.forEach(record -> handleRecord(record, productionUseCases));
+
+        Instant now = clock.instant();
+        if (
+            stateStoreInterval > 0 &&
+            Duration.between(stateStored, now).getSeconds() >= stateStoreInterval)
+        {
+          Map<Integer, Long> offsets = new HashMap<>();
+
+          for (TopicPartition topicPartition : consumer.assignment())
+          {
+            Integer partition = topicPartition.partition();
+            Long offset = consumer.position(topicPartition);
+            log.info("storing state locally for {}/{}: {}", topic, partition, offset);
+            offsets.put(partition, offset);
+          }
+
+          repository.storeState(offsets);
+          stateStored = now;
+        }
       }
       catch (WakeupException e)
       {
@@ -161,6 +205,21 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   }
 
 
+  /**
+   * Identifies the URI, at which the Group-Instance can be reached,
+   * that holds the state for a specific {@link Transfer}.
+   *
+   * The {@link Transfer#getId() ID} of the {@link Transfer} is named
+   * {@code key} here and of type {@code String}, because this example
+   * project stores the key as a String in Kafka to simplify the listing
+   * and manual manipulation of the according topic.
+   *
+   * @param key A {@code String}, that represents the {@link Transfer#getId() ID} of a {@link Transfer}.
+   * @return An {@link Optional}, that holds the URI at which the Group-Instance
+   * can be reached, that holds the state for the {@link Transfer}, that
+   * is identified by the key (if present), or is empty, if the {@link Transfer}
+   * would be handled by the local instance.
+   */
   public Optional<String> uriForKey(String key)
   {
     synchronized (this)
@@ -196,6 +255,13 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   {
     partitionOwnershipUnknown = true;
     log.info("partitions revoked: {}", partitions);
+    for (TopicPartition topicPartition : partitions)
+    {
+      int partition = topicPartition.partition();
+      long offset = consumer.position(topicPartition);
+      log.info("deactivating partition {}, offset: {}", partition, offset);
+      repository.deactivatePartition(partition, offset);
+    }
   }
 
   @Override
@@ -204,7 +270,17 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
     log.info("partitions assigned: {}", partitions);
     fetchAssignmentsAsync();
     if (partitions.size() > 0)
+    {
+      for (TopicPartition topicPartition : partitions)
+      {
+        int partition = topicPartition.partition();
+        long offset = repository.activatePartition(partition);
+        log.info("activated partition {}, seeking to offset {}", partition, offset);
+        consumer.seek(topicPartition, offset);
+      }
+
       restore(partitions);
+    }
   }
 
   private void fetchAssignmentsAsync()
@@ -249,11 +325,6 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
   {
     log.info("--> starting restore...");
 
-    partitions
-        .stream()
-        .map(topicPartition -> topicPartition.partition())
-        .forEach(partition -> repository.resetStorageForPartition(partition));
-
     Map<Integer, Long> lastSeen =
         consumer
             .endOffsets(partitions)
@@ -269,7 +340,7 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
             .stream()
             .collect(Collectors.toMap(
                 partition -> partition,
-                partition -> 0l));
+                partition -> repository.storedPosition(partition)));
 
     while (
         positions
@@ -380,4 +451,29 @@ public class TransferConsumer implements Runnable, ConsumerRebalanceListener
         GetTransferUseCase,
         CreateTransferUseCase,
         HandleStateChangeUseCase {};
+
+  public class Assignor extends CooperativeStickyAssignor
+  {
+    @Override
+    public GroupAssignment assign(
+        Cluster metadata,
+        GroupSubscription groupSubscription)
+    {
+      return super.assign(metadata, groupSubscription);
+    }
+
+    @Override
+    public ByteBuffer subscriptionUserData(Set<String> topics)
+    {
+      return null;
+    }
+
+    @Override
+    public void onAssignment(
+        Assignment assignment,
+        ConsumerGroupMetadata metadata)
+    {
+      log.info("New assignment: {}, {}", assignment, metadata);
+    }
+  }
 }