1 package de.juplo.kafka.wordcount.counter;
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.common.serialization.*;
5 import org.apache.kafka.streams.*;
6 import org.apache.kafka.streams.state.Stores;
7 import org.junit.jupiter.api.Test;
10 import java.util.Properties;
/**
 * Test of the topology built by {@code CounterStreamProcessor.buildTopology(...)},
 * executed through a {@code TopologyTestDriver}.
 *
 * Input is supplied by {@code TestData.writeInputData(...)} and the records read
 * back from the output topic are checked by {@code TestData.assertExpectedResult(...)}.
 */
13 public class CounterStreamProcessorTopologyTest
// Topic-name constants; presumably passed to createInputTopic / createOutputTopic
// below -- the corresponding argument lines are not visible here, TODO confirm.
15 public final static String IN = "TEST-IN";
16 public final static String OUT = "TEST-OUT";
// Build the topology under test. One of the arguments is an in-memory key-value
// store supplier named "TOPOLOGY-TEST"; the remaining arguments are not visible here.
21 Topology topology = CounterStreamProcessor.buildTopology(
24 Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
// Derive the streams properties from the application's own configuration class,
// using a default-constructed CounterApplicationProperties.
27 CounterApplicationConfiguriation config =
28 new CounterApplicationConfiguriation();
29 Properties properties =
30 config.propertyMap(new CounterApplicationProperties());
// Create the test driver for the topology with the properties derived above.
// NOTE(review): no close() / try-with-resources for testDriver is visible in this
// excerpt -- confirm the driver is closed after the test.
32 TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
// Input topic handle; keys and values are both serialized as strings.
34 TestInputTopic<String, String> in = testDriver.createInputTopic(
36 new StringSerializer(),
37 new StringSerializer());
// Output topic handle; keys and values are both deserialized as strings.
39 TestOutputTopic<String, String> out = testDriver.createOutputTopic(
41 new StringDeserializer(),
42 new StringDeserializer());
// Let TestData produce the input; each key/value pair it emits is piped into the input topic.
44 TestData.writeInputData((key, value) -> in.pipeInput(key, value));
// Read the output and convert each record's key and value into a Message
// (the intermediate steps of this pipeline are not visible here).
46 List<Message> receivedMessages = out
49 .map(record -> Message.of(record.key(), record.value()))
// Delegate the verification of the received messages to TestData.
52 TestData.assertExpectedResult(receivedMessages);