+ this.host = serverProperties.getAddress().getHostAddress();
+ this.port = serverProperties.getPort();
+ properties.put("application.server", host + ":" + port);
+
+ streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
+ streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+ {
+ LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+ try
+ {
+ streams.close(Duration.ofSeconds(5));
+ }
+ catch (Exception ex)
+ {
+ LOG.error("Could not close KafkaStreams!", ex);
+ }
+ });
+ streams.setStateListener(healthIndicator);
+ }
+
+ static Topology buildTopology()
+ {