Added actuators and implemented an indicator that reports the streams-state
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
index 5c0f554..04a2d82 100644 (file)
@@ -33,7 +33,9 @@ public class Deduplicator
   public final String host;
   public final int port;
 
-  public Deduplicator(ServerProperties serverProperties)
+  public Deduplicator(
+      ServerProperties serverProperties,
+      StreamsHealthIndicator healthIndicator)
   {
     Properties properties = new Properties();
     properties.put("bootstrap.servers", "kafka:9092");
@@ -58,6 +60,7 @@ public class Deduplicator
         LOG.error("Could not close KafkaStreams!", ex);
       }
     });
+    streams.setStateListener(healthIndicator);
   }
 
   static Topology buildTopology()