counter: 1.3.1 - Refined `CounterStreamProcessor` (DRY for type-mapping)
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterStreamProcessorTopologyTest.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.streams.*;
5 import org.apache.kafka.streams.state.Stores;
6 import org.junit.jupiter.api.Test;
7 import org.springframework.kafka.support.serializer.JsonDeserializer;
8 import org.springframework.kafka.support.serializer.JsonSerde;
9 import org.springframework.kafka.support.serializer.JsonSerializer;
10
11 import java.util.List;
12 import java.util.Map;
13 import java.util.Properties;
14
15 import static de.juplo.kafka.wordcount.counter.TestData.convertToMap;
16 import static de.juplo.kafka.wordcount.counter.TestData.parseHeader;
17 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.DEFAULT_CLASSID_FIELD_NAME;
18 import static org.springframework.kafka.support.mapping.AbstractJavaTypeMapper.KEY_DEFAULT_CLASSID_FIELD_NAME;
19
20
21 @Slf4j
22 public class CounterStreamProcessorTopologyTest
23 {
24   public final static String IN = "TEST-IN";
25   public final static String OUT = "TEST-OUT";
26
27   @Test
28   public void test()
29   {
30     Topology topology = CounterStreamProcessor.buildTopology(
31         IN,
32         OUT,
33         Stores.inMemoryKeyValueStore("TOPOLOGY-TEST"));
34
35     CounterApplicationConfiguriation applicationConfiguriation =
36         new CounterApplicationConfiguriation();
37     Properties streamProcessorProperties =
38         applicationConfiguriation.streamProcessorProperties(new CounterApplicationProperties());
39     Map<String, Object> propertyMap = convertToMap(streamProcessorProperties);
40
41     JsonSerde<?> keySerde = new JsonSerde<>();
42     keySerde.configure(propertyMap, true);
43     JsonSerde<?> valueSerde = new JsonSerde<>();
44     valueSerde.configure(propertyMap, false);
45
46     TopologyTestDriver testDriver = new TopologyTestDriver(topology, streamProcessorProperties);
47
48     TestInputTopic<String, Word> in = testDriver.createInputTopic(
49         IN,
50         (JsonSerializer<String>)keySerde.serializer(),
51         (JsonSerializer<Word>)valueSerde.serializer());
52
53     TestOutputTopic<Word, WordCounter> out = testDriver.createOutputTopic(
54         OUT,
55         (JsonDeserializer<Word>)keySerde.deserializer(),
56         (JsonDeserializer<WordCounter>)valueSerde.deserializer());
57
58     TestData.writeInputData((key, value) -> in.pipeInput(key, value));
59
60     List<KeyValue<Word, WordCounter>> receivedMessages = out
61         .readRecordsToList()
62         .stream()
63         .map(record ->
64         {
65           log.debug(
66               "OUT: {} -> {}, {}, {}",
67               record.key(),
68               record.value(),
69               parseHeader(record.headers(), KEY_DEFAULT_CLASSID_FIELD_NAME),
70               parseHeader(record.headers(), DEFAULT_CLASSID_FIELD_NAME));
71           return KeyValue.pair(record.key(), record.value());
72         })
73         .toList();
74
75     TestData.assertExpectedResult(receivedMessages);
76   }
77 }