Der Zählerzustand wird separat pro Partition verwaltet
authorKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 15:54:01 +0000 (16:54 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 2 Nov 2024 15:54:01 +0000 (16:54 +0100)
* Dadurch ist es möglich, den Zustand für entzogene Partitionen zu löschen.
* D.h., bei der Ausgabe ist immer klar ersichtlich, über welchen Zustand
  die angefragte Instanz gerade verfügt.

src/main/java/de/juplo/kafka/CounterStateController.java
src/main/java/de/juplo/kafka/ExampleConsumer.java

index 3d96582..723a8d0 100644 (file)
@@ -15,7 +15,7 @@ public class CounterStateController
   private final ExampleConsumer consumer;
 
   @GetMapping
-  Map<String, Long> getAllCounters()
+  Map<Integer, Map<String, Long>> getAllCounters()
   {
     return new HashMap<>(consumer.getCounterState());
   }
index 2c65cdf..c8a2f10 100644 (file)
@@ -23,7 +23,6 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Consumer<String, String> consumer;
   private final Thread workerThread;
 
-  private final CounterState counterState = new CounterState();
   private final String stateTopic;
   Producer<String, String> producer;
 
@@ -31,6 +30,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
   private final Phaser phaser = new Phaser(1);
   private final Set<TopicPartition> assignedPartitions = new HashSet<>();
   private volatile PartitionState[] partitionStates;
+  private CounterState[] counterState;
   private volatile long[] stateEndOffsets;
   private volatile int[] seen;
   private volatile int[] acked;
@@ -69,6 +69,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       {
         partitionStates[i] = PartitionState.UNASSIGNED;
       }
+      counterState = new CounterState[numPartitions];
       stateEndOffsets = new long[numPartitions];
       seen = new int[numPartitions];
       acked = new int[numPartitions];
@@ -166,7 +167,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     String key,
     String value)
   {
-    counterState.setCounterState(key, Long.parseLong(value));
+    counterState[partition].setCounterState(key, Long.parseLong(value));
     if (offset + 1 == stateEndOffsets[partition])
     {
       log.info("{} - Restoring of state for partition {} done!", id, partition);
@@ -188,19 +189,21 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
     Integer partition,
     String key)
   {
-    Long counter = computeCount(key);
+    Long counter = computeCount(partition, key);
     log.info("{} - current value for counter {}: {}", id, key, counter);
     sendCounterState(partition, key, counter);
   }
 
-  private synchronized Long computeCount(String key)
+  private synchronized Long computeCount(int partition, String key)
   {
-    return counterState.addToCounter(key);
+    return counterState[partition].addToCounter(key);
   }
 
-  public Map<String, Long> getCounterState()
+  public Map<Integer, Map<String, Long>> getCounterState()
   {
-    return counterState.getCounterState();
+    Map<Integer, Map<String, Long>> result = new HashMap<>(assignedPartitions.size());
+    assignedPartitions.forEach(tp -> result.put(tp.partition(), counterState[tp.partition()].getCounterState()));
+    return result;
   }
 
   void sendCounterState(int partition, String key, Long counter)
@@ -300,6 +303,8 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
 
   private void restoreAndAssign(int partition)
   {
+    counterState[partition] = new CounterState();
+
     TopicPartition statePartition = new TopicPartition(this.stateTopic, partition);
 
     long stateEndOffset = consumer
@@ -408,6 +413,7 @@ public class ExampleConsumer implements Runnable, ConsumerRebalanceListener
       TopicPartition messagePartition = new TopicPartition(topic, partition);
       log.info("{} - Revoking partition {}", id, messagePartition);
       assignedPartitions.remove(messagePartition);
+      counterState[partition] = null;
 
       phaser.arriveAndDeregister();
       log.info(