- SequenceNumberExtractor<String, String> extractor =
- new SequenceNumberExtractor<String, String>() {
- @Override
- public long extract(
- String topic, int partition, long offset, Headers headers, String key, String value)
- {
- return Long.parseLong(value);
- }
- };
+ this.host = serverProperties.getAddress().getHostAddress();
+ this.port = serverProperties.getPort();
+ properties.put("application.server", host + ":" + port);
+
+ streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
+ streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+ {
+ LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+ try
+ {
+ streams.close(Duration.ofSeconds(5));
+ }
+ catch (Exception ex)
+ {
+ LOG.error("Could not close KafkaStreams!", ex);
+ }
+ });
+ streams.setStateListener(healthIndicator);
+ }