import lombok.RequiredArgsConstructor;
import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.stereotype.Component;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-@Component
@RequiredArgsConstructor
public class ApplicationHealthIndicator implements HealthIndicator
{
- private final EndlessConsumer<String, String> consumer;
+ private final String id;
+ private final KafkaListenerEndpointRegistry registry;
@Override
public Health health()
{
- try
- {
- return consumer
- .exitStatus()
- .map(Health::down)
- .orElse(Health.outOfService())
- .build();
- }
- catch (IllegalStateException e)
- {
- return Health.up().build();
- }
+ return registry.getListenerContainer(id).isRunning()
+ ? Health.up().build()
+ : Health.down().build();
}
}