projects
/
demos
/
kafka
/
deduplication
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
Added actuators and implemented an indicator that reports the streams-state
[demos/kafka/deduplication]
/
src
/
main
/
java
/
de
/
juplo
/
demo
/
kafka
/
deduplication
/
Deduplicator.java
diff --git
a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
index
5c0f554
..
04a2d82
100644
(file)
--- a/
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
+++ b/
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java
@@
-33,7
+33,9
@@
public class Deduplicator
public final String host;
public final int port;
public final String host;
public final int port;
- public Deduplicator(ServerProperties serverProperties)
+ public Deduplicator(
+ ServerProperties serverProperties,
+ StreamsHealthIndicator healthIndicator)
{
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka:9092");
{
Properties properties = new Properties();
properties.put("bootstrap.servers", "kafka:9092");
@@
-58,6
+60,7
@@
public class Deduplicator
LOG.error("Could not close KafkaStreams!", ex);
}
});
LOG.error("Could not close KafkaStreams!", ex);
}
});
+ streams.setStateListener(healthIndicator);
}
static Topology buildTopology()
}
static Topology buildTopology()