From: Kai Moritz Date: Fri, 9 Oct 2020 16:07:39 +0000 (+0200) Subject: Moved the building of the topology into a static method X-Git-Url: https://juplo.de/gitweb/?p=demos%2Fkafka%2Fdeduplication;a=commitdiff_plain;h=76b0d9606d3d236a98fe660e092c1798c71fe464 Moved the building of the topology into a static method --- diff --git a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java index af7da70..8f173f7 100644 --- a/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java +++ b/src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java @@ -36,6 +36,23 @@ public class Deduplicator properties.put("default.key.serde", Serdes.StringSerde.class); properties.put("default.value.serde", Serdes.StringSerde.class); + streams = new KafkaStreams(Deduplicator.buildTopology(), properties); + streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> + { + LOG.error("Unexpected error in thread {}: {}", t, e.toString()); + try + { + streams.close(Duration.ofSeconds(5)); + } + catch (Exception ex) + { + LOG.error("Could not close KafkaStreams!", ex); + } + }); + } + + static Topology buildTopology() + { StreamsBuilder builder = new StreamsBuilder(); // Create state-store for sequence numbers @@ -61,23 +78,9 @@ public class Deduplicator DeduplicationTransformer.STORE) .to("output"); - Topology topology = builder.build(); - streams = new KafkaStreams(topology, properties); - streams.setUncaughtExceptionHandler((Thread t, Throwable e) -> - { - LOG.error("Unexpected error in thread {}: {}", t, e.toString()); - try - { - streams.close(Duration.ofSeconds(5)); - } - catch (Exception ex) - { - LOG.error("Could not close KafkaStreams!", ex); - } - }); + return builder.build(); } - @PostConstruct public void start() {