RED: Merged the corrected/improved test and the reworked setup
[demos/kafka/training] src/test/java/de/juplo/kafka/GenericApplicationTests.java
package de.juplo.kafka;

import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
import static de.juplo.kafka.GenericApplicationTests.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;


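/**
 * Generic integration-test harness for the EndlessConsumer: a concrete
 * subclass supplies a {@link RecordGenerator} that produces the records for
 * the test run and can, on request, inject poison pills or records that
 * trigger logic errors. (A hypothetical usage sketch follows at the end of
 * this listing.)
 */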
39 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
40 @TestPropertySource(
41                 properties = {
42                                 "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
43                                 "sumup.adder.topic=" + TOPIC,
44                                 "sumup.adder.commit-interval=500ms",
45                                 "spring.mongodb.embedded.version=4.4.13" })
46 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
47 @EnableAutoConfiguration
48 @AutoConfigureDataMongo
49 @Slf4j
50 abstract class GenericApplicationTests<K, V>
51 {
52         public static final String TOPIC = "FOO";
53         public static final int PARTITIONS = 10;
54
55
56         @Autowired
57         KafkaConsumer<K, V> kafkaConsumer;
58         @Autowired
59         Consumer<ConsumerRecord<K, V>> consumer;
60         @Autowired
61         ApplicationProperties properties;
62         @Autowired
63         ExecutorService executor;
64         @Autowired
65         StateRepository stateRepository;
66         @Autowired
67         MongoClient mongoClient;
68         @Autowired
69         MongoProperties mongoProperties;
70         @Autowired
71         PollIntervalAwareConsumerRebalanceListener rebalanceListener;
72         @Autowired
73         RecordHandler<K, V> recordHandler;
74
75         KafkaProducer<Bytes, Bytes> testRecordProducer;
76         KafkaConsumer<Bytes, Bytes> offsetConsumer;
77         EndlessConsumer<K, V> endlessConsumer;
78         Map<TopicPartition, Long> oldOffsets;
79         Map<TopicPartition, Long> newOffsets;
80         Set<ConsumerRecord<K, V>> receivedRecords;
81
82
83         final RecordGenerator recordGenerator;
84         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
85
86         public GenericApplicationTests(RecordGenerator recordGenerator)
87         {
88                 this.recordGenerator = recordGenerator;
89                 this.messageSender = (record) -> sendMessage(record);
90         }


    /** Test methods */

    @Test
    void commitsCurrentOffsetsOnSuccess()
    {
        int numberOfGeneratedMessages =
            recordGenerator.generate(false, false, messageSender);

        await(numberOfGeneratedMessages + " records received")
            .atMost(Duration.ofSeconds(30))
            .pollInterval(Duration.ofSeconds(1))
            .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);

        await("Offsets committed")
            .atMost(Duration.ofSeconds(10))
            .pollInterval(Duration.ofSeconds(1))
            .untilAsserted(() ->
            {
                checkSeenOffsetsForProgress();
                compareToCommittedOffsets(newOffsets);
            });

        // describedAs() must precede isThrownBy(); otherwise the description
        // is attached to the thrown exception instead of this assertion
        assertThatExceptionOfType(IllegalStateException.class)
            .describedAs("Consumer should still be running")
            .isThrownBy(() -> endlessConsumer.exitStatus());

        recordGenerator.assertBusinessLogic();
    }

    @Test
    @SkipWhenErrorCannotBeGenerated(poisonPill = true)
    void commitsOffsetOfErrorForReprocessingOnDeserializationError()
    {
        int numberOfGeneratedMessages =
            recordGenerator.generate(true, false, messageSender);

        await("Consumer failed")
            .atMost(Duration.ofSeconds(30))
            .pollInterval(Duration.ofSeconds(1))
            .until(() -> !endlessConsumer.running());

        checkSeenOffsetsForProgress();
        compareToCommittedOffsets(newOffsets);

        endlessConsumer.start();
        await("Consumer failed again")
            .atMost(Duration.ofSeconds(30))
            .pollInterval(Duration.ofSeconds(1))
            .until(() -> !endlessConsumer.running());

        checkSeenOffsetsForProgress();
        compareToCommittedOffsets(newOffsets);
        assertThat(receivedRecords.size())
            .describedAs("Not all sent events should have been received")
            .isLessThan(numberOfGeneratedMessages);

        assertThatNoException()
            .describedAs("Consumer should not be running")
            .isThrownBy(() -> endlessConsumer.exitStatus());
        assertThat(endlessConsumer.exitStatus())
            .describedAs("Consumer should have exited abnormally")
            .containsInstanceOf(RecordDeserializationException.class);

        recordGenerator.assertBusinessLogic();
    }

    @Test
    @SkipWhenErrorCannotBeGenerated(logicError = true)
    void doesNotCommitOffsetsOnLogicError()
    {
        int numberOfGeneratedMessages =
            recordGenerator.generate(false, true, messageSender);

        await("Consumer failed")
            .atMost(Duration.ofSeconds(30))
            .pollInterval(Duration.ofSeconds(1))
            .until(() -> !endlessConsumer.running());

        checkSeenOffsetsForProgress();
        compareToCommittedOffsets(oldOffsets);

        endlessConsumer.start();
        await("Consumer failed again")
            .atMost(Duration.ofSeconds(30))
            .pollInterval(Duration.ofSeconds(1))
            .until(() -> !endlessConsumer.running());

        checkSeenOffsetsForProgress();
        compareToCommittedOffsets(oldOffsets);

        assertThatNoException()
            .describedAs("Consumer should not be running")
            .isThrownBy(() -> endlessConsumer.exitStatus());
        assertThat(endlessConsumer.exitStatus())
            .describedAs("Consumer should have exited abnormally")
            .containsInstanceOf(RuntimeException.class);

        recordGenerator.assertBusinessLogic();
    }


    /** Helper methods for the verification of expectations */

    void compareToCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
    {
        doForCurrentOffsets((tp, offset) ->
        {
            Long expected = offsetsToCheck.get(tp) + 1;
            log.debug("Checking if the offset for {} is {}", tp, expected);
            assertThat(offset)
                .describedAs("Committed offset corresponds to the offset of the consumer")
                .isEqualTo(expected);
        });
    }

    void checkSeenOffsetsForProgress()
    {
        // Make sure that some messages were actually consumed!
        Set<TopicPartition> withProgress = new HashSet<>();
        partitions().forEach(tp ->
        {
            Long oldOffset = oldOffsets.get(tp) + 1;
            Long newOffset = newOffsets.get(tp) + 1;
            if (!oldOffset.equals(newOffset))
            {
                log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
                withProgress.add(tp);
            }
        });
        assertThat(withProgress)
            .describedAs("Some offsets must have changed, compared to the old offset positions")
            .isNotEmpty();
    }


    /** Helper methods for setting up and running the tests */

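    // Writes the group's current position for every partition into the
    // MongoDB state store, so that the application under test does not
    // re-read records left over from earlier test runs. (For a fresh group
    // this position is the log end, assuming the default
    // auto.offset.reset=latest.)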
    void seekToEnd()
    {
        offsetConsumer.assign(partitions());
        partitions().forEach(tp ->
        {
            Long offset = offsetConsumer.position(tp);
            log.info("New position for {}: {}", tp, offset);
            Integer partition = tp.partition();
            StateDocument document =
                stateRepository
                    .findById(partition.toString())
                    .orElse(new StateDocument(partition));
            document.offset = offset;
            stateRepository.save(document);
        });
        offsetConsumer.unsubscribe();
    }

    void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
    {
        partitions().forEach(tp ->
        {
            String partition = Integer.toString(tp.partition());
            Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
            consumer.accept(tp, offset.orElse(0L));
        });
    }

    List<TopicPartition> partitions()
    {
        return
            IntStream
                .range(0, PARTITIONS)
                .mapToObj(partition -> new TopicPartition(TOPIC, partition))
                .collect(Collectors.toList());
    }


    public interface RecordGenerator
    {
        int generate(
            boolean poisonPills,
            boolean logicErrors,
            Consumer<ProducerRecord<Bytes, Bytes>> messageSender);

        default boolean canGeneratePoisonPill()
        {
            return true;
        }

        default boolean canGenerateLogicError()
        {
            return true;
        }

        default void assertBusinessLogic()
        {
            log.debug("No business-logic to assert");
        }
    }

    void sendMessage(ProducerRecord<Bytes, Bytes> record)
    {
        testRecordProducer.send(record, (metadata, e) ->
        {
            if (metadata != null)
            {
                log.debug(
                    "{}|{} - {}={}",
                    metadata.partition(),
                    metadata.offset(),
                    record.key(),
                    record.value());
            }
            else
            {
                log.warn(
                    "Exception for {}={}: {}",
                    record.key(),
                    record.value(),
                    e.toString());
            }
        });
    }


    @BeforeEach
    public void init()
    {
        // Producer for the records generated by the RecordGenerator
        Properties props = new Properties();
        props.put("bootstrap.servers", properties.getBootstrapServer());
        props.put("linger.ms", 100);
        props.put("key.serializer", BytesSerializer.class.getName());
        props.put("value.serializer", BytesSerializer.class.getName());
        testRecordProducer = new KafkaProducer<>(props);

        // Separate consumer, only used to look up the group's current positions
        props = new Properties();
        props.put("bootstrap.servers", properties.getBootstrapServer());
        props.put("client.id", "OFFSET-CONSUMER");
        props.put("group.id", properties.getGroupId());
        props.put("key.deserializer", BytesDeserializer.class.getName());
        props.put("value.deserializer", BytesDeserializer.class.getName());
        offsetConsumer = new KafkaConsumer<>(props);

        // Start with a clean state store and skip over records from earlier tests
        mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
        seekToEnd();

        oldOffsets = new HashMap<>();
        newOffsets = new HashMap<>();
        receivedRecords = new HashSet<>();

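        // The state store keeps, for each partition, the offset of the *next*
        // record to be read; oldOffsets/newOffsets track the offset of the
        // last record seen, hence the "- 1" here (and the "+ 1" in the
        // verification helpers above).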
        doForCurrentOffsets((tp, offset) ->
        {
            oldOffsets.put(tp, offset - 1);
            newOffsets.put(tp, offset - 1);
        });

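        // Wrap the application's record handler, so that every received
        // record and the latest offset seen per partition are captured for
        // the assertions, before the record is handed on to the actual
        // handler.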
        TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
            new TestRecordHandler<K, V>(recordHandler)
            {
                @Override
                public void onNewRecord(ConsumerRecord<K, V> record)
                {
                    newOffsets.put(
                        new TopicPartition(record.topic(), record.partition()),
                        record.offset());
                    receivedRecords.add(record);
                }
            };

        endlessConsumer =
            new EndlessConsumer<>(
                executor,
                properties.getClientId(),
                properties.getTopic(),
                kafkaConsumer,
                rebalanceListener,
                captureOffsetAndExecuteTestHandler);

        endlessConsumer.start();
    }

    @AfterEach
    public void deinit()
    {
        try
        {
            endlessConsumer.stop();
            testRecordProducer.close();
            offsetConsumer.close();
        }
        catch (Exception e)
        {
            log.info("Exception while tearing down the test fixtures: {}", e.toString());
        }
    }


    @TestConfiguration
    @Import(ApplicationConfiguration.class)
    public static class Configuration
    {
    }
}
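
For orientation, here is a minimal sketch of how a concrete test class could plug into this harness. The class name, key/value types, serializers, and the record layout are made up for illustration and are not part of the repository; a real subclass would generate records that match the application's actual message format.

package de.juplo.kafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;

import java.util.function.Consumer;


public class ExampleApplicationTests extends GenericApplicationTests<String, Long>
{
    public ExampleApplicationTests()
    {
        super(new SimpleRecordGenerator());
    }


    static class SimpleRecordGenerator implements RecordGenerator
    {
        final StringSerializer keySerializer = new StringSerializer();
        final LongSerializer valueSerializer = new LongSerializer();

        @Override
        public int generate(
            boolean poisonPills,
            boolean logicErrors,
            Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
        {
            int i;
            for (i = 0; i < 100; i++)
            {
                Bytes key = Bytes.wrap(keySerializer.serialize(TOPIC, Integer.toString(i % PARTITIONS)));
                Bytes value = poisonPills && i == 50
                    ? Bytes.wrap("BOOM!".getBytes()) // cannot be deserialized as a Long
                    : Bytes.wrap(valueSerializer.serialize(TOPIC, (long) i));
                messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
            }
            return i;
        }

        @Override
        public boolean canGenerateLogicError()
        {
            // doesNotCommitOffsetsOnLogicError() is skipped for this generator
            return false;
        }
    }
}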