From a01b5436757600a44b20260798d7331c7e33a851 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Tue, 14 May 2024 23:48:48 +0200 Subject: [PATCH] WIP --- .../top10/Top10ApplicationConfiguration.java | 1 - .../wordcount/top10/Top10StreamProcessor.java | 17 ++++++++++++++++- .../top10/Top10StreamProcessorTopologyTest.java | 9 +++------ 3 files changed, 19 insertions(+), 8 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java index b43d825..ae161d8 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java @@ -1,6 +1,5 @@ package de.juplo.kafka.wordcount.top10; -import com.fasterxml.jackson.databind.ObjectMapper; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.streams.StreamsConfig; diff --git a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java index e6deee0..084e425 100644 --- a/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java @@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import java.util.Properties; @@ -18,6 +19,17 @@ public class Top10StreamProcessor String inputTopic, String outputTopic, Properties props) + { + Topology topology = Top10StreamProcessor.buildTopology( + inputTopic, + outputTopic); + + streams = new KafkaStreams(topology, props); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic) { StreamsBuilder builder = new StreamsBuilder(); @@ -40,7 +52,10 @@ public class Top10StreamProcessor .toStream() .to(outputTopic); - streams = new KafkaStreams(builder.build(), props); + Topology topology = builder.build(); + log.info("\n\n{}", topology.describe()); + + return topology; } public void start() diff --git a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java index 8b4a593..3a744dd 100644 --- a/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java @@ -27,13 +27,10 @@ public class Top10StreamProcessorTopologyTest @Test public void test() { - Topology topology = CounterStreamProcessor.buildTopology( - IN, - OUT, - Stores.inMemoryKeyValueStore("TOPOLOGY-TEST")); + Topology topology = Top10StreamProcessor.buildTopology(IN, OUT); - CounterApplicationConfiguriation applicationConfiguriation = - new CounterApplicationConfiguriation(); + Top10ApplicationConfiguriation applicationConfiguriation = + new Top10ApplicationConfiguriation(); Properties streamProcessorProperties = applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties()); Map propertyMap = convertToMap(streamProcessorProperties); -- 2.20.1