Moved the building of the topology into a static method
[demos/kafka/deduplication] / src / main / java / de / juplo / demo / kafka / deduplication / Deduplicator.java
index af7da70..8f173f7 100644 (file)
@@ -36,6 +36,23 @@ public class Deduplicator
     properties.put("default.key.serde", Serdes.StringSerde.class);
     properties.put("default.value.serde", Serdes.StringSerde.class);
 
+    streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
+    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+    {
+      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+      try
+      {
+        streams.close(Duration.ofSeconds(5));
+      }
+      catch (Exception ex)
+      {
+        LOG.error("Could not close KafkaStreams!", ex);
+      }
+    });
+  }
+
+  static Topology buildTopology()
+  {
     StreamsBuilder builder = new StreamsBuilder();
 
     // Create state-store for sequence numbers
@@ -61,23 +78,9 @@ public class Deduplicator
             DeduplicationTransformer.STORE)
         .to("output");
 
-    Topology topology = builder.build();
-    streams = new KafkaStreams(topology, properties);
-    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
-    {
-      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
-      try
-      {
-        streams.close(Duration.ofSeconds(5));
-      }
-      catch (Exception ex)
-      {
-        LOG.error("Could not close KafkaStreams!", ex);
-      }
-    });
+    return builder.build();
   }
 
-
   @PostConstruct
   public void start()
   {