counter: 1.2.10 - Replaced helper-class `Message` with `KeyValue`
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import org.apache.kafka.streams.*;
4 import org.apache.kafka.streams.state.Stores;
5 import org.junit.jupiter.api.Test;
6 import org.springframework.kafka.support.serializer.JsonDeserializer;
7 import org.springframework.kafka.support.serializer.JsonSerde;
8 import org.springframework.kafka.support.serializer.JsonSerializer;
9
10 import java.util.List;
11 import java.util.Map;
12 import java.util.Properties;
13 import java.util.stream.Collectors;
14
15
16 public class CounterStreamProcessorTopologyTest
17 {
18   public final static String IN = "TEST-IN";
19   public final static String OUT = "TEST-OUT";
20
21   @Test
22   public void test()
23   {
24     Topology topology = CounterStreamProcessor.buildTopology(
25         IN,
26         OUT,
27         Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
28
29     CounterApplicationConfiguriation applicationConfiguriation =
30         new CounterApplicationConfiguriation();
31     Properties streamProcessorProperties =
32         applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
33
34     Map<String, ?> propertyMap = streamProcessorProperties
35         .entrySet()
36         .stream()
37         .collect(
38             Collectors.toMap(
39                 entry -> (String)entry.getKey(),
40                 entry -> entry.getValue()
41             ));
42     JsonSerde<?> keySerde = new JsonSerde<>();
43     keySerde.configure(propertyMap, true);
44     JsonSerde<?> valueSerde = new JsonSerde<>();
45     valueSerde.configure(propertyMap, false);
46
47     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
48
49     TestInputTopic<String, Word> in = testDriver.createInputTopic(
50         IN,
51         (JsonSerializer<String>)keySerde.serializer(),
52         (JsonSerializer<Word>)valueSerde.serializer());
53
54     TestOutputTopic<Word, WordCount> out = testDriver.createOutputTopic(
55         OUT,
56         (JsonDeserializer<Word>)keySerde.deserializer(),
57         (JsonDeserializer<WordCount>)valueSerde.deserializer());
58
59     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
60
61     List<KeyValue<Word,WordCount>> receivedMessages = out
62         .readRecordsToList()
63         .stream()
64         .map(record -> KeyValue.pair(record.key(), record.value()))
65         .toList();
66
67     TestData.assertExpectedResult(receivedMessages);
68   }
69 }