import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import java.util.Arrays;
String inputTopic,
String outputTopic,
Properties properties)
+ {
+ Topology topology = buildTopology(inputTopic, outputTopic);
+ streams = new KafkaStreams(topology, properties);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic)
{
StreamsBuilder builder = new StreamsBuilder();
.toList())
.to(outputTopic);
- streams = new KafkaStreams(builder.build(), properties);
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
}
public void start()