`ConsumerHealthIndicator` refaktorisiert (Klareres Benennungs-Schema)
authorKai Moritz <kai@juplo.de>
Sat, 23 Nov 2024 09:25:07 +0000 (10:25 +0100)
committerKai Moritz <kai@juplo.de>
Sat, 23 Nov 2024 09:25:07 +0000 (10:25 +0100)
src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java

index 1478b08..dde95fc 100644 (file)
@@ -13,8 +13,8 @@ public class ConsumerHealthIndicator implements HealthIndicator
 {
   private final Clock clock;
 
-  private volatile State state;
-  private volatile List<StateAtTime> history;
+  private volatile RebalancingState rebalancingState;
+  private volatile List<RecordedState> history;
   private volatile List<Partition> assignedPartitions = List.of();
 
 
@@ -22,8 +22,8 @@ public class ConsumerHealthIndicator implements HealthIndicator
   {
     this.clock = clock;
 
-    state = State.STARTING;
-    StateAtTime sat = new StateAtTime(ZonedDateTime.now(clock), state, List.of());
+    rebalancingState = RebalancingState.STARTING;
+    RecordedState sat = new RecordedState(ZonedDateTime.now(clock), rebalancingState, List.of());
     history = List.of(sat);
   }
 
@@ -35,7 +35,7 @@ public class ConsumerHealthIndicator implements HealthIndicator
 
     if (includeDetails)
     {
-      healthBuilder.withDetail("status", state);
+      healthBuilder.withDetail("rebalancing_state", rebalancingState);
       healthBuilder.withDetail("history", history);
     }
 
@@ -55,7 +55,7 @@ public class ConsumerHealthIndicator implements HealthIndicator
     partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition())));
     Collections.sort(newAssignedPartitions, partitionComparator);
 
-    updateAndRecordState(State.RUNNING, newAssignedPartitions);
+    updateAndRecordState(RebalancingState.RUNNING, newAssignedPartitions);
   }
 
   public void partitionsRevoked(Collection<TopicPartition> partitions)
@@ -63,42 +63,48 @@ public class ConsumerHealthIndicator implements HealthIndicator
     List<Partition> newAssignedPartitions = new LinkedList<>(this.assignedPartitions);
     partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition())));
 
-    updateAndRecordState(State.REBALANCING, newAssignedPartitions);
+    updateAndRecordState(RebalancingState.REBALANCING, newAssignedPartitions);
   }
 
   public void partitionsLost(Collection<TopicPartition> partitions)
   {
-    updateAndRecordState(State.FENCED, List.of());
+    updateAndRecordState(RebalancingState.FENCED, List.of());
   }
 
 
   private Health.Builder getHealthBuilder()
   {
-    return state == State.RUNNING
+    return rebalancingState == RebalancingState.RUNNING
       ? new Health.Builder().up()
-      : new Health.Builder().status(state.name());
+      : new Health.Builder().status(rebalancingState.name());
   }
 
   private void updateAndRecordState(
-    State newState,
+    RebalancingState newRebalancingState,
     List<Partition> newAssignedPartitions)
   {
-    List<StateAtTime> newHistory = new LinkedList<>();
-    newHistory.add(new StateAtTime(ZonedDateTime.now(clock), newState, newAssignedPartitions));
+    List<RecordedState> newHistory = new LinkedList<>();
+    newHistory.add(new RecordedState(
+      ZonedDateTime.now(clock),
+      newRebalancingState,
+      newAssignedPartitions));
     newHistory.addAll(this.history);
     if(newHistory.size() > 10)
     {
       newHistory.removeLast();
     }
-    this.state = newState;
+    this.rebalancingState = newRebalancingState;
     this.assignedPartitions = newAssignedPartitions;
     this.history = newHistory;
   }
 
-  enum State { STARTING, FENCED, REBALANCING, RUNNING }
+  enum RebalancingState { STARTING, FENCED, REBALANCING, RUNNING }
 
   public record Partition(String topic, Integer partition) {}
-  public record StateAtTime(ZonedDateTime time, State state, List<Partition> assignedPartitions) {}
+  public record RecordedState(
+    ZonedDateTime time,
+    RebalancingState rebalancingState,
+    List<Partition> assignedPartitions) {}
 
 
   private final static Comparator<Partition> partitionComparator = (tp1, tp2) ->