X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fdemo%2Fkafka%2Fdeduplication%2FDeduplicator.java;h=04a2d824feaaef6c11b7594304a3e2dd81df92cb;hp=5c0f554a228cc5aac7d5e8e09a312dbf9b1c2c7b;hb=29eac6d5ee3ab76af19edef6d8863c6894313169;hpb=85905a556b97077ef99332928eb1401864af24e5 diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index 5c0f554..04a2d82 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -33,7 +33,9 @@ public class Deduplicator public final String host; public final int port; - public Deduplicator(ServerProperties serverProperties) + public Deduplicator( + ServerProperties serverProperties, + StreamsHealthIndicator healthIndicator) { Properties properties = new Properties(); properties.put("bootstrap.servers", "kafka:9092"); @@ -58,6 +60,7 @@ public class Deduplicator LOG.error("Could not close KafkaStreams!", ex); } }); + streams.setStateListener(healthIndicator); } static Topology buildTopology()