From bfdb31ea8ae684b9df5c69ba96e4aa8be1e28d78 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 18 Nov 2024 07:38:42 +0100 Subject: [PATCH] `HealthIndicator` mit Details zu Rebalances des Consumers implementiert MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * `ConsumerHealthIndicator` refaktorisiert (DRY) * `ConsumerHealthIndicator` refaktorisiert (Aufgaben klarer getrennt) * `ConsumerHealthIndicator` refaktorisiert (Klareres Benennungs-Schema) * `ConsumerHealthIndicator` refaktorisiert (Methoden aufgeräumt) --- .../juplo/kafka/ApplicationConfiguration.java | 21 ++++ .../juplo/kafka/ConsumerHealthIndicator.java | 116 ++++++++++++++++++ ...HealthIndicatorAwareRebalanceListener.java | 33 +++++ .../java/de/juplo/kafka/ExampleConsumer.java | 6 +- src/main/resources/application.yml | 2 + .../de/juplo/kafka/ExampleConsumerTest.java | 10 ++ 6 files changed, 187 insertions(+), 1 deletion(-) create mode 100644 src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java create mode 100644 src/main/java/de/juplo/kafka/ConsumerHealthIndicatorAwareRebalanceListener.java diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index c4174842..bdcd94ee 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -11,6 +11,7 @@ import org.springframework.context.ConfigurableApplicationContext; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.time.Clock; import java.util.Properties; @@ -23,6 +24,7 @@ public class ApplicationConfiguration public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, RecordHandler recordHandler, + ConsumerHealthIndicatorAwareRebalanceListener consumerHealthIndicatorAwareRebalanceListener, ApplicationProperties properties, ConfigurableApplicationContext applicationContext) { @@ -32,6 +34,7 @@ public class ApplicationConfiguration properties.getConsumerProperties().getTopic(), kafkaConsumer, recordHandler, + consumerHealthIndicatorAwareRebalanceListener, () -> applicationContext.close()); } @@ -41,6 +44,24 @@ public class ApplicationConfiguration return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value); } + @Bean + public ConsumerHealthIndicatorAwareRebalanceListener rebalanceListener(ConsumerHealthIndicator consumerHealthIndicator) + { + return new ConsumerHealthIndicatorAwareRebalanceListener(consumerHealthIndicator); + } + + @Bean + public ConsumerHealthIndicator consumerHealthIndicator(Clock clock) + { + return new ConsumerHealthIndicator(clock); + } + + @Bean + public Clock clock() + { + return Clock.systemDefaultZone(); + } + @Bean(destroyMethod = "") public KafkaConsumer kafkaConsumer(ApplicationProperties properties) { diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java new file mode 100644 index 00000000..30fb7322 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java @@ -0,0 +1,116 @@ +package de.juplo.kafka; + +import org.apache.kafka.common.TopicPartition; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; + +import java.time.Clock; +import java.time.ZonedDateTime; +import java.util.*; + + +public class ConsumerHealthIndicator implements HealthIndicator +{ + private final Clock clock; + + private volatile RebalancingState rebalancingState; + private volatile List history; + private volatile List assignedPartitions = List.of(); + + + public ConsumerHealthIndicator(Clock clock) + { + this.clock = clock; + + rebalancingState = RebalancingState.STARTING; + RecordedState sat = new RecordedState(ZonedDateTime.now(clock), rebalancingState, List.of()); + history = List.of(sat); + } + + + @Override + public Health getHealth(boolean includeDetails) + { + Health.Builder healthBuilder = getHealthBuilder(); + + if (includeDetails) + { + healthBuilder.withDetail("rebalancing_state", rebalancingState); + healthBuilder.withDetail("history", history); + } + + return healthBuilder.build(); + } + + @Override + public Health health() + { + return getHealthBuilder().build(); + } + + private Health.Builder getHealthBuilder() + { + return rebalancingState == RebalancingState.RUNNING + ? new Health.Builder().up() + : new Health.Builder().status(rebalancingState.name()); + } + + + public void partitionsAssigned(Collection partitions) + { + List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); + partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition()))); + Collections.sort(newAssignedPartitions, partitionComparator); + + updateAndRecordState(RebalancingState.RUNNING, newAssignedPartitions); + } + + public void partitionsRevoked(Collection partitions) + { + List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); + partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); + + updateAndRecordState(RebalancingState.REBALANCING, newAssignedPartitions); + } + + public void partitionsLost(Collection partitions) + { + updateAndRecordState(RebalancingState.FENCED, List.of()); + } + + private void updateAndRecordState( + RebalancingState newRebalancingState, + List newAssignedPartitions) + { + List newHistory = new LinkedList<>(); + newHistory.add(new RecordedState( + ZonedDateTime.now(clock), + newRebalancingState, + newAssignedPartitions)); + newHistory.addAll(this.history); + if(newHistory.size() > 10) + { + newHistory.removeLast(); + } + this.rebalancingState = newRebalancingState; + this.assignedPartitions = newAssignedPartitions; + this.history = newHistory; + } + + + enum RebalancingState { STARTING, FENCED, REBALANCING, RUNNING } + + public record Partition(String topic, Integer partition) {} + public record RecordedState( + ZonedDateTime time, + RebalancingState rebalancingState, + List assignedPartitions) {} + + private final static Comparator partitionComparator = (tp1, tp2) -> + { + int result = tp1.topic().compareTo(tp2.topic()); + return result == 0 + ? tp1.partition() - tp2.partition() + : result; + }; +} diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicatorAwareRebalanceListener.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicatorAwareRebalanceListener.java new file mode 100644 index 00000000..3339ac9a --- /dev/null +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicatorAwareRebalanceListener.java @@ -0,0 +1,33 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; + + +@RequiredArgsConstructor +public class ConsumerHealthIndicatorAwareRebalanceListener implements ConsumerRebalanceListener +{ + private final ConsumerHealthIndicator consumerHealthIndicator; + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + consumerHealthIndicator.partitionsAssigned(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + consumerHealthIndicator.partitionsRevoked(partitions); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + consumerHealthIndicator.partitionsLost(partitions); + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index 7e820ea2..4f60abe7 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.RecordDeserializationException; @@ -18,6 +19,7 @@ public class ExampleConsumer implements Runnable private final String topic; private final Consumer consumer; private final RecordHandler recordHandler; + private final ConsumerRebalanceListener rebalanceListener; private final Thread workerThread; private final Runnable closeCallback; @@ -29,12 +31,14 @@ public class ExampleConsumer implements Runnable String topic, Consumer consumer, RecordHandler recordHandler, + ConsumerRebalanceListener rebalanceListener, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; this.recordHandler = recordHandler; + this.rebalanceListener = rebalanceListener; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -49,7 +53,7 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7a06731c..09791a7a 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -8,6 +8,8 @@ juplo: auto-commit-interval: 5s management: endpoint: + health: + show-details: ALWAYS shutdown: enabled: true endpoints: diff --git a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java index 590c9cdf..1307d618 100644 --- a/src/test/java/de/juplo/kafka/ExampleConsumerTest.java +++ b/src/test/java/de/juplo/kafka/ExampleConsumerTest.java @@ -4,6 +4,7 @@ import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.AdminClientConfig; import org.apache.kafka.clients.admin.RecordsToDelete; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.LongSerializer; @@ -22,6 +23,7 @@ import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.test.context.EmbeddedKafka; import java.time.Duration; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.ExecutionException; @@ -171,6 +173,14 @@ public class ExampleConsumerTest TOPIC, consumer, mockRecordHandler, + new ConsumerRebalanceListener() + { + @Override + public void onPartitionsRevoked(Collection collection) {} + + @Override + public void onPartitionsAssigned(Collection collection) {} + }, () -> isTerminatedExceptionally.set(true)); } -- 2.20.1