From 82a2e30072861bc8a3c19d51ebca158a3331b5d9 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 4 Jun 2024 23:31:05 +0200 Subject: [PATCH] splitter: 1.2.0 - Introduced `SplitterStreamProcessor.buildTopology(..)` --- .../splitter/SplitterStreamProcessor.java | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java index d0070c0..fa84665 100644 --- a/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/splitter/SplitterStreamProcessor.java @@ -3,6 +3,7 @@ package de.juplo.kafka.wordcount.splitter; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import java.util.Arrays; @@ -22,6 +23,14 @@ public class SplitterStreamProcessor String inputTopic, String outputTopic, Properties properties) + { + Topology topology = buildTopology(inputTopic, outputTopic); + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic) { StreamsBuilder builder = new StreamsBuilder(); @@ -34,7 +43,10 @@ public class SplitterStreamProcessor .toList()) .to(outputTopic); - streams = new KafkaStreams(builder.build(), properties); + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; } public void start() -- 2.20.1