Backport von Verbesserungen / Erweiterungen der Tests:
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.clients.producer.KafkaProducer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import org.apache.kafka.common.errors.RecordDeserializationException;
11 import org.apache.kafka.common.serialization.*;
12 import org.apache.kafka.common.utils.Bytes;
13 import org.junit.jupiter.api.*;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
16 import org.springframework.boot.test.context.TestConfiguration;
17 import org.springframework.context.annotation.Import;
18 import org.springframework.kafka.test.context.EmbeddedKafka;
19 import org.springframework.test.context.TestPropertySource;
20 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
21
22 import java.time.Duration;
23 import java.util.*;
24 import java.util.concurrent.ExecutorService;
25 import java.util.function.BiConsumer;
26 import java.util.function.Consumer;
27 import java.util.stream.Collectors;
28 import java.util.stream.IntStream;
29
30 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
31 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
32 import static org.assertj.core.api.Assertions.*;
33 import static org.awaitility.Awaitility.*;
34
35
36 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
37 @TestPropertySource(
38                 properties = {
39                                 "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
40                                 "consumer.topic=" + TOPIC,
41                                 "consumer.commit-interval=500ms" })
42 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
43 @Slf4j
44 abstract class GenericApplicationTests<K, V>
45 {
46         public static final String TOPIC = "FOO";
47         public static final int PARTITIONS = 10;
48
49
50         @Autowired
51         org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
52         @Autowired
53         Consumer<ConsumerRecord<K, V>> consumer;
54         @Autowired
55         ApplicationProperties applicationProperties;
56         @Autowired
57         ExecutorService executor;
58         @Autowired
59         ConsumerRebalanceListener rebalanceListener;
60         @Autowired
61         RecordHandler<K, V> recordHandler;
62
63         KafkaProducer<Bytes, Bytes> testRecordProducer;
64         KafkaConsumer<Bytes, Bytes> offsetConsumer;
65         EndlessConsumer<K, V> endlessConsumer;
66         Map<TopicPartition, Long> oldOffsets;
67         Map<TopicPartition, Long> seenOffsets;
68         Set<ConsumerRecord<K, V>> receivedRecords;
69
70
71         final RecordGenerator recordGenerator;
72         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
73
74         public GenericApplicationTests(RecordGenerator recordGenerator)
75         {
76                 this.recordGenerator = recordGenerator;
77                 this.messageSender = (record) -> sendMessage(record);
78         }
79
80
81         /** Tests methods */
82
83         @Test
84         void commitsCurrentOffsetsOnSuccess() throws Exception
85         {
86                 int numberOfGeneratedMessages =
87                                 recordGenerator.generate(false, false, messageSender);
88
89                 await(numberOfGeneratedMessages + " records received")
90                                 .atMost(Duration.ofSeconds(30))
91                                 .pollInterval(Duration.ofSeconds(1))
92                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
93
94                 await("Offsets committed")
95                                 .atMost(Duration.ofSeconds(10))
96                                 .pollInterval(Duration.ofSeconds(1))
97                                 .untilAsserted(() ->
98                                 {
99                                         checkSeenOffsetsForProgress();
100                                         assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
101                                 });
102
103                 assertThatExceptionOfType(IllegalStateException.class)
104                                 .isThrownBy(() -> endlessConsumer.exitStatus())
105                                 .describedAs("Consumer should still be running");
106
107                 endlessConsumer.stop();
108                 recordGenerator.assertBusinessLogic();
109         }
110
111         @Test
112         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
113         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
114         {
115                 int numberOfGeneratedMessages =
116                                 recordGenerator.generate(true, false, messageSender);
117
118                 await("Consumer failed")
119                                 .atMost(Duration.ofSeconds(30))
120                                 .pollInterval(Duration.ofSeconds(1))
121                                 .until(() -> !endlessConsumer.running());
122
123                 checkSeenOffsetsForProgress();
124                 assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
125
126                 endlessConsumer.start();
127                 await("Consumer failed")
128                                 .atMost(Duration.ofSeconds(30))
129                                 .pollInterval(Duration.ofSeconds(1))
130                                 .until(() -> !endlessConsumer.running());
131
132                 checkSeenOffsetsForProgress();
133                 assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
134                 assertThat(receivedRecords.size())
135                                 .describedAs("Received not all sent events")
136                                 .isLessThan(numberOfGeneratedMessages);
137
138                 assertThatNoException()
139                                 .describedAs("Consumer should not be running")
140                                 .isThrownBy(() -> endlessConsumer.exitStatus());
141                 assertThat(endlessConsumer.exitStatus())
142                                 .describedAs("Consumer should have exited abnormally")
143                                 .containsInstanceOf(RecordDeserializationException.class);
144
145                 recordGenerator.assertBusinessLogic();
146         }
147
148         @Test
149         @SkipWhenErrorCannotBeGenerated(logicError = true)
150         void doesNotCommitOffsetsOnLogicError()
151         {
152                 int numberOfGeneratedMessages =
153                                 recordGenerator.generate(false, true, messageSender);
154
155                 await("Consumer failed")
156                                 .atMost(Duration.ofSeconds(30))
157                                 .pollInterval(Duration.ofSeconds(1))
158                                 .until(() -> !endlessConsumer.running());
159
160                 checkSeenOffsetsForProgress();
161                 assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
162
163                 endlessConsumer.start();
164                 await("Consumer failed")
165                                 .atMost(Duration.ofSeconds(30))
166                                 .pollInterval(Duration.ofSeconds(1))
167                                 .until(() -> !endlessConsumer.running());
168
169                 assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
170
171                 assertThatNoException()
172                                 .describedAs("Consumer should not be running")
173                                 .isThrownBy(() -> endlessConsumer.exitStatus());
174                 assertThat(endlessConsumer.exitStatus())
175                                 .describedAs("Consumer should have exited abnormally")
176                                 .containsInstanceOf(RuntimeException.class);
177
178                 recordGenerator.assertBusinessLogic();
179         }
180
181
182         /** Helper methods for the verification of expectations */
183
184         void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
185         {
186                 doForCurrentOffsets((tp, offset) ->
187                 {
188                         Long expected = offsetsToCheck.get(tp) + 1;
189                         log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
190                         assertThat(offset)
191                                         .describedAs("Committed offset corresponds to the offset of the consumer")
192                                         .isEqualTo(expected);
193                 });
194         }
195
196         void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
197         {
198                 List<Boolean> isOffsetBehindSeen = new LinkedList<>();
199
200                 doForCurrentOffsets((tp, offset) ->
201                 {
202                         Long expected = offsetsToCheck.get(tp) + 1;
203                         log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
204                         assertThat(offset)
205                                         .describedAs("Committed offset must be at most equal to the offset of the consumer")
206                                         .isLessThanOrEqualTo(expected);
207                         isOffsetBehindSeen.add(offset < expected);
208                 });
209
210                 assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
211                                 .describedAs("Committed offsets are behind seen offsets")
212                                 .isTrue();
213         }
214
215         void checkSeenOffsetsForProgress()
216         {
217                 // Be sure, that some messages were consumed...!
218                 Set<TopicPartition> withProgress = new HashSet<>();
219                 partitions().forEach(tp ->
220                 {
221                         Long oldOffset = oldOffsets.get(tp) + 1;
222                         Long newOffset = seenOffsets.get(tp) + 1;
223                         if (!oldOffset.equals(newOffset))
224                         {
225                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
226                                 withProgress.add(tp);
227                         }
228                 });
229                 assertThat(withProgress)
230                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
231                                 .isNotEmpty();
232         }
233
234
235         /** Helper methods for setting up and running the tests */
236
237         void seekToEnd()
238         {
239                 offsetConsumer.assign(partitions());
240                 offsetConsumer.seekToEnd(partitions());
241                 partitions().forEach(tp ->
242                 {
243                         // seekToEnd() works lazily: it only takes effect on poll()/position()
244                         Long offset = offsetConsumer.position(tp);
245                         log.info("New position for {}: {}", tp, offset);
246                 });
247                 // The new positions must be commited!
248                 offsetConsumer.commitSync();
249                 offsetConsumer.unsubscribe();
250         }
251
252         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
253         {
254                 offsetConsumer.assign(partitions());
255                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
256                 offsetConsumer.unsubscribe();
257         }
258
259         List<TopicPartition> partitions()
260         {
261                 return
262                                 IntStream
263                                                 .range(0, PARTITIONS)
264                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
265                                                 .collect(Collectors.toList());
266         }
267
268
269         public interface RecordGenerator
270         {
271                 int generate(
272                                 boolean poisonPills,
273                                 boolean logicErrors,
274                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
275
276                 default boolean canGeneratePoisonPill()
277                 {
278                         return true;
279                 }
280
281                 default boolean canGenerateLogicError()
282                 {
283                         return true;
284                 }
285
286                 default void assertBusinessLogic()
287                 {
288                         log.debug("No business-logic to assert");
289                 }
290         }
291
292         void sendMessage(ProducerRecord<Bytes, Bytes> record)
293         {
294                 testRecordProducer.send(record, (metadata, e) ->
295                 {
296                         if (metadata != null)
297                         {
298                                 log.debug(
299                                                 "{}|{} - {}={}",
300                                                 metadata.partition(),
301                                                 metadata.offset(),
302                                                 record.key(),
303                                                 record.value());
304                         }
305                         else
306                         {
307                                 log.warn(
308                                                 "Exception for {}={}: {}",
309                                                 record.key(),
310                                                 record.value(),
311                                                 e.toString());
312                         }
313                 });
314         }
315
316
317         @BeforeEach
318         public void init()
319         {
320                 Properties props;
321                 props = new Properties();
322                 props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
323                 props.put("linger.ms", 100);
324                 props.put("key.serializer", BytesSerializer.class.getName());
325                 props.put("value.serializer", BytesSerializer.class.getName());
326                 testRecordProducer = new KafkaProducer<>(props);
327
328                 props = new Properties();
329                 props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
330                 props.put("client.id", "OFFSET-CONSUMER");
331                 props.put("group.id", applicationProperties.getGroupId());
332                 props.put("key.deserializer", BytesDeserializer.class.getName());
333                 props.put("value.deserializer", BytesDeserializer.class.getName());
334                 offsetConsumer = new KafkaConsumer<>(props);
335
336                 seekToEnd();
337
338                 oldOffsets = new HashMap<>();
339                 seenOffsets = new HashMap<>();
340                 receivedRecords = new HashSet<>();
341
342                 doForCurrentOffsets((tp, offset) ->
343                 {
344                         oldOffsets.put(tp, offset - 1);
345                         seenOffsets.put(tp, offset - 1);
346                 });
347
348                 TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
349                                 new TestRecordHandler<K, V>(recordHandler)
350                                 {
351                                         @Override
352                                         public void onNewRecord(ConsumerRecord<K, V> record)
353                                         {
354                                                 seenOffsets.put(
355                                                                 new TopicPartition(record.topic(), record.partition()),
356                                                                 record.offset());
357                                                 receivedRecords.add(record);
358                                                 consumer.accept(record);
359                                         }
360                                 };
361
362                 endlessConsumer =
363                                 new EndlessConsumer<>(
364                                                 executor,
365                                                 applicationProperties.getClientId(),
366                                                 applicationProperties.getTopic(),
367                                                 kafkaConsumer,
368                                                 captureOffsetAndExecuteTestHandler);
369
370                 endlessConsumer.start();
371         }
372
373         @AfterEach
374         public void deinit()
375         {
376                 try
377                 {
378                         testRecordProducer.close();
379                         offsetConsumer.close();
380                 }
381                 catch (Exception e)
382                 {
383                         log.info("Exception while stopping the consumer: {}", e.toString());
384                 }
385         }
386
387
388         @TestConfiguration
389         @Import(ApplicationConfiguration.class)
390         public static class Configuration
391         {
392         }
393 }