From: Kai Moritz Date: Tue, 4 Jun 2024 21:31:05 +0000 (+0200) Subject: splitter: 1.2.0 - Introduced `SplitterStreamProcessor.buildTopology(..)` X-Git-Tag: splitter-1.2.0~1 X-Git-Url: https://juplo.de/gitweb/?a=commitdiff_plain;h=82a2e30072861bc8a3c19d51ebca158a3331b5d9;p=demos%2Fkafka%2Fwordcount splitter: 1.2.0 - Introduced `SplitterStreamProcessor.buildTopology(..)` --- diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index d0070c0..fa84665 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.splitter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; @@ -22,6 +23,14 @@ public class SplitterStreamProcessor String inputTopic, String outputTopic, Properties properties) + { + Topology topology = buildTopology(inputTopic, outputTopic); + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic) { StreamsBuilder builder = new StreamsBuilder(); @@ -34,7 +43,10 @@ public class SplitterStreamProcessor .toList()) .to(outputTopic); - streams = new KafkaStreams(builder.build(), properties); + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; } public void start()