properties.put("default.key.serde", Serdes.StringSerde.class);
properties.put("default.value.serde", Serdes.StringSerde.class);
+ streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
+ streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+ {
+ LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+ try
+ {
+ streams.close(Duration.ofSeconds(5));
+ }
+ catch (Exception ex)
+ {
+ LOG.error("Could not close KafkaStreams!", ex);
+ }
+ });
+ }
+
+ static Topology buildTopology()
+ {
StreamsBuilder builder = new StreamsBuilder();
// Create state-store for sequence numbers
DeduplicationTransformer.STORE)
.to("output");
- Topology topology = builder.build();
- streams = new KafkaStreams(topology, properties);
- streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
- {
- LOG.error("Unexpected error in thread {}: {}", t, e.toString());
- try
- {
- streams.close(Duration.ofSeconds(5));
- }
- catch (Exception ex)
- {
- LOG.error("Could not close KafkaStreams!", ex);
- }
- });
+ return builder.build();
}
-
@PostConstruct
public void start()
{