WIP:test
authorKai Moritz <kai@juplo.de>
Sun, 12 Feb 2023 13:56:34 +0000 (14:56 +0100)
committerKai Moritz <kai@juplo.de>
Sun, 12 Feb 2023 13:56:34 +0000 (14:56 +0100)
src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java
src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java

index 3c34159..4002df2 100644 (file)
@@ -27,14 +27,19 @@ public class CounterStreamProcessor
                        KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper mapper)
        {
-               Topology topology =
-                               CounterStreamProcessor.buildTopology(inputTopic, outputTopic, mapper);
+               Topology topology = CounterStreamProcessor.buildTopology(
+                               inputTopic,
+                               outputTopic,
+                               storeSupplier,
+                               mapper);
+
                streams = new KafkaStreams(topology, properties);
        }
 
        static Topology buildTopology(
                        String inputTopic,
                        String outputTopic,
+                       KeyValueBytesStoreSupplier storeSupplier,
                        ObjectMapper mapper)
        {
                StreamsBuilder builder = new StreamsBuilder();
index 8fc08a1..4881c40 100644 (file)
@@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.*;
 import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.state.Stores;
 import org.junit.jupiter.api.Test;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
 import org.springframework.util.LinkedMultiValueMap;
@@ -26,8 +27,11 @@ public class CounterStreamProcessorTopologyTest
   @Test
   public void test()
   {
-    ObjectMapper mapper = new ObjectMapper();
-    Topology topology = CounterStreamProcessor.buildTopology(IN, OUT, mapper);
+    Topology topology = CounterStreamProcessor.buildTopology(
+        IN,
+        OUT,
+        Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
+        new ObjectMapper());
 
     Properties properties = new Properties();
     properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");