From f2f5d14ce899d97a7043a5f34d26b77d5bdfb795 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 11 Feb 2023 13:02:44 +0100 Subject: [PATCH] WIP:test --- .../counter/CounterStreamProcessor.java | 13 +++- .../de/juplo/kafka/wordcount/counter/Key.java | 11 +++- .../CounterStreamProcessorTopologyTest.java | 64 +++++++++++++++++++ 3 files changed, 84 insertions(+), 4 deletions(-) create mode 100644 src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java index b19502d..3c34159 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessor.java @@ -6,6 +6,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.kafka.streams.KafkaStreams; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KStream; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier; @@ -25,6 +26,16 @@ public class CounterStreamProcessor Properties properties, KeyValueBytesStoreSupplier storeSupplier, ObjectMapper mapper) + { + Topology topology = + CounterStreamProcessor.buildTopology(inputTopic, outputTopic, mapper); + streams = new KafkaStreams(topology, properties); + } + + static Topology buildTopology( + String inputTopic, + String outputTopic, + ObjectMapper mapper) { StreamsBuilder builder = new StreamsBuilder(); @@ -48,7 +59,7 @@ public class CounterStreamProcessor .toStream() .to(outputTopic); - streams = new KafkaStreams(builder.build(), properties); + return builder.build(); } public void start() diff --git a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java index 1e00dca..f926efe 100644 --- a/src/main/java/de/juplo/kafka/wordcount/counter/Key.java +++ b/src/main/java/de/juplo/kafka/wordcount/counter/Key.java @@ -1,11 +1,16 @@ package de.juplo.kafka.wordcount.counter; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; import lombok.Value; -@Value(staticConstructor = "of") +@Data +@AllArgsConstructor(staticName = "of") +@NoArgsConstructor public class Key { - private final String username; - private final String word; + private String username; + private String word; } diff --git a/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java new file mode 100644 index 0000000..8fc08a1 --- /dev/null +++ b/src/test/java/de/juplo/kafka/wordcount/counter/CounterStreamProcessorTopologyTest.java @@ -0,0 +1,64 @@ +package de.juplo.kafka.wordcount.counter; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.*; +import org.apache.kafka.streams.*; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.support.serializer.JsonDeserializer; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; + +import java.util.List; +import java.util.Properties; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; + + +public class CounterStreamProcessorTopologyTest +{ + public final static String IN = "TEST-IN"; + public final static String OUT = "TEST-OUT"; + + @Test + public void test() + { + ObjectMapper mapper = new ObjectMapper(); + Topology topology = CounterStreamProcessor.buildTopology(IN, OUT, mapper); + + Properties properties = new Properties(); + properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test"); + properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234"); + properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class.getName()); + properties.put(StreamsConfig.STATE_DIR_CONFIG, "target"); + properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 0); + properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0); + properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties); + + TestInputTopic in = testDriver.createInputTopic( + IN, + new StringSerializer(), + new StringSerializer()); + + TestOutputTopic out = testDriver.createOutputTopic( + OUT, + new JsonDeserializer(Key.class).ignoreTypeHeaders(), + new StringDeserializer()); + + TestData.writeInputData((key, value) -> in.pipeInput(key, value)); + + List receivedMessages = out + .readRecordsToList() + .stream() + .map(record -> Message.of(record.key(), record.value())) + .toList(); + + TestData.assertExpectedResult(receivedMessages); + } +} -- 2.20.1