From 1334c18252de3637b4afad756fe11f12b7850cf7 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 18 Nov 2024 19:24:49 +0100 Subject: [PATCH] `ConsumerHealthIndicator` refaktorisiert (DRY) --- .../juplo/kafka/ConsumerHealthIndicator.java | 47 +++++++------------ 1 file changed, 18 insertions(+), 29 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java index 47d75936..c3bd2744 100644 --- a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java @@ -15,7 +15,7 @@ public class ConsumerHealthIndicator implements HealthIndicator private volatile State state; private volatile List history; - private volatile List assignedPartitions; + private volatile List assignedPartitions = List.of(); public ConsumerHealthIndicator(Clock clock) @@ -52,50 +52,27 @@ public class ConsumerHealthIndicator implements HealthIndicator public void partitionsAssigned(Collection partitions) { this.state = State.RUNNING; - List assignedPartitions = new LinkedList<>(); - assignedPartitions.addAll(assignedPartitions); + List assignedPartitions = new LinkedList<>(this.assignedPartitions); partitions.forEach(tp -> assignedPartitions.add(new Partition(tp.topic(), tp.partition()))); Collections.sort(assignedPartitions, partitionComparator); this.assignedPartitions = assignedPartitions; - List history = new LinkedList<>(); - history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); - history.addAll(this.history); - if(history.size() > 10) - { - history.removeLast(); - } - this.history = history; + addCurrentStateToHistory(); } public void partitionsRevoked(Collection partitions) { this.state = State.REBALANCING; - List assignedPartitions = new LinkedList<>(); - assignedPartitions.addAll(assignedPartitions); + List assignedPartitions = new LinkedList<>(this.assignedPartitions); partitions.forEach(tp -> assignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); this.assignedPartitions = assignedPartitions; - List history = new LinkedList<>(); - history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); - history.addAll(this.history); - if(history.size() > 10) - { - history.removeLast(); - } - this.history = history; + addCurrentStateToHistory(); } public void partitionsLost(Collection partitions) { this.state = State.FENCED; this.assignedPartitions = List.of(); - List history = new LinkedList<>(); - history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); - history.addAll(this.history); - if(history.size() > 10) - { - history.removeLast(); - } - this.history = history; + addCurrentStateToHistory(); } @@ -106,6 +83,18 @@ public class ConsumerHealthIndicator implements HealthIndicator : new Health.Builder().status(state.name()); } + private void addCurrentStateToHistory() + { + List history = new LinkedList<>(); + history.add(new StateAtTime(ZonedDateTime.now(clock), state, assignedPartitions)); + history.addAll(this.history); + if(history.size() > 10) + { + history.removeLast(); + } + this.history = history; + } + enum State { STARTING, FENCED, REBALANCING, RUNNING } public record Partition(String topic, Integer partition) {} -- 2.20.1