--- /dev/null
+package de.juplo.kafka.wordcount.counter;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.common.serialization.*;
+import org.apache.kafka.streams.*;
+import org.apache.kafka.streams.state.Stores;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Properties;
+
+
+public class CounterStreamProcessorTopologyTest
+{
+ public final static String IN = "TEST-IN";
+ public final static String OUT = "TEST-OUT";
+
+ @Test
+ public void test()
+ {
+ Topology topology = CounterStreamProcessor.buildTopology(
+ IN,
+ OUT,
+ Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
+ new ObjectMapper());
+
+ CounterApplicationConfiguriation config =
+ new CounterApplicationConfiguriation();
+ Properties properties =
+ config.propertyMap(new CounterApplicationProperties());
+
+ TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
+
+ TestInputTopic<String, String> in = testDriver.createInputTopic(
+ IN,
+ new StringSerializer(),
+ new StringSerializer());
+
+ TestOutputTopic<String, String> out = testDriver.createOutputTopic(
+ OUT,
+ new StringDeserializer(),
+ new StringDeserializer());
+
+ TestData.writeInputData((key, value) -> in.pipeInput(key, value));
+
+ List<Message> receivedMessages = out
+ .readRecordsToList()
+ .stream()
+ .map(record -> Message.of(record.key(), record.value()))
+ .toList();
+
+ TestData.assertExpectedResult(receivedMessages);
+ }
+}