From 80c7a5b7c0e171a8c442162cce81d1ec4148726b Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 12 Feb 2023 14:56:34 +0100 Subject: [PATCH] WIP:test --- .../kafka/wordcount/counter/CounterStreamProcessor.java | 9 +++++++-- .../counter/CounterStreamProcessorTopologyTest.java | 8 ++++++-- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index 3c34159..4002df2 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -27,14 +27,19 @@ public class CounterStreamProcessor KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { - Topology topology = - CounterStreamProcessor.buildTopology(inputTopic, outputTopic, mapper); + Topology topology = CounterStreamProcessor.buildTopology( + inputTopic, + outputTopic, + storeSupplier, + mapper); + streams = new KafkaStreams(topology, properties); } static Topology buildTopology( String inputTopic, String outputTopic, + KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java index 8fc08a1..4881c40 100644 --- a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -6,6 +6,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.*; import org.apache.kafka.streams.*; +import org.apache.kafka.streams.state.Stores; import org.junit.jupiter.api.Test; import org.springframework.kafka.support.serializer.JsonDeserializer; import org.springframework.util.LinkedMultiValueMap; @@ -26,8 +27,11 @@ public class CounterStreamProcessorTopologyTest @Test public void test() { - ObjectMapper mapper = new ObjectMapper(); - Topology topology = CounterStreamProcessor.buildTopology(IN, OUT, mapper); + Topology topology = CounterStreamProcessor.buildTopology( + IN, + OUT, + Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"), + new ObjectMapper()); Properties properties = new Properties(); properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); -- 2.20.1