import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
Properties properties,
KeyValueBytesStoreSupplier storeSupplier,
ObjectMapper mapper)
+ {
+ Topology topology = CounterStreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic,
+ storeSupplier,
+ mapper);
+
+ streams = new KafkaStreams(topology, properties);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic,
+ KeyValueBytesStoreSupplier storeSupplier,
+ ObjectMapper mapper)
{
StreamsBuilder builder = new StreamsBuilder();
.toStream()
.to(outputTopic);
- streams = new KafkaStreams(builder.build(), properties);
+ return builder.build();
}
public void start()