feat: Implemented and configured health-indicator for the ``Channel``s
authorKai Moritz <kai@juplo.de>
Tue, 5 Mar 2024 08:46:55 +0000 (09:46 +0100)
committerKai Moritz <kai@juplo.de>
Thu, 14 Mar 2024 08:11:21 +0000 (09:11 +0100)
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelReactiveHealthIndicator.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/KafkaServicesConfiguration.java

diff --git a/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelReactiveHealthIndicator.java b/src/main/java/de/juplo/kafka/chat/backend/implementation/kafka/ChannelReactiveHealthIndicator.java
new file mode 100644 (file)
index 0000000..dcb6158
--- /dev/null
@@ -0,0 +1,29 @@
+package de.juplo.kafka.chat.backend.implementation.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.boot.actuate.health.AbstractReactiveHealthIndicator;
+import org.springframework.boot.actuate.health.Health;
+import reactor.core.publisher.Mono;
+
+
+@RequiredArgsConstructor
+public class ChannelReactiveHealthIndicator extends AbstractReactiveHealthIndicator
+{
+  private final Channel channel;
+
+
+  @Override
+  protected Mono<Health> doHealthCheck(Health.Builder builder)
+  {
+    return Mono
+        .fromSupplier(() -> channel.getChannelState())
+        .map(state  -> switch(state)
+            {
+              case STARTING -> builder.outOfService();
+              case LOAD_IN_PROGRESS -> builder.outOfService();
+              case READY -> builder.up();
+              case SHUTTING_DOWN -> builder.down();
+            })
+        .map(healthBuilder -> healthBuilder.build());
+  }
+}
index c7cf113..525c427 100644 (file)
@@ -314,4 +314,17 @@ public class KafkaServicesConfiguration
   {
     return ZoneId.systemDefault();
   }
+
+  @Bean
+  ChannelReactiveHealthIndicator dataChannelHealthIndicator(
+      DataChannel dataChannel)
+  {
+    return new ChannelReactiveHealthIndicator(dataChannel);
+  }
+
+  @Bean
+  ChannelReactiveHealthIndicator infoChannelHealthIndicator(InfoChannel infoChannel)
+  {
+    return new ChannelReactiveHealthIndicator(infoChannel);
+  }
 }