1 package de.juplo.kafka.wordcount.counter;
3 import org.apache.kafka.common.serialization.StringDeserializer;
4 import org.apache.kafka.common.serialization.StringSerializer;
5 import org.apache.kafka.streams.TestInputTopic;
6 import org.apache.kafka.streams.TestOutputTopic;
7 import org.apache.kafka.streams.Topology;
8 import org.apache.kafka.streams.TopologyTestDriver;
9 import org.apache.kafka.streams.state.Stores;
10 import org.junit.jupiter.api.Test;
12 import java.util.List;
13 import java.util.Properties;
/**
 * Runs the topology returned by {@code CounterStreamProcessor.buildTopology}
 * inside a {@link TopologyTestDriver}: input is piped in through a
 * {@link TestInputTopic}, the output records are converted to {@code Message}
 * objects, and the result is handed to {@code TestData.assertExpectedResult}.
 *
 * NOTE(review): several lines of this class (braces, the test-method header and
 * some call arguments) are not visible in this view; the comments below only
 * describe the statements that are visible.
 */
16 public class CounterStreamProcessorTopologyTest
// String constants "TEST-IN" / "TEST-OUT".
// NOTE(review): presumably the input and output topic names passed to the
// topology and to the test topics -- the lines that would pass them are not
// visible here; confirm.
18 public final static String IN = "TEST-IN";
19 public final static String OUT = "TEST-OUT";
// Build the topology under test; the last visible argument is an in-memory
// key-value store supplier named "TOPOLOGY-TEST".
24 Topology topology = CounterStreamProcessor.buildTopology(
27 Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
// Derive the driver's Properties from the application configuration, using a
// default-constructed CounterApplicationProperties instance.
29 CounterApplicationConfiguriation config =
30 new CounterApplicationConfiguriation();
31 Properties properties =
32 config.propertyMap(new CounterApplicationProperties());
// Create the test driver for the topology with those properties.
// NOTE(review): no close() call (or try-with-resources) on testDriver is
// visible in this view; TopologyTestDriver is Closeable -- confirm that it is
// released after the test.
34 TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
// Input topic whose keys and values are both serialized as Strings.
36 TestInputTopic<String, String> in = testDriver.createInputTopic(
38 new StringSerializer(),
39 new StringSerializer());
// Output topic whose keys and values are both deserialized as Strings.
41 TestOutputTopic<String, String> out = testDriver.createOutputTopic(
43 new StringDeserializer(),
44 new StringDeserializer());
// Let TestData produce the input; each key/value pair it emits is piped into
// the input topic.
46 TestData.writeInputData((key, value) -> in.pipeInput(key, value));
// Convert every record read from the output topic into a Message built from
// the record's key and value.
// NOTE(review): the calls that read the records and collect the stream are
// not visible in this view.
48 List<Message> receivedMessages = out
51 .map(record -> Message.of(record.key(), record.value()))
// Delegate the verification of the received messages to TestData.
54 TestData.assertExpectedResult(receivedMessages);