Moved the building of the topology into a static method
authorKai Moritz <kai@juplo.de>
Fri, 9 Oct 2020 16:07:39 +0000 (18:07 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Oct 2020 19:33:54 +0000 (21:33 +0200)
src/main/java/de/juplo/demo/kafka/deduplication/Deduplicator.java

index af7da70..8f173f7 100644 (file)
@@ -36,6 +36,23 @@ public class Deduplicator
     properties.put("default.key.serde", Serdes.StringSerde.class);
     properties.put("default.value.serde", Serdes.StringSerde.class);
 
+    streams = new KafkaStreams(Deduplicator.buildTopology(), properties);
+    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
+    {
+      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
+      try
+      {
+        streams.close(Duration.ofSeconds(5));
+      }
+      catch (Exception ex)
+      {
+        LOG.error("Could not close KafkaStreams!", ex);
+      }
+    });
+  }
+
+  static Topology buildTopology()
+  {
     StreamsBuilder builder = new StreamsBuilder();
 
     // Create state-store for sequence numbers
@@ -61,23 +78,9 @@ public class Deduplicator
             DeduplicationTransformer.STORE)
         .to("output");
 
-    Topology topology = builder.build();
-    streams = new KafkaStreams(topology, properties);
-    streams.setUncaughtExceptionHandler((Thread t, Throwable e) ->
-    {
-      LOG.error("Unexpected error in thread {}: {}", t, e.toString());
-      try
-      {
-        streams.close(Duration.ofSeconds(5));
-      }
-      catch (Exception ex)
-      {
-        LOG.error("Could not close KafkaStreams!", ex);
-      }
-    });
+    return builder.build();
   }
 
-
   @PostConstruct
   public void start()
   {