Der Zustand wird aus dem ``state``-Topic wiederhergestellt
author Kai Moritz <kai@juplo.de>
Mon, 28 Oct 2024 09:28:50 +0000 (10:28 +0100)
committer Kai Moritz <kai@juplo.de>
Fri, 8 Nov 2024 17:21:16 +0000 (18:21 +0100)
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index e454ed0..49875a0 100644 (file)
@@ -2,7 +2,7 @@ package de.juplo.kafka;
 
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.StickyAssignor;
+import org.apache.kafka.clients.consumer.RangeAssignor;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.common.serialization.StringDeserializer;
@@ -52,7 +52,7 @@ public class ApplicationConfiguration
       props.put("auto.commit.interval", properties.getConsumerProperties().getAutoCommitInterval());
     }
     props.put("metadata.maxage.ms", 5000); //  5 Sekunden
-    props.put("partition.assignment.strategy", StickyAssignor.class.getName());
+    props.put("partition.assignment.strategy", RangeAssignor.class.getName());
     props.put("key.deserializer", StringDeserializer.class.getName());
     props.put("value.deserializer", StringDeserializer.class.getName());
 
index b9d8b27..edd55f8 100644 (file)
@@ -31,6 +31,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private volatile boolean running = false;
   private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
+  private volatile PartitionState[] partitionStates;
+  private volatile long[] stateEndOffsets;
   private volatile int[] seen;
   private volatile int[] acked;
   private volatile boolean[] done;
@@ -66,12 +68,18 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       log.info("{} - Fetching PartitionInfo for topic {}", id, topic);
       int numPartitions = consumer.partitionsFor(topic).size();
       log.info("{} - Topic {} has {} partitions", id, topic, numPartitions);
+      partitionStates = new PartitionState[numPartitions];
+      for (int i=0; i<numPartitions; i++)
+      {
+        partitionStates[i] = PartitionState.UNASSIGNED;
+      }
+      stateEndOffsets = new long[numPartitions];
       seen = new int[numPartitions];
       acked = new int[numPartitions];
       done = new boolean[numPartitions];
 
       log.info("{} - Subscribing to topic {}", id, topic);
-      consumer.subscribe(Arrays.asList(topic), this);
+      consumer.subscribe(Arrays.asList(topic, stateTopic), this);
       running = true;
 
       while (running)
@@ -81,14 +89,19 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
         int phase = phaser.getPhase();
 
-        log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
         assignedPartitions
           .forEach(partition ->
           {
             seen[partition.partition()] = 0;
             acked[partition.partition()] = 0;
             done[partition.partition()] = false;
+          });
 
