From 4e69c909e7551eaf8468d71d932de266b60426a8 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Mon, 18 Nov 2024 07:38:42 +0100 Subject: [PATCH] `HealthIndicator` mit Details zu Rebalances des Consumers implementiert MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * `ConsumerHealthIndicator` refaktorisiert (DRY) * `ConsumerHealthIndicator` refaktorisiert (Aufgaben klarer getrennt) * `ConsumerHealthIndicator` refaktorisiert (Klareres Benennungs-Schema) * `ConsumerHealthIndicator` refaktorisiert (Methoden aufgeräumt) --- README.sh | 2 +- build.gradle | 2 +- docker/docker-compose.yml | 6 +- pom.xml | 2 +- .../juplo/kafka/ApplicationConfiguration.java | 22 ++++ .../juplo/kafka/ConsumerHealthIndicator.java | 116 ++++++++++++++++++ ...HealthIndicatorAwareRebalanceListener.java | 33 +++++ .../java/de/juplo/kafka/ExampleConsumer.java | 6 +- src/main/resources/application.yml | 2 + 9 files changed, 184 insertions(+), 7 deletions(-) create mode 100644 src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java create mode 100644 src/main/java/de/juplo/kafka/ConsumerHealthIndicatorAwareRebalanceListener.java diff --git a/README.sh b/README.sh index 203c22bd..90e25d5d 100755 --- a/README.sh +++ b/README.sh @@ -1,6 +1,6 @@ #!/bin/bash -IMAGE=juplo/spring-consumer:1.1-record-handler-SNAPSHOT +IMAGE=juplo/spring-consumer:1.1-health-indicator-SNAPSHOT if [ "$1" = "cleanup" ] then diff --git a/build.gradle b/build.gradle index f700918b..12642ad3 100644 --- a/build.gradle +++ b/build.gradle @@ -8,7 +8,7 @@ plugins { } group = 'de.juplo.kafka' -version = '1.1-record-handler-SNAPSHOT' +version = '1.1-health-indicator-SNAPSHOT' java { toolchain { diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index bf306f68..adf2acdd 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -140,7 +140,7 @@ services: command: kafka:9092 test producer consumer: - image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT + image: juplo/spring-consumer:1.1-health-indicator-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -149,7 +149,7 @@ services: juplo.consumer.topic: test peter: - image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT + image: juplo/spring-consumer:1.1-health-indicator-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer @@ -158,7 +158,7 @@ services: juplo.consumer.topic: test ute: - image: juplo/spring-consumer:1.1-record-handler-SNAPSHOT + image: juplo/spring-consumer:1.1-health-indicator-SNAPSHOT environment: spring.kafka.bootstrap-servers: kafka:9092 spring.kafka.client-id: consumer diff --git a/pom.xml b/pom.xml index 7590d25a..074909ec 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ spring-consumer Spring Consumer Super Simple Consumer-Group, that is implemented as Spring-Boot application and configured by Spring Kafka - 1.1-record-handler-SNAPSHOT + 1.1-health-indicator-SNAPSHOT 21 diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 87331b34..cf7fefc4 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -9,6 +9,8 @@ import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.ConsumerFactory; +import java.time.Clock; + @Configuration @EnableConfigurationProperties(ApplicationProperties.class) @@ -19,6 +21,7 @@ public class ApplicationConfiguration public ExampleConsumer exampleConsumer( Consumer kafkaConsumer, RecordHandler recordHandler, + ConsumerHealthIndicatorAwareRebalanceListener consumerHealthIndicatorAwareRebalanceListener, ApplicationProperties properties, KafkaProperties kafkaProperties, ConfigurableApplicationContext applicationContext) @@ -29,6 +32,7 @@ public class ApplicationConfiguration properties.getConsumerProperties().getTopic(), kafkaConsumer, recordHandler, + consumerHealthIndicatorAwareRebalanceListener, () -> applicationContext.close()); } @@ -38,6 +42,24 @@ public class ApplicationConfiguration return (topic, partition, offset, key, value) -> log.info("No-Ops Handler called for {}={}", key, value); } + @Bean + public ConsumerHealthIndicatorAwareRebalanceListener rebalanceListener(ConsumerHealthIndicator consumerHealthIndicator) + { + return new ConsumerHealthIndicatorAwareRebalanceListener(consumerHealthIndicator); + } + + @Bean + public ConsumerHealthIndicator consumerHealthIndicator(Clock clock) + { + return new ConsumerHealthIndicator(clock); + } + + @Bean + public Clock clock() + { + return Clock.systemDefaultZone(); + } + @Bean(destroyMethod = "") public Consumer kafkaConsumer(ConsumerFactory consumerFactory) { diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java new file mode 100644 index 00000000..30fb7322 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicator.java @@ -0,0 +1,116 @@ +package de.juplo.kafka; + +import org.apache.kafka.common.TopicPartition; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.HealthIndicator; + +import java.time.Clock; +import java.time.ZonedDateTime; +import java.util.*; + + +public class ConsumerHealthIndicator implements HealthIndicator +{ + private final Clock clock; + + private volatile RebalancingState rebalancingState; + private volatile List history; + private volatile List assignedPartitions = List.of(); + + + public ConsumerHealthIndicator(Clock clock) + { + this.clock = clock; + + rebalancingState = RebalancingState.STARTING; + RecordedState sat = new RecordedState(ZonedDateTime.now(clock), rebalancingState, List.of()); + history = List.of(sat); + } + + + @Override + public Health getHealth(boolean includeDetails) + { + Health.Builder healthBuilder = getHealthBuilder(); + + if (includeDetails) + { + healthBuilder.withDetail("rebalancing_state", rebalancingState); + healthBuilder.withDetail("history", history); + } + + return healthBuilder.build(); + } + + @Override + public Health health() + { + return getHealthBuilder().build(); + } + + private Health.Builder getHealthBuilder() + { + return rebalancingState == RebalancingState.RUNNING + ? new Health.Builder().up() + : new Health.Builder().status(rebalancingState.name()); + } + + + public void partitionsAssigned(Collection partitions) + { + List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); + partitions.forEach(tp -> newAssignedPartitions.add(new Partition(tp.topic(), tp.partition()))); + Collections.sort(newAssignedPartitions, partitionComparator); + + updateAndRecordState(RebalancingState.RUNNING, newAssignedPartitions); + } + + public void partitionsRevoked(Collection partitions) + { + List newAssignedPartitions = new LinkedList<>(this.assignedPartitions); + partitions.forEach(tp -> newAssignedPartitions.remove(new Partition(tp.topic(), tp.partition()))); + + updateAndRecordState(RebalancingState.REBALANCING, newAssignedPartitions); + } + + public void partitionsLost(Collection partitions) + { + updateAndRecordState(RebalancingState.FENCED, List.of()); + } + + private void updateAndRecordState( + RebalancingState newRebalancingState, + List newAssignedPartitions) + { + List newHistory = new LinkedList<>(); + newHistory.add(new RecordedState( + ZonedDateTime.now(clock), + newRebalancingState, + newAssignedPartitions)); + newHistory.addAll(this.history); + if(newHistory.size() > 10) + { + newHistory.removeLast(); + } + this.rebalancingState = newRebalancingState; + this.assignedPartitions = newAssignedPartitions; + this.history = newHistory; + } + + + enum RebalancingState { STARTING, FENCED, REBALANCING, RUNNING } + + public record Partition(String topic, Integer partition) {} + public record RecordedState( + ZonedDateTime time, + RebalancingState rebalancingState, + List assignedPartitions) {} + + private final static Comparator partitionComparator = (tp1, tp2) -> + { + int result = tp1.topic().compareTo(tp2.topic()); + return result == 0 + ? tp1.partition() - tp2.partition() + : result; + }; +} diff --git a/src/main/java/de/juplo/kafka/ConsumerHealthIndicatorAwareRebalanceListener.java b/src/main/java/de/juplo/kafka/ConsumerHealthIndicatorAwareRebalanceListener.java new file mode 100644 index 00000000..3339ac9a --- /dev/null +++ b/src/main/java/de/juplo/kafka/ConsumerHealthIndicatorAwareRebalanceListener.java @@ -0,0 +1,33 @@ +package de.juplo.kafka; + +import lombok.RequiredArgsConstructor; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.common.TopicPartition; + +import java.util.Collection; + + +@RequiredArgsConstructor +public class ConsumerHealthIndicatorAwareRebalanceListener implements ConsumerRebalanceListener +{ + private final ConsumerHealthIndicator consumerHealthIndicator; + + + @Override + public void onPartitionsAssigned(Collection partitions) + { + consumerHealthIndicator.partitionsAssigned(partitions); + } + + @Override + public void onPartitionsRevoked(Collection partitions) + { + consumerHealthIndicator.partitionsRevoked(partitions); + } + + @Override + public void onPartitionsLost(Collection partitions) + { + consumerHealthIndicator.partitionsLost(partitions); + } +} diff --git a/src/main/java/de/juplo/kafka/ExampleConsumer.java b/src/main/java/de/juplo/kafka/ExampleConsumer.java index e5a8b3d5..bd5cac74 100644 --- a/src/main/java/de/juplo/kafka/ExampleConsumer.java +++ b/src/main/java/de/juplo/kafka/ExampleConsumer.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.common.errors.WakeupException; @@ -17,6 +18,7 @@ public class ExampleConsumer implements Runnable private final String topic; private final Consumer consumer; private final RecordHandler recordHandler; + private final ConsumerRebalanceListener rebalanceListener; private final Thread workerThread; private final Runnable closeCallback; @@ -28,12 +30,14 @@ public class ExampleConsumer implements Runnable String topic, Consumer consumer, RecordHandler recordHandler, + ConsumerRebalanceListener rebalanceListener, Runnable closeCallback) { this.id = clientId; this.topic = topic; this.consumer = consumer; this.recordHandler = recordHandler; + this.rebalanceListener = rebalanceListener; workerThread = new Thread(this, "ExampleConsumer Worker-Thread"); workerThread.start(); @@ -48,7 +52,7 @@ public class ExampleConsumer implements Runnable try { log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic)); + consumer.subscribe(Arrays.asList(topic), rebalanceListener); while (true) { diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 71dddda3..8b37911b 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -3,6 +3,8 @@ juplo: topic: test management: endpoint: + health: + show-details: ALWAYS shutdown: enabled: true endpoints: -- 2.20.1