import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
import java.util.Properties;
String inputTopic,
String outputTopic,
Properties props)
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic);
+
+ streams = new KafkaStreams(topology, props);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic)
{
StreamsBuilder builder = new StreamsBuilder();
.toStream()
.to(outputTopic);
- streams = new KafkaStreams(builder.build(), props);
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
}
public void start()