From 1b4ad353fca770b7dc4240f673fe389ffed8c6ae Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 23 Nov 2024 10:25:07 +0100 Subject: [PATCH] `ConsumerHealthIndicator` refaktorisiert (Klareres Benennungs-Schema) --- .../juplo/kafka/ConsumerHealthIndicator.java | 38 +++++++++++-------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java index 1478b082..dde95fcb 100644 --- a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java @@ -13,8 +13,8 @@ public class ConsumerHealthIndicator implements HealthIndicator { private final Clock clock; - private volatile State state; - private volatile List history; + private volatile RebalancingState rebalancingState; + private volatile List history; private volatile List assignedPartitions = List.of(); @@ -22,8 +22,8 @@ public class ConsumerHealthIndicator implements HealthIndicator { this.clock = clock; - state = State.STARTING; - StateAtTime sat = new StateAtTime(ZonedDateTime.now(clock), state, List.of()); + rebalancingState = RebalancingState.STARTING; + RecordedState sat = new RecordedState(ZonedDateTime.now(clock), rebalancingState, List.of()); history = List.of(sat); } @@ -35,7 +35,7 @@ public class ConsumerHealthIndicator implements HealthIndicator if (includeDetails) { - healthBuilder.withDetail("status", state); + healthBuilder.withDetail("rebalancing_state", rebalancingState); healthBuilder.withDetail("history", history); } @@ -55,7 +55,7 @@ public class ConsumerHealthIndicator implements HealthIndicator partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition()))); Collections.sort(newAssignedPartitions, partitionComparator); - updateAndRecordState(State.RUNNING, newAssignedPartitions); + updateAndRecordState(RebalancingState.RUNNING, newAssignedPartitions); } public void partitionsRevoked(Collection partitions) @@ -63,42 +63,48 @@ public class ConsumerHealthIndicator implements HealthIndicator List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); - updateAndRecordState(State.REBALANCING, newAssignedPartitions); + updateAndRecordState(RebalancingState.REBALANCING, newAssignedPartitions); } public void partitionsLost(Collection partitions) { - updateAndRecordState(State.FENCED, List.of()); + updateAndRecordState(RebalancingState.FENCED, List.of()); } private Health.Builder getHealthBuilder() { - return state == State.RUNNING + return rebalancingState == RebalancingState.RUNNING ? new Health.Builder().up() - : new Health.Builder().status(state.name()); + : new Health.Builder().status(rebalancingState.name()); } private void updateAndRecordState( - State newState, + RebalancingState newRebalancingState, List newAssignedPartitions) { - List newHistory = new LinkedList<>(); - newHistory.add(new StateAtTime(ZonedDateTime.now(clock), newState, newAssignedPartitions)); + List newHistory = new LinkedList<>(); + newHistory.add(new RecordedState( + ZonedDateTime.now(clock), + newRebalancingState, + newAssignedPartitions)); newHistory.addAll(this.history); if(newHistory.size() > 10) { newHistory.removeLast(); } - this.state = newState; + this.rebalancingState = newRebalancingState; this.assignedPartitions = newAssignedPartitions; this.history = newHistory; } - enum State { STARTING, FENCED, REBALANCING, RUNNING } + enum RebalancingState { STARTING, FENCED, REBALANCING, RUNNING } public record Partition(String topic, Integer partition) {} - public record StateAtTime(ZonedDateTime time, State state, List assignedPartitions) {} + public record RecordedState( + ZonedDateTime time, + RebalancingState rebalancingState, + List assignedPartitions) {} private final static Comparator partitionComparator = (tp1, tp2) -> -- 2.20.1