X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fsplitter%2FSplitterStreamProcessor.java;h=fa8466578a165da86df378f6859ecf557d66567e;hb=82a2e30072861bc8a3c19d51ebca158a3331b5d9;hp=d0070c0551f563b72f091672a703298036757083;hpb=d25dc9ec4152f4d2de60abbd387c9c57e935135b;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index d0070c0..fa84665 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.splitter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; @@ -22,6 +23,14 @@ public class SplitterStreamProcessor String inputTopic, String outputTopic, Properties properties) + { + Topology topology = buildTopology(inputTopic, outputTopic); + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic) { StreamsBuilder builder = new StreamsBuilder(); @@ -34,7 +43,10 @@ public class SplitterStreamProcessor .toList()) .to(outputTopic); - streams = new KafkaStreams(builder.build(), properties); + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; } public void start()