`ConsumerHealthIndicator` refaktorisiert (DRY)
authorKai Moritz <kai@juplo.de>
Mon, 18 Nov 2024 18:24:49 +0000 (19:24 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 23 Nov 2024 09:08:59 +0000 (10:08 +0100)
src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java

index 47d7593..c3bd274 100644 (file)
@@ -15,7 +15,7 @@ public class ConsumerHealthIndicator implements HealthIndicator
 
   private volatile State state;
   private volatile List<StateAtTime> history;
-  private volatile List<Partition> assignedPartitions;
+  private volatile List<Partition> assignedPartitions = List.of();
 
 
   public ConsumerHealthIndicator(Clock clock)
@@ -52,50 +52,27 @@ public class ConsumerHealthIndicator implements HealthIndicator
   public void partitionsAssigned(Collection<TopicPartition> partitions)
   {
     this.state = State.RUNNING;
-    List<Partition> assignedPartitions = new LinkedList<>();
-    assignedPartitions.addAll(assignedPartitions);
+    List<Partition> assignedPartitions = new LinkedList<>(this.assignedPartitions);
     partitions.forEach(tp -> assignedPartitions.add(new Partition(tp.topic(), tp.partition())));
     Collections.sort(assignedPartitions, partitionComparator);
     this.assignedPartitions = assignedPartitions;
-    List<StateAtTime> history = new LinkedList<>();
-    history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
-    history.addAll(this.history);
-    if(history.size() > 10)
-    {
-      history.removeLast();
-    }
-    this.history = history;
+    addCurrentStateToHistory();
   }
 
   public void partitionsRevoked(Collection<TopicPartition> partitions)
   {
     this.state = State.REBALANCING;
-    List<Partition> assignedPartitions = new LinkedList<>();
-    assignedPartitions.addAll(assignedPartitions);
+    List<Partition> assignedPartitions = new LinkedList<>(this.assignedPartitions);
     partitions.forEach(tp -> assignedPartitions.remove(new Partition(tp.topic(), tp.partition())));
     this.assignedPartitions = assignedPartitions;
-    List<StateAtTime> history = new LinkedList<>();
-    history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
-    history.addAll(this.history);
-    if(history.size() > 10)
-    {
-      history.removeLast();
-    }
-    this.history = history;
+    addCurrentStateToHistory();
   }
 
   public void partitionsLost(Collection<TopicPartition> partitions)
   {
     this.state = State.FENCED;
     this.assignedPartitions = List.of();
-    List<StateAtTime> history = new LinkedList<>();
-    history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
-    history.addAll(this.history);
-    if(history.size() > 10)
-    {
-      history.removeLast();
-    }
-    this.history = history;
+    addCurrentStateToHistory();
   }
 
 
@@ -106,6 +83,18 @@ public class ConsumerHealthIndicator implements HealthIndicator
       : new Health.Builder().status(state.name());
   }
 
+  private void addCurrentStateToHistory()
+  {
+    List<StateAtTime> history = new LinkedList<>();
+    history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions));
+    history.addAll(this.history);
+    if(history.size() > 10)
+    {
+      history.removeLast();
+    }
+    this.history = history;
+  }
+
   enum State { STARTING, FENCED, REBALANCING, RUNNING }
 
   public record Partition(String topic, Integer partition) {}