fd920006ed9811b60d5f868be2a90d1def1a1d6d
[demos/kafka/wordcount] / src / test / java / de / juplo / kafka / wordcount / counter / CounterApplicationIT.java
1 package de.juplo.kafka.wordcount.counter;
2
3 import com.fasterxml.jackson.core.JsonProcessingException;
4 import com.fasterxml.jackson.databind.ObjectMapper;
5 import lombok.RequiredArgsConstructor;
6 import lombok.Value;
7 import lombok.extern.slf4j.Slf4j;
8 import org.apache.kafka.clients.consumer.ConsumerRecord;
9 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
10 import org.apache.kafka.streams.state.Stores;
11 import org.junit.jupiter.api.BeforeEach;
12 import org.junit.jupiter.api.Test;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.boot.test.context.SpringBootTest;
15 import org.springframework.boot.test.context.TestConfiguration;
16 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Primary;
18 import org.springframework.kafka.annotation.KafkaListener;
19 import org.springframework.kafka.core.KafkaTemplate;
20 import org.springframework.kafka.test.context.EmbeddedKafka;
21 import org.springframework.util.LinkedMultiValueMap;
22 import org.springframework.util.MultiValueMap;
23
24 import java.time.Duration;
25
26 import static de.juplo.kafka.wordcount.counter.CounterApplicationIT.*;
27 import static org.assertj.core.api.Assertions.assertThat;
28 import static org.awaitility.Awaitility.*;
29
30
31 @SpringBootTest(
32                 properties = {
33                                 "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
34                                 "juplo.wordcount.counter.bootstrap-server=${spring.embedded.kafka.brokers}",
35                                 "juplo.wordcount.counter.commit-interval=0",
36                                 "juplo.wordcount.counter.cacheMaxBytes=0",
37                                 "juplo.wordcount.counter.input-topic=" + TOPIC_IN,
38                                 "juplo.wordcount.counter.output-topic=" + TOPIC_OUT })
39 @EmbeddedKafka(topics = { TOPIC_IN, TOPIC_OUT }, partitions = PARTITIONS)
40 @Slf4j
41 public class CounterApplicationIT
42 {
43         public final static String TOPIC_IN = "in";
44         public final static String TOPIC_OUT = "out";
45         static final int PARTITIONS = 2;
46
47         @Autowired
48         KafkaTemplate<String, String> kafkaTemplate;
49         @Autowired
50         ObjectMapper mapper;
51         @Autowired
52         Consumer consumer;
53
54
55         @BeforeEach
56         public void clear()
57         {
58                 consumer.received.clear();
59         }
60
61
62         @Test
63         void testSendMessage() throws Exception
64         {
65                 kafkaTemplate.send(TOPIC_IN, "peter", "Hallo");
66                 kafkaTemplate.send(TOPIC_IN, "klaus", "Müsch");
67                 kafkaTemplate.send(TOPIC_IN, "peter", "Welt");
68                 kafkaTemplate.send(TOPIC_IN, "klaus", "Müsch");
69                 kafkaTemplate.send(TOPIC_IN, "klaus", "s");
70                 kafkaTemplate.send(TOPIC_IN, "peter", "Boäh");
71                 kafkaTemplate.send(TOPIC_IN, "peter", "Welt");
72                 kafkaTemplate.send(TOPIC_IN, "peter", "Boäh");
73                 kafkaTemplate.send(TOPIC_IN, "klaus", "s");
74                 kafkaTemplate.send(TOPIC_IN, "peter", "Boäh");
75                 kafkaTemplate.send(TOPIC_IN, "klaus", "s");
76
77                 Message peter1 = Message.of(
78                                 "{\"username\":\"peter\",\"word\":\"Hallo\"}",
79                                 "1");
80                 Message peter2 = Message.of(
81                                 "{\"username\":\"peter\",\"word\":\"Welt\"}",
82                                 "1");
83                 Message peter3 = Message.of(
84                                 "{\"username\":\"peter\",\"word\":\"Boäh\"}",
85                                 "1");
86                 Message peter4 = Message.of(
87                                 "{\"username\":\"peter\",\"word\":\"Welt\"}",
88                                 "2");
89                 Message peter5 = Message.of(
90                                 "{\"username\":\"peter\",\"word\":\"Boäh\"}",
91                                 "2");
92                 Message peter6 = Message.of(
93                                 "{\"username\":\"peter\",\"word\":\"Boäh\"}",
94                                 "3");
95
96                 Message klaus1 = Message.of(
97                                 "{\"username\":\"klaus\",\"word\":\"Müsch\"}",
98                                 "1");
99                 Message klaus2 = Message.of(
100                                 "{\"username\":\"klaus\",\"word\":\"Müsch\"}",
101                                 "2");
102                 Message klaus3 = Message.of(
103                                 "{\"username\":\"klaus\",\"word\":\"s\"}",
104                                 "1");
105                 Message klaus4 = Message.of(
106                                 "{\"username\":\"klaus\",\"word\":\"s\"}",
107                                 "2");
108                 Message klaus5 = Message.of(
109                                 "{\"username\":\"klaus\",\"word\":\"s\"}",
110                                 "3");
111
112                 await("Expexted converted data")
113                                 .atMost(Duration.ofSeconds(10))
114                                 .untilAsserted(() ->
115                                 {
116                                         assertThat(consumer.received).hasSize(2);
117                                         assertThat(consumer.received.get("klaus")).containsExactly(klaus1, klaus2, klaus3, klaus4, klaus5);
118                                         assertThat(consumer.received.get("peter")).containsExactly(peter1, peter2, peter3, peter4, peter5, peter6);
119                                 });
120         }
121
122
123         @RequiredArgsConstructor
124         static class Consumer
125         {
126                 private final MultiValueMap<String, Message> received = new LinkedMultiValueMap<>();
127                 private final ObjectMapper mapper;
128
129                 @KafkaListener(groupId = "TEST", topics = TOPIC_OUT)
130                 public void receive(ConsumerRecord<String, String> record) throws JsonProcessingException
131                 {
132                         log.debug("Received message: {}", record);
133                         Key key = mapper.readValue(record.key(), Key.class);
134                         received.add(key.getUsername(), Message.of(record.key(),record.value()));
135                 }
136         }
137
138         @Value(staticConstructor = "of")
139         static class Message
140         {
141                 String key;
142                 String value;
143         }
144
145         @TestConfiguration
146         static class Configuration
147         {
148                 @Bean
149                 Consumer consumer(ObjectMapper mapper)
150                 {
151                         return new Consumer(mapper);
152                 }
153
154                 @Primary
155                 @Bean
156                 KeyValueBytesStoreSupplier inMemoryStoreSupplier()
157                 {
158                         return Stores.inMemoryKeyValueStore("TEST-STORE");
159                 }
160         }
161 }