counter: 1.1.11 - Added a test that is based on `TopologyTestDriver`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import com.fasterxml.jackson.databind.ObjectMapper;
4 import org.apache.kafka.common.serialization.*;
5 import org.apache.kafka.streams.*;
6 import org.apache.kafka.streams.state.Stores;
7 import org.junit.jupiter.api.Test;
8
9 import java.util.List;
10 import java.util.Properties;
11
12
13 public class CounterStreamProcessorTopologyTest
14 {
15   public final static String IN = "TEST-IN";
16   public final static String OUT = "TEST-OUT";
17
18   @Test
19   public void test()
20   {
21     Topology topology = CounterStreamProcessor.buildTopology(
22         IN,
23         OUT,
24         Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"),
25         new ObjectMapper());
26
27     CounterApplicationConfiguriation config =
28         new CounterApplicationConfiguriation();
29     Properties properties =
30         config.propertyMap(new CounterApplicationProperties());
31
32     TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);
33
34     TestInputTopic<String, String> in = testDriver.createInputTopic(
35         IN,
36         new StringSerializer(),
37         new StringSerializer());
38
39     TestOutputTopic<String, String> out = testDriver.createOutputTopic(
40         OUT,
41         new StringDeserializer(),
42         new StringDeserializer());
43
44     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
45
46     List<Message> receivedMessages = out
47         .readRecordsToList()
48         .stream()
49         .map(record -> Message.of(record.key(), record.value()))
50         .toList();
51
52     TestData.assertExpectedResult(receivedMessages);
53   }
54 }