From: Kai Moritz Date: Fri, 9 Oct 2020 21:47:30 +0000 (+0200) Subject: Added actuators and implemented an indicator that reports the streams-state X-Git-Tag: streams-deduplicator-1.0.0~6 X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=e1bd976486d1d1a087567a6c6081637d48706f44;p=demos%2Fkafka%2Fdeduplication Added actuators and implemented an indicator that reports the streams-state * Refined README.sh: The script waits, until the health-indicator signals, that the deduplicator is up and RUNNING --- diff --git a/Dockerfile b/Dockerfile index bd8b1e3..5a7b5f9 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,3 +1,4 @@ FROM openjdk:11-jre-slim COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar +EXPOSE 8080 CMD ["java", "-jar", "/opt/app.jar"] diff --git a/README.sh b/README.sh index 6646d24..00383c4 100755 --- a/README.sh +++ b/README.sh @@ -7,20 +7,35 @@ then exit fi -mvn package - -docker build -t juplo/deduplicator:streams . +if [[ $(docker image ls -q juplo/deduplicator:streams) == "" ]] +then + mvn package + docker build -t juplo/deduplicator:streams . +else + echo "Using image existing image:" + docker image ls juplo/deduplicator:streams +fi docker-compose up -d zookeeper kafka -while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done +while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; +do + echo "Waiting for kafka..."; + sleep 1; +done kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic input kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic output docker-compose up -d deduplicator + +while ! [[ $(http :8080/actuator/health 2>/dev/null | jq -r .components.streams.status) == "RUNNING" ]]; +do + echo "Waiting for Streams-Application..."; + sleep 1; +done + cat data.txt | kafkacat -K: -b localhost:9092 -t input -sleep 5 kafkacat -C -b localhost:9092 -t input -e | wc -l kafkacat -C -b localhost:9092 -t output -e | wc -l diff --git a/pom.xml b/pom.xml index a1566e1..18736fb 100644 --- a/pom.xml +++ b/pom.xml @@ -23,6 +23,10 @@ org.springframework.boot spring-boot-starter-web + + org.springframework.boot + spring-boot-starter-actuator + org.apache.kafka kafka-streams diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 5c0f554..04a2d82 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -33,7 +33,9 @@ public class Deduplicator public final String host; public final int port; - public Deduplicator(ServerProperties serverProperties) + public Deduplicator( + ServerProperties serverProperties, + StreamsHealthIndicator healthIndicator) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -58,6 +60,7 @@ public class Deduplicator LOG.error("Could not close KafkaStreams!", ex); } }); + streams.setStateListener(healthIndicator); } static Topology buildTopology() diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java new file mode 100644 index 0000000..f531256 --- /dev/null +++ b/src/main/java/de/juplo/demo/kafka/deduplication/StreamsHealthIndicator.java @@ -0,0 +1,55 @@ +package de.juplo.demo.kafka.deduplication; + +import org.apache.kafka.streams.KafkaStreams; +import org.springframework.boot.actuate.health.AbstractHealthIndicator; +import org.springframework.boot.actuate.health.Health; +import org.springframework.boot.actuate.health.Status; +import org.springframework.stereotype.Component; + + +@Component +public class StreamsHealthIndicator extends AbstractHealthIndicator implements KafkaStreams.StateListener +{ + public final static Status CREATED = new Status("CREATED"); + public final static Status RUNNING = new Status("RUNNING"); + public final static Status REBALANCING = new Status("REBALANCING"); + public final static Status ERROR = new Status("ERROR"); + public final static Status PENDING_SHUTDOWN = new Status("PENDING_SHUTDOWN"); + public final static Status NOT_RUNNING = new Status("NOT_RUNNING"); + + private Status status = Status.UNKNOWN; + + @Override + protected synchronized void doHealthCheck(Health.Builder builder) throws Exception + { + builder.status(status); + } + + @Override + public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState) + { + switch (newState) + { + case CREATED: + status = CREATED; + break; + case RUNNING: + status = RUNNING; + break; + case REBALANCING: + status = REBALANCING; + break; + case ERROR: + status = ERROR; + break; + case PENDING_SHUTDOWN: + status = PENDING_SHUTDOWN; + break; + case NOT_RUNNING: + status = NOT_RUNNING; + break; + default: + status = null; + } + } +} diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index 7ef7e54..985a34f 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -1,3 +1,8 @@ server: address: 127.0.0.1 port: 8080 +management: + endpoint: + health: + show-details: always + show-components: always