+        log.info("{} - Received {} messages in phase {}", id, records.count(), phase);
+        records
+          .partitions()
+          .forEach(partition ->
+          {
             for (ConsumerRecord<String, String> record : records.records(partition))
             {
               handleRecord(
@@ -100,6 +113,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
             }
 
             done[partition.partition()] = true;
+          });
+
+        assignedPartitions
+          .forEach(partition ->
+          {
             if (seen[partition.partition()] == 0)
             {
               int arrivedPhase = phaser.arrive();
@@ -139,6 +157,56 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   {
     consumed++;
     log.info("{} - {}: {}/{} - {}={}", id, offset, topic, partition, key, value);
+
+    if (topic.equals(this.topic))
+    {
+      handleMessage(partition, key);
+    }
+    else
+    {
+      handleState(partition, offset, key, value);
+    }
+  }
+
+  private synchronized void handleState( // Applies one record that did not come from the message topic (i.e. a state record) to counterState
+    int partition, // partition number the record was read from
+    long offset, // offset of the record within that partition
+    String key, // key under which the counter is stored
+    String value) // counter value as text; parsed with Long.parseLong below
+  {
+    counterState.put(key, Long.parseLong(value)); // Long.parseLong throws NumberFormatException if value is not a parsable long
+    if (offset + 1 == stateEndOffsets[partition]) // this record sits directly before the end offset recorded in onPartitionsAssigned
+    {
+      log.info(
+        "{} - Restoring of state for partition {} done! New partition-state is ASSIGNED!", id, partition);
+      partitionStates[partition] = PartitionState.ASSIGNED;
+
+      TopicPartition tp; // reused: first the state partition, then the message partition with the same number
+
+      tp = new TopicPartition(stateTopic, partition);
+      log.info("{} - Pausing state partition {}...", id, tp);
+      consumer.pause(List.of(tp)); // restore is complete, stop fetching from the state partition
+
+      tp = new TopicPartition(topic, partition);
+      log.info("{} - Resuming message partition {}...", id, tp);
+      consumer.resume(List.of(tp)); // the message partition was paused in onPartitionsAssigned for the duration of the restore
+    }
+    else // any other offset: only report progress
+    {
+      log.debug(
+        "{} - Restored state up to offset {}, end-offset: {}, state: {}={}",
+        id,
+        offset,
+        stateEndOffsets[partition],
+        key,
+        value);
+    }
+  }
+
+  private void handleMessage(
+    Integer partition,
+    String key)
+  {
     Long counter = computeCount(key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
     sendCounterState(partition, key, counter);
@@ -234,25 +302,108 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   @Override
   public void onPartitionsAssigned(Collection<TopicPartition> partitions)
   {
-    assignedPartitions.addAll(partitions);
-    phaser.bulkRegister(partitions.size());
-    log.info(
-      "{} - Added {} parties for newly assigned partitions. New total number of parties: {}",
-      id,
-      partitions.size(),
-      phaser.getRegisteredParties());
+    partitions.forEach(partition -> // first pass: bookkeeping for each newly assigned partition of either subscribed topic
+    {
+      if (partition.topic().equals(topic)) // partition of the message topic
+      {
+        log.info("{} - Adding partition {}", id, partition);
+        assignedPartitions.add(partition);
+
+        phaser.register(); // one phaser party per assigned message partition
+        log.info(
+          "{} - Registered new party for restored assigned partition {}. New total number of parties: {}",
+          id,
+          partition,
+          phaser.getRegisteredParties());
+
+        log.info(
+          "{} - Changing partition-state for {}: {} -> RESTORING",
+          id,
+          partition,
+          partitionStates[partition.partition()]);
+        partitionStates[partition.partition()] = PartitionState.RESTORING; // provisional: the second pass below may switch it to ASSIGNED right away
+      }
+      else // every other partition is treated as a partition of the state topic
+      {
+        long endOffset = consumer // end offset at assignment time; handleState compares against it to detect the end of the restore
+          .endOffsets(List.of(partition))
+          .get(partition)
+          .longValue();
+        log.info("{} - Found end-offset {} for state partition {}", id, endOffset, partition);
+        stateEndOffsets[partition.partition()] = endOffset;
+      }
+    });
+
+    assignedPartitions.forEach(messagePartition -> // second pass: visits every message partition in assignedPartitions, not only the newly added ones
+    {
+      int partition = messagePartition.partition();
+      TopicPartition statePartition = new TopicPartition(this.stateTopic, partition); // state partition with the same number as the message partition
+
+      long stateBeginningOffset = consumer
+        .beginningOffsets(List.of(statePartition))
+        .get(statePartition);
+
+      if (stateBeginningOffset < stateEndOffsets[partition]) // beginning offset lies before the recorded end offset: there is state to replay
+      {
+        log.info(
+          "{} - Seeking to first offset {} for state partition {}",
+          id,
+          stateBeginningOffset,
+          statePartition);
+        consumer.seek(statePartition, stateBeginningOffset); // NOTE(review): presumably relies on the state partition being assigned to this same consumer - confirm
+
+        log.info("{} - Pausing message partition {}", id, messagePartition);
+        consumer.pause(List.of(messagePartition)); // resumed again by handleState once the recorded end offset is reached
+
+        log.info("{} - Resuming state partition {}", id, statePartition);
+        consumer.resume(List.of(statePartition));
+      }
+      else // nothing to replay: consume messages immediately
+      {
+        log.info(
+          "{} - State is up-to-date for message partition {}. New partition-state is ASSIGNED",
+          id,
+          messagePartition);
+        partitionStates[partition] = PartitionState.ASSIGNED;
+
+        log.info("{} - Pausing state partition {}...", id, statePartition);
+        consumer.pause(List.of(statePartition));
+
+        log.info("{} - Resuming message partition {}...", id, messagePartition);
+        consumer.resume(List.of(messagePartition));
+      }
+    });
   }
 
   @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
+  public synchronized void onPartitionsRevoked(Collection<TopicPartition> partitions) // Drops bookkeeping for revoked message partitions; partitions of other topics are ignored
   {
-    assignedPartitions.removeAll(partitions);
-    partitions.forEach(partition -> phaser.arriveAndDeregister());
-    log.info(
-      "{} - Removed {} parties for revoked partitions. New total number of parties: {}",
-      id,
-      partitions.size(),
-      phaser.getRegisteredParties());
+    partitions.forEach(partition ->
+    {
+      if (partition.topic().equals(topic)) // only partitions of the message topic are tracked here
+      {
+        log.info("{} - Revoking partition {}", id, partition);
+        assignedPartitions.remove(partition);
+
+        PartitionState partitionState = partitionStates[partition.partition()]; // state before the revocation; also reported in the warning below
+        switch (partitionState)
+        {
+          case RESTORING: // both states were entered via onPartitionsAssigned, which registered a phaser party
+          case ASSIGNED:
+            phaser.arriveAndDeregister(); // removes the party that was registered when the partition was assigned
+            log.info(
+              "{} - Deregistered party for revoked partition {}. New total number of parties: {}",
+              id,
+              partition,
+              phaser.getRegisteredParties());
+            partitionStates[partition.partition()] = PartitionState.UNASSIGNED;
+            break;
+          default: // falls through to the UNASSIGNED branch
+          case UNASSIGNED:
+            log.warn("{} - partition {} in state {} was revoked!", id, partition, partitionState); // no party is deregistered in this branch
+        }
+      }
+    });
   }
 
   public void shutdown() throws InterruptedException
@@ -262,4 +413,11 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     consumer.wakeup();
     workerThread.join();
   }
+
+  enum PartitionState // Per-partition lifecycle tracked in the partitionStates array (indexed by partition number)
+  {
+    UNASSIGNED, // initial value for every partition and the value set again on revocation
+    RESTORING, // set when a message partition is assigned, before it is known whether state must be replayed
+    ASSIGNED // set when the state is up-to-date or the restore has reached the recorded end offset
+  }
 }