Der Zählerzustand wird separat pro Partition verwaltet
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 15:54:01 +0000 (16:54 +0100)
committerKai Moritz <kai@juplo.de>
Fri, 8 Nov 2024 17:21:16 +0000 (18:21 +0100)
* Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen.
* D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand
  die angefragte Instanz gerade verfügt.

src/main/java/de/juplo/kafka/CounterStateController.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 3d96582..723a8d0 100644 (file)
@@ -15,7 +15,7 @@ public class CounterStateController
   private final ExampleConsumer consumer;
 
   @GetMapping
-  Map<String, Long> getAllCounters()
+  Map<Integer, Map<String, Long>> getAllCounters()
   {
     return new HashMap<>(consumer.getCounterState());
   }
index 42daf03..112e2ff 100644 (file)
@@ -24,7 +24,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Thread workerThread;
   private final Runnable closeCallback;
 
-  private final CounterState counterState = new CounterState();
   private final String stateTopic;
   private final Producer<String, String> producer;
 
@@ -32,6 +31,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
   private volatile PartitionState[] partitionStates;
+  private CounterState[] counterState;
   private volatile long[] stateEndOffsets;
   private volatile int[] seen;
   private volatile int[] acked;
@@ -73,6 +73,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       {
         partitionStates[i] = PartitionState.UNASSIGNED;
       }
+      counterState = new CounterState[numPartitions];
       stateEndOffsets = new long[numPartitions];
       seen = new int[numPartitions];
       acked = new int[numPartitions];
@@ -174,7 +175,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     String key,
     String value)
   {
-    counterState.setCounterState(key, Long.parseLong(value));
+    counterState[partition].setCounterState(key, Long.parseLong(value));
     if (offset + 1 == stateEndOffsets[partition])
     {
       log.info("{} - Restoring of state for partition {} done!", id, partition);
@@ -196,19 +197,21 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     Integer partition,
     String key)
   {
-    Long counter = computeCount(key);
+    Long counter = computeCount(partition, key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
     sendCounterState(partition, key, counter);
   }
 
-  private synchronized Long computeCount(String key)
+  private synchronized Long computeCount(int partition, String key)
   {
-    return counterState.addToCounter(key);
+    return counterState[partition].addToCounter(key);
   }
 
-  public Map<String, Long> getCounterState()
+  public Map<Integer, Map<String, Long>> getCounterState()
   {
-    return counterState.getCounterState();
+    Map<Integer, Map<String, Long>> result = new HashMap<>(assignedPartitions.size());
+    assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState()));
+    return result;
   }
 
   void sendCounterState(int partition, String key, Long counter)
@@ -308,6 +311,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private void restoreAndAssign(int partition)
   {
+    counterState[partition] = new CounterState();
+
     TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
 
     long stateEndOffset = consumer
@@ -416,6 +421,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       TopicPartition messagePartition = new TopicPartition(topic, partition);
       log.info("{} - Revoking partition {}", id, messagePartition);
       assignedPartitions.remove(messagePartition);
+      counterState[partition] = null;
 
       phaser.arriveAndDeregister();
       log.info(