21c3f7f01c524eb64ad88d204b9b7a82e4a65e22
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import com.mongodb.client.MongoClient;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
6 import org.apache.kafka.clients.consumer.ConsumerRecord;
7 import org.apache.kafka.clients.consumer.KafkaConsumer;
8 import org.apache.kafka.clients.producer.KafkaProducer;
9 import org.apache.kafka.clients.producer.ProducerRecord;
10 import org.apache.kafka.common.TopicPartition;
11 import org.apache.kafka.common.errors.RecordDeserializationException;
12 import org.apache.kafka.common.serialization.*;
13 import org.apache.kafka.common.utils.Bytes;
14 import org.junit.jupiter.api.*;
15 import org.springframework.beans.factory.annotation.Autowired;
16 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
17 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
18 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
19 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
20 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
21 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
22 import org.springframework.boot.test.context.TestConfiguration;
23 import org.springframework.context.annotation.Import;
24 import org.springframework.kafka.test.context.EmbeddedKafka;
25 import org.springframework.test.context.TestPropertySource;
26 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
27
28 import java.time.Duration;
29 import java.util.*;
30 import java.util.concurrent.ExecutorService;
31 import java.util.function.BiConsumer;
32 import java.util.function.Consumer;
33 import java.util.stream.Collectors;
34 import java.util.stream.IntStream;
35
36 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
37 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
38 import static org.assertj.core.api.Assertions.*;
39 import static org.awaitility.Awaitility.*;
40
41
42 @SpringJUnitConfig(
43   initializers = ConfigDataApplicationContextInitializer.class,
44   classes = {
45       KafkaAutoConfiguration.class,
46       ApplicationTests.Configuration.class })
47 @TestPropertySource(
48                 properties = {
49                                 "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
50                                 "sumup.adder.topic=" + TOPIC,
51                                 "spring.kafka.consumer.auto-commit-interval=500ms",
52                                 "spring.mongodb.embedded.version=4.4.13" })
53 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
54 @EnableAutoConfiguration
55 @AutoConfigureDataMongo
56 @Slf4j
57 abstract class GenericApplicationTests<K, V>
58 {
59         public static final String TOPIC = "FOO";
60         public static final int PARTITIONS = 10;
61
62
63         @Autowired
64         org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
65         @Autowired
66         Consumer<ConsumerRecord<K, V>> consumer;
67         @Autowired
68         ApplicationProperties applicationProperties;
69         @Autowired
70         KafkaProperties kafkaProperties;
71         @Autowired
72         ExecutorService executor;
73         @Autowired
74         MongoClient mongoClient;
75         @Autowired
76         MongoProperties mongoProperties;
77         @Autowired
78         ConsumerRebalanceListener rebalanceListener;
79         @Autowired
80         RecordHandler<K, V> recordHandler;
81
82         KafkaProducer<Bytes, Bytes> testRecordProducer;
83         KafkaConsumer<Bytes, Bytes> offsetConsumer;
84         EndlessConsumer<K, V> endlessConsumer;
85         Map<TopicPartition, Long> oldOffsets;
86         Map<TopicPartition, Long> seenOffsets;
87         Set<ConsumerRecord<K, V>> receivedRecords;
88
89
90         final RecordGenerator recordGenerator;
91         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
92
93         public GenericApplicationTests(RecordGenerator recordGenerator)
94         {
95                 this.recordGenerator = recordGenerator;
96                 this.messageSender = (record) -> sendMessage(record);
97         }
98
99
        /** Test methods */
101
102         @Test
103         void commitsCurrentOffsetsOnSuccess() throws Exception
104         {
105                 int numberOfGeneratedMessages =
106                                 recordGenerator.generate(false, false, messageSender);
107
108                 await(numberOfGeneratedMessages + " records received")
109                                 .atMost(Duration.ofSeconds(30))
110                                 .pollInterval(Duration.ofSeconds(1))
111                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
112
113                 await("Offsets committed")
114                                 .atMost(Duration.ofSeconds(10))
115                                 .pollInterval(Duration.ofSeconds(1))
116                                 .untilAsserted(() ->
117                                 {
118                                         checkSeenOffsetsForProgress();
119                                         assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
120                                 });
121
122                 assertThatExceptionOfType(IllegalStateException.class)
123                                 .isThrownBy(() -> endlessConsumer.exitStatus())
124                                 .describedAs("Consumer should still be running");
125
126                 endlessConsumer.stop();
127                 recordGenerator.assertBusinessLogic();
128         }
129
130         @Test
131         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
132         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
133         {
134                 int numberOfGeneratedMessages =
135                                 recordGenerator.generate(true, false, messageSender);
136
137                 await("Consumer failed")
138                                 .atMost(Duration.ofSeconds(30))
139                                 .pollInterval(Duration.ofSeconds(1))
140                                 .until(() -> !endlessConsumer.running());
141
142                 checkSeenOffsetsForProgress();
143                 assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
144
145                 endlessConsumer.start();
146                 await("Consumer failed")
147                                 .atMost(Duration.ofSeconds(30))
148                                 .pollInterval(Duration.ofSeconds(1))
149                                 .until(() -> !endlessConsumer.running());
150
151                 checkSeenOffsetsForProgress();
152                 assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
153                 assertThat(receivedRecords.size())
154                                 .describedAs("Received not all sent events")
155                                 .isLessThan(numberOfGeneratedMessages);
156
157                 assertThatNoException()
158                                 .describedAs("Consumer should not be running")
159                                 .isThrownBy(() -> endlessConsumer.exitStatus());
160                 assertThat(endlessConsumer.exitStatus())
161                                 .describedAs("Consumer should have exited abnormally")
162                                 .containsInstanceOf(RecordDeserializationException.class);
163
164                 recordGenerator.assertBusinessLogic();
165         }
166
167         @Test
168         @SkipWhenErrorCannotBeGenerated(logicError = true)
169         void doesNotCommitOffsetsOnLogicError()
170         {
171                 int numberOfGeneratedMessages =
172                                 recordGenerator.generate(false, true, messageSender);
173
174                 await("Consumer failed")
175                                 .atMost(Duration.ofSeconds(30))
176                                 .pollInterval(Duration.ofSeconds(1))
177                                 .until(() -> !endlessConsumer.running());
178
179                 checkSeenOffsetsForProgress();
180                 assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
181
182                 endlessConsumer.start();
183                 await("Consumer failed")
184                                 .atMost(Duration.ofSeconds(30))
185                                 .pollInterval(Duration.ofSeconds(1))
186                                 .until(() -> !endlessConsumer.running());
187
188                 assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
189
190                 assertThatNoException()
191                                 .describedAs("Consumer should not be running")
192                                 .isThrownBy(() -> endlessConsumer.exitStatus());
193                 assertThat(endlessConsumer.exitStatus())
194                                 .describedAs("Consumer should have exited abnormally")
195                                 .containsInstanceOf(RuntimeException.class);
196
197                 recordGenerator.assertBusinessLogic();
198         }
199
200
201         /** Helper methods for the verification of expectations */
202
203         void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
204         {
205                 doForCurrentOffsets((tp, offset) ->
206                 {
207                         Long expected = offsetsToCheck.get(tp) + 1;
208                         log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
209                         assertThat(offset)
210                                         .describedAs("Committed offset corresponds to the offset of the consumer")
211                                         .isEqualTo(expected);
212                 });
213         }
214
215         void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
216         {
217                 List<Boolean> isOffsetBehindSeen = new LinkedList<>();
218
219                 doForCurrentOffsets((tp, offset) ->
220                 {
221                         Long expected = offsetsToCheck.get(tp) + 1;
222                         log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
223                         assertThat(offset)
224                                         .describedAs("Committed offset must be at most equal to the offset of the consumer")
225                                         .isLessThanOrEqualTo(expected);
226                         isOffsetBehindSeen.add(offset < expected);
227                 });
228
229                 assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
230                                 .describedAs("Committed offsets are behind seen offsets")
231                                 .isTrue();
232         }
233
234         void checkSeenOffsetsForProgress()
235         {
236                 // Be sure, that some messages were consumed...!
237                 Set<TopicPartition> withProgress = new HashSet<>();
238                 partitions().forEach(tp ->
239                 {
240                         Long oldOffset = oldOffsets.get(tp) + 1;
241                         Long newOffset = seenOffsets.get(tp) + 1;
242                         if (!oldOffset.equals(newOffset))
243                         {
244                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
245                                 withProgress.add(tp);
246                         }
247                 });
248                 assertThat(withProgress)
249                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
250                                 .isNotEmpty();
251         }
252
253
254         /** Helper methods for setting up and running the tests */
255
256         void seekToEnd()
257         {
258                 offsetConsumer.assign(partitions());
259                 offsetConsumer.seekToEnd(partitions());
260                 partitions().forEach(tp ->
261                 {
262                         // seekToEnd() works lazily: it only takes effect on poll()/position()
263                         Long offset = offsetConsumer.position(tp);
264                         log.info("New position for {}: {}", tp, offset);
265                 });
266                 // The new positions must be commited!
267                 offsetConsumer.commitSync();
268                 offsetConsumer.unsubscribe();
269         }
270
271         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
272         {
273                 offsetConsumer.assign(partitions());
274                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
275                 offsetConsumer.unsubscribe();
276         }
277
278         List<TopicPartition> partitions()
279         {
280                 return
281                                 IntStream
282                                                 .range(0, PARTITIONS)
283                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
284                                                 .collect(Collectors.toList());
285         }
286
287
288         public interface RecordGenerator
289         {
290                 int generate(
291                                 boolean poisonPills,
292                                 boolean logicErrors,
293                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
294
295                 default boolean canGeneratePoisonPill()
296                 {
297                         return true;
298                 }
299
300                 default boolean canGenerateLogicError()
301                 {
302                         return true;
303                 }
304
305                 default void assertBusinessLogic()
306                 {
307                         log.debug("No business-logic to assert");
308                 }
309         }
310
311         void sendMessage(ProducerRecord<Bytes, Bytes> record)
312         {
313                 testRecordProducer.send(record, (metadata, e) ->
314                 {
315                         if (metadata != null)
316                         {
317                                 log.debug(
318                                                 "{}|{} - {}={}",
319                                                 metadata.partition(),
320                                                 metadata.offset(),
321                                                 record.key(),
322                                                 record.value());
323                         }
324                         else
325                         {
326                                 log.warn(
327                                                 "Exception for {}={}: {}",
328                                                 record.key(),
329                                                 record.value(),
330                                                 e.toString());
331                         }
332                 });
333         }
334
335
336         @BeforeEach
337         public void init()
338         {
339                 Properties props;
340                 props = new Properties();
341                 props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
342                 props.put("linger.ms", 100);
343                 props.put("key.serializer", BytesSerializer.class.getName());
344                 props.put("value.serializer", BytesSerializer.class.getName());
345                 testRecordProducer = new KafkaProducer<>(props);
346
347                 props = new Properties();
348                 props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
349                 props.put("client.id", "OFFSET-CONSUMER");
350                 props.put("group.id", kafkaProperties.getConsumer().getGroupId());
351                 props.put("key.deserializer", BytesDeserializer.class.getName());
352                 props.put("value.deserializer", BytesDeserializer.class.getName());
353                 offsetConsumer = new KafkaConsumer<>(props);
354
355                 mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
356                 seekToEnd();
357
358                 oldOffsets = new HashMap<>();
359                 seenOffsets = new HashMap<>();
360                 receivedRecords = new HashSet<>();
361
362                 doForCurrentOffsets((tp, offset) ->
363                 {
364                         oldOffsets.put(tp, offset - 1);
365                         seenOffsets.put(tp, offset - 1);
366                 });
367
368                 TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
369                                 new TestRecordHandler<K, V>(recordHandler)
370                                 {
371                                         @Override
372                                         public void onNewRecord(ConsumerRecord<K, V> record)
373                                         {
374                                                 seenOffsets.put(
375                                                                 new TopicPartition(record.topic(), record.partition()),
376                                                                 record.offset());
377                                                 receivedRecords.add(record);
378                                         }
379                                 };
380
381                 endlessConsumer =
382                                 new EndlessConsumer<>(
383                                                 executor,
384                                                 kafkaProperties.getClientId(),
385                                                 applicationProperties.getTopic(),
386                                                 kafkaConsumer,
387                                                 rebalanceListener,
388                                                 captureOffsetAndExecuteTestHandler);
389
390                 endlessConsumer.start();
391         }
392
393         @AfterEach
394         public void deinit()
395         {
396                 try
397                 {
398                         testRecordProducer.close();
399                         offsetConsumer.close();
400                 }
401                 catch (Exception e)
402                 {
403                         log.info("Exception while stopping the consumer: {}", e.toString());
404                 }
405         }
406
407
408         @TestConfiguration
409         @Import(ApplicationConfiguration.class)
410         public static class Configuration
411         {
412         }
413 }