From: Kai Moritz Date: Sat, 23 Nov 2024 09:18:48 +0000 (+0100) Subject: `ConsumerHealthIndicator` refaktorisiert (Aufgaben klarer getrennt) X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=1253dd1f3917804afb649ed8e9510470eb2747eb;p=demos%2Fkafka%2Ftraining `ConsumerHealthIndicator` refaktorisiert (Aufgaben klarer getrennt) --- diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java index c3bd2744..1478b082 100644 --- a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java @@ -51,28 +51,24 @@ public class ConsumerHealthIndicator implements HealthIndicator public void partitionsAssigned(Collection partitions) { - this.state = State.RUNNING; - List assignedPartitions = new LinkedList<>(this.assignedPartitions); - partitions.forEach(tp -> assignedPartitions.add(new Partition(tp.topic(), tp.partition()))); - Collections.sort(assignedPartitions, partitionComparator); - this.assignedPartitions = assignedPartitions; - addCurrentStateToHistory(); + List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); + partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition()))); + Collections.sort(newAssignedPartitions, partitionComparator); + + updateAndRecordState(State.RUNNING, newAssignedPartitions); } public void partitionsRevoked(Collection partitions) { - this.state = State.REBALANCING; - List assignedPartitions = new LinkedList<>(this.assignedPartitions); - partitions.forEach(tp -> assignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); - this.assignedPartitions = assignedPartitions; - addCurrentStateToHistory(); + List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); + partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); + + updateAndRecordState(State.REBALANCING, newAssignedPartitions); } public void partitionsLost(Collection partitions) { - this.state = State.FENCED; - this.assignedPartitions = List.of(); - addCurrentStateToHistory(); + updateAndRecordState(State.FENCED, List.of()); } @@ -83,16 +79,20 @@ public class ConsumerHealthIndicator implements HealthIndicator : new Health.Builder().status(state.name()); } - private void addCurrentStateToHistory() + private void updateAndRecordState( + State newState, + List newAssignedPartitions) { - List history = new LinkedList<>(); - history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); - history.addAll(this.history); - if(history.size() > 10) + List newHistory = new LinkedList<>(); + newHistory.add(new StateAtTime(ZonedDateTime.now(clock), newState, newAssignedPartitions)); + newHistory.addAll(this.history); + if(newHistory.size() > 10) { - history.removeLast(); + newHistory.removeLast(); } - this.history = history; + this.state = newState; + this.assignedPartitions = newAssignedPartitions; + this.history = newHistory; } enum State { STARTING, FENCED, REBALANCING, RUNNING }