counter: 1.4.2 - RocksDB does not work in Alpine-Linux
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import de.juplo.kafka.wordcount.splitter.TestInputUser;
4 import de.juplo.kafka.wordcount.splitter.TestInputWord;
5 import de.juplo.kafka.wordcount.top10.TestOutputWord;
6 import de.juplo.kafka.wordcount.top10.TestOutputWordCounter;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.streams.TestInputTopic;
9 import org.apache.kafka.streams.TestOutputTopic;
10 import org.apache.kafka.streams.Topology;
11 import org.apache.kafka.streams.TopologyTestDriver;
12 import org.apache.kafka.streams.state.KeyValueStore;
13 import org.apache.kafka.streams.state.Stores;
14 import org.junit.jupiter.api.AfterAll;
15 import org.junit.jupiter.api.BeforeAll;
16 import org.junit.jupiter.api.DisplayName;
17 import org.junit.jupiter.api.Test;
18 import org.springframework.kafka.support.serializer.JsonDeserializer;
19 import org.springframework.kafka.support.serializer.JsonSerializer;
20 import org.springframework.util.LinkedMultiValueMap;
21 import org.springframework.util.MultiValueMap;
22
23 import java.util.Map;
24
25 import static de.juplo.kafka.wordcount.counter.CounterApplicationConfiguriation.serializationConfig;
26 import static de.juplo.kafka.wordcount.counter.CounterStreamProcessor.STORE_NAME;
27
28
29 @Slf4j
30 public class CounterStreamProcessorTopologyTest
31 {
32   public static final String IN = "TEST-IN";
33   public static final String OUT = "TEST-OUT";
34
35
36   static TopologyTestDriver testDriver;
37   static MultiValueMap<TestOutputWord, TestOutputWordCounter> receivedMessages = new LinkedMultiValueMap<>();
38
39
40   @BeforeAll
41   public static void setUpTestDriver()
42   {
43     Topology topology = CounterStreamProcessor.buildTopology(
44         IN,
45         OUT,
46         Stores.inMemoryKeyValueStore(STORE_NAME));
47
48     testDriver = new TopologyTestDriver(topology, serializationConfig());
49
50     TestInputTopic<TestInputUser, TestInputWord> in =
51         testDriver.createInputTopic(IN, serializer(), serializer());
52     TestOutputTopic<TestOutputWord, TestOutputWordCounter> out =
53         testDriver.createOutputTopic(OUT, keyDeserializer(), valueDeserializer());
54
55     TestData
56         .getInputMessages()
57         .forEach(kv -> in.pipeInput(kv.key, kv.value));
58
59     receivedMessages = new LinkedMultiValueMap<>();
60     out
61         .readRecordsToList()
62         .forEach(record -> receivedMessages.add(record.key(), record.value()));
63   }
64
65
66   @DisplayName("Assert the expected output messages")
67   @Test
68   public void testExpectedMessages()
69   {
70     TestData.assertExpectedMessages(receivedMessages);
71   }
72
73   @DisplayName("Assert the expected number of messages")
74   @Test
75   public void testExpectedNumberOfMessagesForWord()
76   {
77     TestData.assertExpectedNumberOfMessagesForWord(receivedMessages);
78   }
79
80   @DisplayName("Await the expected final output messages")
81   @Test
82   public void testExpectedLastMessagesForWord()
83   {
84     TestData.assertExpectedLastMessagesForWord(receivedMessages);
85   }
86
87   @DisplayName("Assert the expected state in the state-store")
88   @Test
89   public void testExpectedState()
90   {
91     KeyValueStore<Word, Long> store = testDriver.getKeyValueStore(STORE_NAME);
92     TestData.assertExpectedState(store);
93   }
94
95   @AfterAll
96   public static void tearDown()
97   {
98     testDriver.close();
99   }
100
101
102   private static JsonSerializer serializer()
103   {
104     return new JsonSerializer().noTypeInfo();
105   }
106
107   private static JsonDeserializer<TestOutputWord> keyDeserializer()
108   {
109     return deserializer(true);
110   }
111
112   private static JsonDeserializer<TestOutputWordCounter> valueDeserializer()
113   {
114     return deserializer(false);
115   }
116
117   private static <T> JsonDeserializer<T> deserializer(boolean isKey)
118   {
119     JsonDeserializer<T> deserializer = new JsonDeserializer<>();
120     deserializer.configure(
121         Map.of(JsonDeserializer.TYPE_MAPPINGS, typeMappingsConfig()),
122         isKey);
123     return deserializer;
124   }
125
126   private static String typeMappingsConfig()
127   {
128     return CounterStreamProcessor.typeMappingsConfig(TestOutputWord.class, TestOutputWordCounter.class);
129   }
130 }