From 3e188504a982da70c4a092e5fdc46e3b6f8ca227 Mon Sep 17 00:00:00 2001 From: Kai Moritz <kai@juplo.de> Date: Sat, 23 Nov 2024 10:25:07 +0100 Subject: [PATCH] `ConsumerHealthIndicator` refaktorisiert (Klareres Benennungs-Schema) --- .../juplo/kafka/ConsumerHealthIndicator.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java index 1478b082..dde95fcb 100644 --- a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java @@ -13,8 +13,8 @@ public class ConsumerHealthIndicator implements HealthIndicator { private final Clock clock; - private volatile State state; - private volatile List<StateAtTime> history; + private volatile RebalancingState rebalancingState; + private volatile List<RecordedState> history; private volatile List<Partition> assignedPartitions = List.of(); @@ -22,8 +22,8 @@ public class ConsumerHealthIndicator implements HealthIndicator { this.clock = clock; - state = State.STARTING; - StateAtTime sat = new StateAtTime(ZonedDateTime.now(clock), state, List.of()); + rebalancingState = RebalancingState.STARTING; + RecordedState sat = new RecordedState(ZonedDateTime.now(clock), rebalancingState, List.of()); history = List.of(sat); } @@ -35,7 +35,7 @@ public class ConsumerHealthIndicator implements HealthIndicator if (includeDetails) { - healthBuilder.withDetail("status", state); + healthBuilder.withDetail("rebalancing_state", rebalancingState); healthBuilder.withDetail("history", history); } @@ -55,7 +55,7 @@ public class ConsumerHealthIndicator implements HealthIndicator partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition()))); Collections.sort(newAssignedPartitions, partitionComparator); - updateAndRecordState(State.RUNNING, newAssignedPartitions); + updateAndRecordState(RebalancingState.RUNNING, newAssignedPartitions); } public void partitionsRevoked(Collection<TopicPartition> partitions) @@ -63,42 +63,48 @@ public class ConsumerHealthIndicator implements HealthIndicator List<Partition> newAssignedPartitions = new LinkedList<>(this.assignedPartitions); partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); - updateAndRecordState(State.REBALANCING, newAssignedPartitions); + updateAndRecordState(RebalancingState.REBALANCING, newAssignedPartitions); } public void partitionsLost(Collection<TopicPartition> partitions) { - updateAndRecordState(State.FENCED, List.of()); + updateAndRecordState(RebalancingState.FENCED, List.of()); } private Health.Builder getHealthBuilder() { - return state == State.RUNNING + return rebalancingState == RebalancingState.RUNNING ? new Health.Builder().up() - : new Health.Builder().status(state.name()); + : new Health.Builder().status(rebalancingState.name()); } private void updateAndRecordState( - State newState, + RebalancingState newRebalancingState, List<Partition> newAssignedPartitions) { - List<StateAtTime> newHistory = new LinkedList<>(); - newHistory.add(new StateAtTime(ZonedDateTime.now(clock), newState, newAssignedPartitions)); + List<RecordedState> newHistory = new LinkedList<>(); + newHistory.add(new RecordedState( + ZonedDateTime.now(clock), + newRebalancingState, + newAssignedPartitions)); newHistory.addAll(this.history); if(newHistory.size() > 10) { newHistory.removeLast(); } - this.state = newState; + this.rebalancingState = newRebalancingState; this.assignedPartitions = newAssignedPartitions; this.history = newHistory; } - enum State { STARTING, FENCED, REBALANCING, RUNNING } + enum RebalancingState { STARTING, FENCED, REBALANCING, RUNNING } public record Partition(String topic, Integer partition) {} - public record StateAtTime(ZonedDateTime time, State state, List<Partition> assignedPartitions) {} + public record RecordedState( + ZonedDateTime time, + RebalancingState rebalancingState, + List<Partition> assignedPartitions) {} private final static Comparator<Partition> partitionComparator = (tp1, tp2) -> -- 2.20.1