WIP
authorKai Moritz <kai@juplo.de>
Tue, 14 May 2024 21:48:48 +0000 (23:48 +0200)
committerKai Moritz <kai@juplo.de>
Tue, 14 May 2024 21:48:48 +0000 (23:48 +0200)
src/main/java/de/juplo/kafka/wordcount/top10/Top10ApplicationConfiguration.java
src/main/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/top10/Top10StreamProcessorTopologyTest.java

index b43d825..ae161d8 100644 (file)
@@ -1,6 +1,5 @@
 package de.juplo.kafka.wordcount.top10;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.streams.StreamsConfig;
index e6deee0..084e425 100644 (file)
@@ -4,6 +4,7 @@ import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.Topology;
 
 import java.util.Properties;
 
@@ -18,6 +19,17 @@ public class Top10StreamProcessor
                        String inputTopic,
                        String outputTopic,
                        Properties props)
+       {
+               Topology topology = Top10StreamProcessor.buildTopology(
+                               inputTopic,
+                               outputTopic);
+
+               streams = new KafkaStreams(topology, props);
+       }
+
+       static Topology buildTopology(
+                       String inputTopic,
+                       String outputTopic)
        {
                StreamsBuilder builder = new StreamsBuilder();
 
@@ -40,7 +52,10 @@ public class Top10StreamProcessor
                                .toStream()
                                .to(outputTopic);
 
-               streams = new KafkaStreams(builder.build(), props);
+               Topology topology = builder.build();
+               log.info("\n\n{}", topology.describe());
+
+               return topology;
        }
 
        public void start()
index 8b4a593..3a744dd 100644 (file)
@@ -27,13 +27,10 @@ public class Top10StreamProcessorTopologyTest
   @Test
   public void test()
   {
-    Topology topology = CounterStreamProcessor.buildTopology(
-        IN,
-        OUT,
-        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
+    Topology topology = Top10StreamProcessor.buildTopology(IN, OUT);
 
-    CounterApplicationConfiguriation applicationConfiguriation =
-        new CounterApplicationConfiguriation();
+    Top10ApplicationConfiguriation applicationConfiguriation =
+        new Top10ApplicationConfiguriation();
     Properties streamProcessorProperties =
         applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);