1 package de.juplo.demo.kafka.deduplication;
3 import org.apache.kafka.streams.KafkaStreams;
4 import org.springframework.boot.actuate.health.AbstractHealthIndicator;
5 import org.springframework.boot.actuate.health.Health;
6 import org.springframework.boot.actuate.health.Status;
7 import org.springframework.stereotype.Component;
11 public class StreamsHealthIndicator extends AbstractHealthIndicator implements KafkaStreams.StateListener
13 public final static Status CREATED = new Status("CREATED");
14 public final static Status RUNNING = new Status("RUNNING");
15 public final static Status REBALANCING = new Status("REBALANCING");
16 public final static Status ERROR = new Status("ERROR");
17 public final static Status PENDING_SHUTDOWN = new Status("PENDING_SHUTDOWN");
18 public final static Status NOT_RUNNING = new Status("NOT_RUNNING");
20 private Status status = Status.UNKNOWN;
23 protected synchronized void doHealthCheck(Health.Builder builder) throws Exception
25 builder.status(status);
29 public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState)
45 case PENDING_SHUTDOWN:
46 status = PENDING_SHUTDOWN;