splitter: 1.2.0 - Introduced `SplitterStreamProcessor.buildTopology(..)`
authorKai Moritz <kai@juplo.de>
Tue, 4 Jun 2024 21:31:05 +0000 (23:31 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 4 Jun 2024 21:31:05 +0000 (23:31 +0200)
src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java

index d0070c0..fa84665 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.splitter;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KStream;
 
 import java.util.Arrays;
@@ -22,6 +23,14 @@ public class SplitterStreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties properties)
+       {
+               Topology topology = buildTopology(inputTopic, outputTopic);
+               streams = new KafkaStreams(topology, properties);
+       }
+
+       static Topology buildTopology(
+                       String inputTopic,
+                       String outputTopic)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
@@ -34,7 +43,10 @@ public class SplitterStreamProcessor
                                                        .toList())
                                .to(outputTopic);
 
-               streams = new KafkaStreams(builder.build(), properties);
+               Topology topology = builder.build();
+               log.info("\n\n{}", topology.describe());
+
+               return topology;
        }
 
        public void start()