package de.juplo.kafka.wordcount.top10;
-import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
import java.util.Properties;
String inputTopic,
String outputTopic,
Properties props)
+ {
+ Topology topology = Top10StreamProcessor.buildTopology(
+ inputTopic,
+ outputTopic);
+
+ streams = new KafkaStreams(topology, props);
+ }
+
+ static Topology buildTopology(
+ String inputTopic,
+ String outputTopic)
{
StreamsBuilder builder = new StreamsBuilder();
.toStream()
.to(outputTopic);
- streams = new KafkaStreams(builder.build(), props);
+ Topology topology = builder.build();
+ log.info("\n\n{}", topology.describe());
+
+ return topology;
}
public void start()
@Test
public void test()
{
- Topology topology = CounterStreamProcessor.buildTopology(
- IN,
- OUT,
- Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
+ Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
- CounterApplicationConfiguriation applicationConfiguriation =
- new CounterApplicationConfiguriation();
+ Top10ApplicationConfiguriation applicationConfiguriation =
+ new Top10ApplicationConfiguriation();
Properties streamProcessorProperties =
applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);