X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FStreamsHealthIndicator.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FStreamsHealthIndicator.java;h=f531256962aeeb2ce4728ec9cfc4c128c4692ab1;hb=e1bd976486d1d1a087567a6c6081637d48706f44;hp=0000000000000000000000000000000000000000;hpb=0187521d27874ea7812d9116e76ef9c1a499368f;p=demos%2Fkafka%2Fdeduplication diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java new file mode 100644 index 0000000..f531256 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java @@ -0,0 +1,55 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.streams.KafkaStreams; +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.stereotype.Component; + + +@Component +public class StreamsHealthIndicator extends AbstractHealthIndicator implements KafkaStreams.StateListener +{ + public final static Status CREATED = new Status("CREATED"); + public final static Status RUNNING = new Status("RUNNING"); + public final static Status REBALANCING = new Status("REBALANCING"); + public final static Status ERROR = new Status("ERROR"); + public final static Status PENDING_SHUTDOWN = new Status("PENDING_SHUTDOWN"); + public final static Status NOT_RUNNING = new Status("NOT_RUNNING"); + + private Status status = Status.UNKNOWN; + + @Override + protected synchronized void doHealthCheck(Health.Builder builder) throws Exception + { + builder.status(status); + } + + @Override + public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) + { + switch (newState) + { + case CREATED: + status = CREATED; + break; + case RUNNING: + status = RUNNING; + break; + case REBALANCING: + status = REBALANCING; + break; + case ERROR: + status = ERROR; + break; + case PENDING_SHUTDOWN: + status = PENDING_SHUTDOWN; + break; + case NOT_RUNNING: + status = NOT_RUNNING; + break; + default: + status = null; + } + } +}