From: Kai Moritz Date: Mon, 18 Nov 2024 18:24:49 +0000 (+0100) Subject: `ConsumerHealthIndicator` refaktorisiert (DRY) X-Git-Tag: consumer/spring-consumer--health-indicator--2025-02-signal~3 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=2be28e666dd42943a6717f8012ad1983bc2c7bad;p=demos%2Fkafka%2Ftraining `ConsumerHealthIndicator` refaktorisiert (DRY) --- diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java index 47d75936..c3bd2744 100644 --- a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java @@ -15,7 +15,7 @@ public class ConsumerHealthIndicator implements HealthIndicator private volatile State state; private volatile List history; - private volatile List assignedPartitions; + private volatile List assignedPartitions = List.of(); public ConsumerHealthIndicator(Clock clock) @@ -52,50 +52,27 @@ public class ConsumerHealthIndicator implements HealthIndicator public void partitionsAssigned(Collection partitions) { this.state = State.RUNNING; - List assignedPartitions = new LinkedList<>(); - assignedPartitions.addAll(assignedPartitions); + List assignedPartitions = new LinkedList<>(this.assignedPartitions); partitions.forEach(tp -> assignedPartitions.add(new Partition(tp.topic(), tp.partition()))); Collections.sort(assignedPartitions, partitionComparator); this.assignedPartitions = assignedPartitions; - List history = new LinkedList<>(); - history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); - history.addAll(this.history); - if(history.size() > 10) - { - history.removeLast(); - } - this.history = history; + addCurrentStateToHistory(); } public void partitionsRevoked(Collection partitions) { this.state = State.REBALANCING; - List assignedPartitions = new LinkedList<>(); - assignedPartitions.addAll(assignedPartitions); + List assignedPartitions = new LinkedList<>(this.assignedPartitions); partitions.forEach(tp -> assignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); this.assignedPartitions = assignedPartitions; - List history = new LinkedList<>(); - history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); - history.addAll(this.history); - if(history.size() > 10) - { - history.removeLast(); - } - this.history = history; + addCurrentStateToHistory(); } public void partitionsLost(Collection partitions) { this.state = State.FENCED; this.assignedPartitions = List.of(); - List history = new LinkedList<>(); - history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); - history.addAll(this.history); - if(history.size() > 10) - { - history.removeLast(); - } - this.history = history; + addCurrentStateToHistory(); } @@ -106,6 +83,18 @@ public class ConsumerHealthIndicator implements HealthIndicator : new Health.Builder().status(state.name()); } + private void addCurrentStateToHistory() + { + List history = new LinkedList<>(); + history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); + history.addAll(this.history); + if(history.size() > 10) + { + history.removeLast(); + } + this.history = history; + } + enum State { STARTING, FENCED, REBALANCING, RUNNING } public record Partition(String topic, Integer partition) {}