X-Git-Url: https://juplo.de/gitweb/?a=blobdiff_plain;f=src%2Fmain%2Fjava%2Fde%2Fjuplo%2Fkafka%2Fwordcount%2Fcounter%2FCounterStreamProcessor.java;h=4002df2d9872978f7feddc2110cd6df07b402888;hb=b02b75a4e70c41795a58ca38cd62d455af8aea16;hp=b19502dbd5507a69f8a50fc9c2b496d271ffddc0;hpb=e1ec32bcc70e5f990ca2ddb5566f98cee7e4c2c2;p=demos%2Fkafka%2Fwordcount diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index b19502d..4002df2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -25,6 +26,21 @@ public class CounterStreamProcessor Properties properties, KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) + { + Topology topology = CounterStreamProcessor.buildTopology( + inputTopic, + outputTopic, + storeSupplier, + mapper); + + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic, + KeyValueBytesStoreSupplier storeSupplier, + ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); @@ -48,7 +64,7 @@ public class CounterStreamProcessor .toStream() .to(outputTopic); - streams = new KafkaStreams(builder.build(), properties); + return builder.build(); } public void start()