From 402ba85490cc2a7b3ed3834f79f20dd499624077 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 23 Nov 2024 10:18:48 +0100 Subject: [PATCH] `ConsumerHealthIndicator` refaktorisiert (Aufgaben klarer getrennt) --- .../juplo/kafka/ConsumerHealthIndicator.java | 42 +++++++++---------- 1 file changed, 21 insertions(+), 21 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java index c3bd2744..1478b082 100644 --- a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java @@ -51,28 +51,24 @@ public class ConsumerHealthIndicator implements HealthIndicator public void partitionsAssigned(Collection partitions) { - this.state = State.RUNNING; - List assignedPartitions = new LinkedList<>(this.assignedPartitions); - partitions.forEach(tp -> assignedPartitions.add(new Partition(tp.topic(), tp.partition()))); - Collections.sort(assignedPartitions, partitionComparator); - this.assignedPartitions = assignedPartitions; - addCurrentStateToHistory(); + List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); + partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition()))); + Collections.sort(newAssignedPartitions, partitionComparator); + + updateAndRecordState(State.RUNNING, newAssignedPartitions); } public void partitionsRevoked(Collection partitions) { - this.state = State.REBALANCING; - List assignedPartitions = new LinkedList<>(this.assignedPartitions); - partitions.forEach(tp -> assignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); - this.assignedPartitions = assignedPartitions; - addCurrentStateToHistory(); + List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); + partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); + + updateAndRecordState(State.REBALANCING, newAssignedPartitions); } public void partitionsLost(Collection partitions) { - this.state = State.FENCED; - this.assignedPartitions = List.of(); - addCurrentStateToHistory(); + updateAndRecordState(State.FENCED, List.of()); } @@ -83,16 +79,20 @@ public class ConsumerHealthIndicator implements HealthIndicator : new Health.Builder().status(state.name()); } - private void addCurrentStateToHistory() + private void updateAndRecordState( + State newState, + List newAssignedPartitions) { - List history = new LinkedList<>(); - history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); - history.addAll(this.history); - if(history.size() > 10) + List newHistory = new LinkedList<>(); + newHistory.add(new StateAtTime(ZonedDateTime.now(clock), newState, newAssignedPartitions)); + newHistory.addAll(this.history); + if(newHistory.size() > 10) { - history.removeLast(); + newHistory.removeLast(); } - this.history = history; + this.state = newState; + this.assignedPartitions = newAssignedPartitions; + this.history = newHistory; } enum State { STARTING, FENCED, REBALANCING, RUNNING } -- 2.20.1