FROM openjdk:11-jre-slim
COPY target/streams-deduplicator-1.0-SNAPSHOT.jar /opt/app.jar
+EXPOSE 8080
CMD ["java", "-jar", "/opt/app.jar"]
exit
fi
-mvn package
-
-docker build -t juplo/deduplicator:streams .
+if [[ $(docker image ls -q juplo/deduplicator:streams) == "" ]]
+then
+ mvn package
+ docker build -t juplo/deduplicator:streams .
+else
+ echo "Using image existing image:"
+ docker image ls juplo/deduplicator:streams
+fi
docker-compose up -d zookeeper kafka
-while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]]; do echo "Waiting for kafka..."; sleep 1; done
+while ! [[ $(zookeeper-shell zookeeper:2181 ls /brokers/ids 2> /dev/null) =~ 1001 ]];
+do
+ echo "Waiting for kafka...";
+ sleep 1;
+done
kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic input
kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 3 --topic output
docker-compose up -d deduplicator
+
+while ! [[ $(http :8080/actuator/health 2>/dev/null | jq -r .components.streams.status) == "RUNNING" ]];
+do
+ echo "Waiting for Streams-Application...";
+ sleep 1;
+done
+
cat data.txt | kafkacat -K: -b localhost:9092 -t input
-sleep 5
kafkacat -C -b localhost:9092 -t input -e | wc -l
kafkacat -C -b localhost:9092 -t output -e | wc -l
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
public final String host;
public final int port;
- public Deduplicator(ServerProperties serverProperties)
+ public Deduplicator(
+ ServerProperties serverProperties,
+ StreamsHealthIndicator healthIndicator)
{
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka:9092");
LOG.error("Could not close KafkaStreams!", ex);
}
});
+ streams.setStateListener(healthIndicator);
}
static Topology buildTopology()
--- /dev/null
+package de.juplo.demo.kafka.deduplication;
+
+import org.apache.kafka.streams.KafkaStreams;
+import org.springframework.boot.actuate.health.AbstractHealthIndicator;
+import org.springframework.boot.actuate.health.Health;
+import org.springframework.boot.actuate.health.Status;
+import org.springframework.stereotype.Component;
+
+
+@Component
+public class StreamsHealthIndicator extends AbstractHealthIndicator implements KafkaStreams.StateListener
+{
+ public final static Status CREATED = new Status("CREATED");
+ public final static Status RUNNING = new Status("RUNNING");
+ public final static Status REBALANCING = new Status("REBALANCING");
+ public final static Status ERROR = new Status("ERROR");
+ public final static Status PENDING_SHUTDOWN = new Status("PENDING_SHUTDOWN");
+ public final static Status NOT_RUNNING = new Status("NOT_RUNNING");
+
+ private Status status = Status.UNKNOWN;
+
+ @Override
+ protected synchronized void doHealthCheck(Health.Builder builder) throws Exception
+ {
+ builder.status(status);
+ }
+
+ @Override
+ public synchronized void onChange(KafkaStreams.State newState, KafkaStreams.State oldState)
+ {
+ switch (newState)
+ {
+ case CREATED:
+ status = CREATED;
+ break;
+ case RUNNING:
+ status = RUNNING;
+ break;
+ case REBALANCING:
+ status = REBALANCING;
+ break;
+ case ERROR:
+ status = ERROR;
+ break;
+ case PENDING_SHUTDOWN:
+ status = PENDING_SHUTDOWN;
+ break;
+ case NOT_RUNNING:
+ status = NOT_RUNNING;
+ break;
+ default:
+ status = null;
+ }
+ }
+}
server:
address: 127.0.0.1
port: 8080
+management:
+ endpoint:
+ health:
+ show-details: always
+ show-components: always