counter: 1.1.11 - Added a test, that is based on `TopologyTestDriver`
[demos/kafka/wordcount] / src / main / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessor.java
index b19502d..4002df2 100644 (file)
@@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
@@ -25,6 +26,21 @@ public class CounterStreamProcessor
                        Properties properties,
                        KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper mapper)
+       {
+               Topology topology = CounterStreamProcessor.buildTopology(
+                               inputTopic,
+                               outputTopic,
+                               storeSupplier,
+                               mapper);
+
+               streams = new KafkaStreams(topology, properties);
+       }
+
+       static Topology buildTopology(
+                       String inputTopic,
+                       String outputTopic,
+                       KeyValueBytesStoreSupplier storeSupplier,
+                       ObjectMapper mapper)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
@@ -48,7 +64,7 @@ public class CounterStreamProcessor
                                .toStream()
                                .to(outputTopic);
 
-               streams = new KafkaStreams(builder.build(), properties);
+               return builder.build();
        }
 
        public void start()