Added actuators and implemented an indicator that reports the streams-state
authorKai Moritz <kai@juplo.de>
Fri, 9 Oct 2020 21:47:30 +0000 (23:47 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 5 May 2024 05:49:39 +0000 (07:49 +0200)
* Refined README.sh: The script waits, until the health-indicator signals,
  that the deduplicator is up and RUNNING

Dockerfile
README.sh
pom.xml
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java [new file with mode: 0644]
src/main/resources/application.yml

index bd8b1e3..5a7b5f9 100644 (file)
@@ -1,3 +1,4 @@
 FROM openjdk:11-jre-slim
 COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar
+EXPOSE 8080
 CMD ["java", "-jar", "/opt/app.jar"]
index 6646d24..00383c4 100755 (executable)
--- a/README.sh
+++ b/README.sh
@@ -7,20 +7,35 @@ then
   exit
 fi
 
-mvn package
-
-docker build -t juplo/deduplicator:streams .
+if [[ $(docker image ls -q juplo/deduplicator:streams) == "" ]]
+then
+  mvn package
+  docker build -t juplo/deduplicator:streams .
+else
+  echo "Using image existing image:"
+  docker image ls juplo/deduplicator:streams
+fi
 
 docker-compose up -d zookeeper kafka
 
-while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
+while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
+do
+  echo "Waiting for kafka...";
+  sleep 1;
+done
 
 kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic input
 kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic output
 
 docker-compose up -d deduplicator
+
+while ! [[ $(http :8080/actuator/health 2>/dev/null | jq -r .components.streams.status) == "RUNNING" ]];
+do
+  echo "Waiting for Streams-Application...";
+  sleep 1;
+done
+
 cat data.txt | kafkacat -K: -b localhost:9092 -t input
-sleep 5
 
 kafkacat -C -b localhost:9092 -t input -e | wc -l
 kafkacat -C -b localhost:9092 -t output -e | wc -l
diff --git a/pom.xml b/pom.xml
index a1566e1..18736fb 100644 (file)
--- a/pom.xml
+++ b/pom.xml
       <groupId>org.springframework.boot</groupId>
       <artifactId>spring-boot-starter-web</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter-actuator</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-streams</artifactId>
index 5c0f554..04a2d82 100644 (file)
@@ -33,7 +33,9 @@ public class Deduplicator
   public final String host;
   public final int port;
 
-  public Deduplicator(ServerProperties serverProperties)
+  public Deduplicator(
+      ServerProperties serverProperties,
+      StreamsHealthIndicator healthIndicator)
   {
     Properties properties = new Properties();
     properties.put("bootstrap.servers", "kafka:9092");
@@ -58,6 +60,7 @@ public class Deduplicator
         LOG.error("Could not close KafkaStreams!", ex);
       }
     });
+    streams.setStateListener(healthIndicator);
   }
 
   static Topology buildTopology()
diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java
new file mode 100644 (file)
index 0000000..f531256
--- /dev/null
@@ -0,0 +1,55 @@
+package de.juplo.demo.kafka.deduplication;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.springframework.boot.actuate.health.AbstractHealthIndicator;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class StreamsHealthIndicator extends AbstractHealthIndicator implements KafkaStreams.StateListener
+{
+  public final static Status CREATED = new Status("CREATED");
+  public final static Status RUNNING = new Status("RUNNING");
+  public final static Status REBALANCING = new Status("REBALANCING");
+  public final static Status ERROR = new Status("ERROR");
+  public final static Status PENDING_SHUTDOWN = new Status("PENDING_SHUTDOWN");
+  public final static Status NOT_RUNNING = new Status("NOT_RUNNING");
+
+  private Status status = Status.UNKNOWN;
+
+  @Override
+  protected synchronized void doHealthCheck(Health.Builder builder) throws Exception
+  {
+    builder.status(status);
+  }
+
+  @Override
+  public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState)
+  {
+    switch (newState)
+    {
+      case CREATED:
+        status = CREATED;
+        break;
+      case RUNNING:
+        status = RUNNING;
+        break;
+      case REBALANCING:
+        status = REBALANCING;
+        break;
+      case ERROR:
+        status = ERROR;
+        break;
+      case PENDING_SHUTDOWN:
+        status = PENDING_SHUTDOWN;
+        break;
+      case NOT_RUNNING:
+        status = NOT_RUNNING;
+        break;
+      default:
+        status = null;
+    }
+  }
+}
index 7ef7e54..985a34f 100644 (file)
@@ -1,3 +1,8 @@
 server:
   address: 127.0.0.1
   port: 8080
+management:
+  endpoint:
+    health:
+      show-details: always
+      show-components: always