Added actuators and implemented an indicator that reports the streams-state
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / StreamsHealthIndicator.java
1 package de.juplo.demo.kafka.deduplication;
2
3 import org.apache.kafka.streams.KafkaStreams;
4 import org.springframework.boot.actuate.health.AbstractHealthIndicator;
5 import org.springframework.boot.actuate.health.Health;
6 import org.springframework.boot.actuate.health.Status;
7 import org.springframework.stereotype.Component;
8
9
10 @Component
11 public class StreamsHealthIndicator extends AbstractHealthIndicator implements KafkaStreams.StateListener
12 {
13   public final static Status CREATED = new Status("CREATED");
14   public final static Status RUNNING = new Status("RUNNING");
15   public final static Status REBALANCING = new Status("REBALANCING");
16   public final static Status ERROR = new Status("ERROR");
17   public final static Status PENDING_SHUTDOWN = new Status("PENDING_SHUTDOWN");
18   public final static Status NOT_RUNNING = new Status("NOT_RUNNING");
19
20   private Status status = Status.UNKNOWN;
21
22   @Override
23   protected synchronized void doHealthCheck(Health.Builder builder) throws Exception
24   {
25     builder.status(status);
26   }
27
28   @Override
29   public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState)
30   {
31     switch (newState)
32     {
33       case CREATED:
34         status = CREATED;
35         break;
36       case RUNNING:
37         status = RUNNING;
38         break;
39       case REBALANCING:
40         status = REBALANCING;
41         break;
42       case ERROR:
43         status = ERROR;
44         break;
45       case PENDING_SHUTDOWN:
46         status = PENDING_SHUTDOWN;
47         break;
48       case NOT_RUNNING:
49         status = NOT_RUNNING;
50         break;
51       default:
52         status = null;
53     }
54   }
55 }