`ConsumerHealthIndicator` refaktorisiert (Aufgaben klarer getrennt)
authorKai Moritz <kai@juplo.de>
Sat, 23 Nov 2024 09:18:48 +0000 (10:18 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 23 Nov 2024 09:18:48 +0000 (10:18 +0100)
src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java

index c3bd274..1478b08 100644 (file)
@@ -51,28 +51,24 @@ public class ConsumerHealthIndicator implements HealthIndicator
 
   public void partitionsAssigned(Collection<TopicPartition> partitions)
   {
-    this.state = State.RUNNING;
-    List<Partition> assignedPartitions = new LinkedList<>(this.assignedPartitions);
-    partitions.forEach(tp -> assignedPartitions.add(new Partition(tp.topic(), tp.partition())));
-    Collections.sort(assignedPartitions, partitionComparator);
-    this.assignedPartitions = assignedPartitions;
-    addCurrentStateToHistory();
+    List<Partition> newAssignedPartitions = new LinkedList<>(this.assignedPartitions);
+    partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition())));
+    Collections.sort(newAssignedPartitions, partitionComparator);
+
+    updateAndRecordState(State.RUNNING, newAssignedPartitions);
   }
 
   public void partitionsRevoked(Collection<TopicPartition> partitions)
   {
-    this.state = State.REBALANCING;
-    List<Partition> assignedPartitions = new LinkedList<>(this.assignedPartitions);
-    partitions.forEach(tp -> assignedPartitions.remove(new Partition(tp.topic(), tp.partition())));
-    this.assignedPartitions = assignedPartitions;
-    addCurrentStateToHistory();
+    List<Partition> newAssignedPartitions = new LinkedList<>(this.assignedPartitions);
+    partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition())));
+
+    updateAndRecordState(State.REBALANCING, newAssignedPartitions);
   }
 
   public void partitionsLost(Collection<TopicPartition> partitions)
   {
-    this.state = State.FENCED;
-    this.assignedPartitions = List.of();
-    addCurrentStateToHistory();
+    updateAndRecordState(State.FENCED, List.of());
   }
 
 
@@ -83,16 +79,20 @@ public class ConsumerHealthIndicator implements HealthIndicator
       : new Health.Builder().status(state.name());
   }
 
-  private void addCurrentStateToHistory()
+  private void updateAndRecordState(
+    State newState,
+    List<Partition> newAssignedPartitions)
   {
-    List<StateAtTime> history = new LinkedList<>();
-    history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
-    history.addAll(this.history);
-    if(history.size() > 10)
+    List<StateAtTime> newHistory = new LinkedList<>();
+    newHistory.add(new StateAtTime(ZonedDateTime.now(clock), newState, newAssignedPartitions));
+    newHistory.addAll(this.history);
+    if(newHistory.size() > 10)
     {
-      history.removeLast();
+      newHistory.removeLast();
     }
-    this.history = history;
+    this.state = newState;
+    this.assignedPartitions = newAssignedPartitions;
+    this.history = newHistory;
   }
 
   enum State { STARTING, FENCED, REBALANCING, RUNNING }