Added a @KafkaListener to the test, which also reads the DLT (dead-letter topic)
[demos/kafka/training] src/test/java/de/juplo/kafka/GenericApplicationTests.java
package de.juplo.kafka;

import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
import static de.juplo.kafka.GenericApplicationTests.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;


@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestPropertySource(
    properties = {
        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "sumup.adder.topic=" + TOPIC,
        "spring.kafka.consumer.auto-commit-interval=500ms",
        "spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@EnableAutoConfiguration
@AutoConfigureDataMongo
@Slf4j
abstract class GenericApplicationTests<K, V>
{
  public static final String TOPIC = "FOO";
  public static final int PARTITIONS = 10;


  @Autowired
  org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
  @Autowired
  KafkaProperties kafkaProperties;
  @Autowired
  ApplicationProperties applicationProperties;
  @Autowired
  MongoClient mongoClient;
  @Autowired
  MongoProperties mongoProperties;
  @Autowired
  KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
  @Autowired
  TestRecordHandler recordHandler;
  @Autowired
  EndlessConsumer endlessConsumer;

  KafkaProducer<Bytes, Bytes> testRecordProducer;
  KafkaConsumer<Bytes, Bytes> offsetConsumer;
  Map<TopicPartition, Long> oldOffsets;


  final RecordGenerator recordGenerator;
  final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;

  public GenericApplicationTests(RecordGenerator recordGenerator)
  {
    this.recordGenerator = recordGenerator;
    this.messageSender = this::sendMessage;
  }


  /** Test methods */

  @Test
  void commitsCurrentOffsetsOnSuccess() throws Exception
  {
    recordGenerator.generate(false, false, messageSender);

    int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();

    await(numberOfGeneratedMessages + " records received")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages);

    await("Offsets committed")
        .atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(() ->
        {
          checkSeenOffsetsForProgress();
          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
        });

    assertThat(endlessConsumer.running())
        .describedAs("Consumer should still be running")
        .isTrue();

    endlessConsumer.stop();
    recordGenerator.assertBusinessLogic();
  }

  @Test
  @SkipWhenErrorCannotBeGenerated(poisonPill = true)
  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
  {
    recordGenerator.generate(true, false, messageSender);

    int numberOfValidMessages =
        recordGenerator.getNumberOfMessages() -
        recordGenerator.getNumberOfPoisonPills();

    await(numberOfValidMessages + " records received")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);

    await("Offsets committed")
        .atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(() ->
        {
          checkSeenOffsetsForProgress();
          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
        });

    assertThat(endlessConsumer.running())
        .describedAs("Consumer should still be running")
        .isTrue();

    endlessConsumer.stop();
    recordGenerator.assertBusinessLogic();
  }

  @Test
  @SkipWhenErrorCannotBeGenerated(logicError = true)
  void commitsOffsetsOfUnseenRecordsOnLogicError()
  {
    recordGenerator.generate(false, true, messageSender);

    int numberOfValidMessages =
        recordGenerator.getNumberOfMessages() -
        recordGenerator.getNumberOfLogicErrors();

    await(numberOfValidMessages + " records received")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);

    await("Offsets committed")
        .atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(() ->
        {
          checkSeenOffsetsForProgress();
          assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
        });

    assertThat(endlessConsumer.running())
        .describedAs("Consumer should still be running")
        .isTrue();

    endlessConsumer.stop();
    recordGenerator.assertBusinessLogic();
  }


  /** Helper methods for the verification of expectations */

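  // Note: a Kafka commit stores the offset of the *next* record to be
  // consumed; therefore, the committed offset is expected to equal the
  // offset of the last seen record plus one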
  void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
  {
    doForCurrentOffsets((tp, offset) ->
    {
      Long expected = offsetsToCheck.get(tp) + 1;
      log.debug("Checking if the committed offset {} for {} is exactly {}", offset, tp, expected);
      assertThat(offset)
          .describedAs("Committed offset corresponds to the offset of the consumer")
          .isEqualTo(expected);
    });
  }

  void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
  {
    List<Boolean> isOffsetBehindSeen = new LinkedList<>();

    doForCurrentOffsets((tp, offset) ->
    {
      Long expected = offsetsToCheck.get(tp) + 1;
      log.debug("Checking if the committed offset {} for {} is at most {}", offset, tp, expected);
      assertThat(offset)
          .describedAs("Committed offset must be at most equal to the offset of the consumer")
          .isLessThanOrEqualTo(expected);
      isOffsetBehindSeen.add(offset < expected);
    });

    assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
        .describedAs("Committed offsets are behind seen offsets")
        .isTrue();
  }

  void checkSeenOffsetsForProgress()
  {
    // Be sure that some messages were actually consumed!
    Set<TopicPartition> withProgress = new HashSet<>();
    partitions().forEach(tp ->
    {
      Long oldOffset = oldOffsets.get(tp) + 1;
      Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
      if (!oldOffset.equals(newOffset))
      {
        log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
        withProgress.add(tp);
      }
    });
    assertThat(withProgress)
        .describedAs("Some offsets must have changed compared to the old offset-positions")
        .isNotEmpty();
  }


  /** Helper methods for setting up and running the tests */

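  // Resets the committed offsets of the consumer-group to the current end
  // of the log, so that every test starts from a clean state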
  void seekToEnd()
  {
    offsetConsumer.assign(partitions());
    offsetConsumer.seekToEnd(partitions());
    partitions().forEach(tp ->
    {
      // seekToEnd() works lazily: it only takes effect on poll()/position()
      Long offset = offsetConsumer.position(tp);
      log.info("New position for {}: {}", tp, offset);
    });
    // The new positions must be committed!
    offsetConsumer.commitSync();
    offsetConsumer.unsubscribe();
  }

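  // The offset-consumer only assign()s the partitions, it does not
  // subscribe(); hence, position() yields the offsets last committed for
  // the configured group.id (or the auto.offset.reset position, if no
  // committed offset exists)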
  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
  {
    offsetConsumer.assign(partitions());
    partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
    offsetConsumer.unsubscribe();
  }

  List<TopicPartition> partitions()
  {
    return
        IntStream
            .range(0, PARTITIONS)
            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
            .collect(Collectors.toList());
  }


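  // Contract for the concrete test-classes: implementations generate the
  // test-records and report how many messages, poison pills and logic
  // errors were produced, so that the generic assertions can be computed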
  public interface RecordGenerator
  {
    void generate(
        boolean poisonPills,
        boolean logicErrors,
        Consumer<ProducerRecord<Bytes, Bytes>> messageSender);

    int getNumberOfMessages();
    int getNumberOfPoisonPills();
    int getNumberOfLogicErrors();

    default boolean canGeneratePoisonPill()
    {
      return true;
    }

    default boolean canGenerateLogicError()
    {
      return true;
    }

    default void assertBusinessLogic()
    {
      log.debug("No business-logic to assert");
    }
  }

  void sendMessage(ProducerRecord<Bytes, Bytes> record)
  {
    testRecordProducer.send(record, (metadata, e) ->
    {
      if (metadata != null)
      {
        log.debug(
            "{}|{} - {}={}",
            metadata.partition(),
            metadata.offset(),
            record.key(),
            record.value());
      }
      else
      {
        log.warn(
            "Exception for {}={}: {}",
            record.key(),
            record.value(),
            e.toString());
      }
    });
  }


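  // Creates fresh test-clients, drops the MongoDB database, resets the
  // committed offsets to the end of the log and (re-)starts the consumer
  // under test before each test-case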
  @BeforeEach
  public void init()
  {
    Properties props;
    props = new Properties();
    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
    props.put("linger.ms", 100);
    props.put("key.serializer", BytesSerializer.class.getName());
    props.put("value.serializer", BytesSerializer.class.getName());
    testRecordProducer = new KafkaProducer<>(props);

    props = new Properties();
    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
    props.put("client.id", "OFFSET-CONSUMER");
    // Uses the group.id of the application, so that the committed offsets
    // of the consumer-group under test can be inspected and manipulated
    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
    props.put("key.deserializer", BytesDeserializer.class.getName());
    props.put("value.deserializer", BytesDeserializer.class.getName());
    offsetConsumer = new KafkaConsumer<>(props);

    mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
    seekToEnd();

    oldOffsets = new HashMap<>();
    recordHandler.seenOffsets = new HashMap<>();
    recordHandler.receivedMessages = 0;

    // The committed offset denotes the next record to be read, while the
    // seen offsets track the last consumed record, hence the -1
    doForCurrentOffsets((tp, offset) ->
    {
      oldOffsets.put(tp, offset - 1);
      recordHandler.seenOffsets.put(tp, offset - 1);
    });

    endlessConsumer.start();
  }

  @AfterEach
  public void deinit()
  {
    try
    {
      endlessConsumer.stop();
    }
    catch (Exception e)
    {
      log.debug("{}", e.toString());
    }

    try
    {
      testRecordProducer.close();
      offsetConsumer.close();
    }
    catch (Exception e)
    {
      log.info("Exception while closing the test-clients: {}", e.toString());
    }
  }


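  // Test-configuration: imports the configuration of the application under
  // test, wraps its RecordHandler in the observing TestRecordHandler and
  // registers the DeadLetterTopicConsumer, which also reads the DLT (a
  // sketch of that class follows below the listing)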
  @TestConfiguration
  @Import(ApplicationConfiguration.class)
  public static class Configuration
  {
    @Bean
    public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
    {
      return new TestRecordHandler(applicationRecordHandler);
    }

    @Bean(destroyMethod = "close")
    public org.apache.kafka.clients.consumer.Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
    {
      return factory.createConsumer();
    }

    @Bean
    public DeadLetterTopicConsumer deadLetterTopicConsumer()
    {
      return new DeadLetterTopicConsumer();
    }
  }
}
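
The class DeadLetterTopicConsumer, which is instantiated by the last bean definition, is not shown in this listing. The following is only a minimal sketch of what it might look like, assuming that the dead-letter topic follows the default naming convention of Spring Kafka's DeadLetterPublishingRecoverer (the name of the original topic with the suffix .DLT appended) and that the received records are simply logged and collected for inspection by the tests:

package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;

import java.util.LinkedList;
import java.util.List;


// Hypothetical sketch: names and details may differ from the actual class
@Slf4j
public class DeadLetterTopicConsumer
{
  // Collects the records that were re-published to the dead-letter topic,
  // so that test-methods can assert on them
  final List<ConsumerRecord<?, ?>> messages = new LinkedList<>();

  @KafkaListener(
      id = "DLT",
      topics = "${sumup.adder.topic}.DLT")
  public void receive(ConsumerRecord<?, ?> record)
  {
    log.info(
        "Received on DLT: {}|{} - {}={}",
        record.partition(),
        record.offset(),
        record.key(),
        record.value());
    messages.add(record);
  }
}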