X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FStreamsHealthIndicator.java;fp=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FStreamsHealthIndicator.java;h=f531256962aeeb2ce4728ec9cfc4c128c4692ab1;hp=0000000000000000000000000000000000000000;hb=29eac6d5ee3ab76af19edef6d8863c6894313169;hpb=85905a556b97077ef99332928eb1401864af24e5 diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java new file mode 100644 index 0000000..f531256 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java @@ -0,0 +1,55 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.streams.KafkaStreams; +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.stereotype.Component; + + +@Component +public class StreamsHealthIndicator extends AbstractHealthIndicator implements KafkaStreams.StateListener +{ + public final static Status CREATED = new Status("CREATED"); + public final static Status RUNNING = new Status("RUNNING"); + public final static Status REBALANCING = new Status("REBALANCING"); + public final static Status ERROR = new Status("ERROR"); + public final static Status PENDING_SHUTDOWN = new Status("PENDING_SHUTDOWN"); + public final static Status NOT_RUNNING = new Status("NOT_RUNNING"); + + private Status status = Status.UNKNOWN; + + @Override + protected synchronized void doHealthCheck(Health.Builder builder) throws Exception + { + builder.status(status); + } + + @Override + public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) + { + switch (newState) + { + case CREATED: + status = CREATED; + break; + case RUNNING: + status = RUNNING; + break; + case REBALANCING: + status = REBALANCING; + break; + case ERROR: + status = ERROR; + break; + case PENDING_SHUTDOWN: + status = PENDING_SHUTDOWN; + break; + case NOT_RUNNING: + status = NOT_RUNNING; + break; + default: + status = null; + } + } +}