GRÜN: Fehler in der Test-Logik korrigiert
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.KafkaConsumer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.RecordDeserializationException;
10 import org.apache.kafka.common.serialization.*;
11 import org.apache.kafka.common.utils.Bytes;
12 import org.junit.jupiter.api.*;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
15 import org.springframework.boot.test.context.TestConfiguration;
16 import org.springframework.context.annotation.Import;
17 import org.springframework.kafka.test.context.EmbeddedKafka;
18 import org.springframework.test.context.TestPropertySource;
19 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
20
21 import java.time.Duration;
22 import java.util.*;
23 import java.util.concurrent.ExecutorService;
24 import java.util.function.BiConsumer;
25 import java.util.function.Consumer;
26 import java.util.stream.Collectors;
27 import java.util.stream.IntStream;
28
29 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
30 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
31 import static org.assertj.core.api.Assertions.*;
32 import static org.awaitility.Awaitility.*;
33
34
35 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
36 @TestPropertySource(
37                 properties = {
38                                 "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
39                                 "consumer.topic=" + TOPIC,
40                                 "consumer.commit-interval=500ms" })
41 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
42 @Slf4j
43 abstract class GenericApplicationTests<K, V>
44 {
45         public static final String TOPIC = "FOO";
46         public static final int PARTITIONS = 10;
47
48
49         @Autowired
50         KafkaConsumer<K, V> kafkaConsumer;
51         @Autowired
52         Consumer<ConsumerRecord<K, V>> consumer;
53         @Autowired
54         ApplicationProperties properties;
55         @Autowired
56         ExecutorService executor;
57
58         KafkaProducer<Bytes, Bytes> testRecordProducer;
59         KafkaConsumer<Bytes, Bytes> offsetConsumer;
60         EndlessConsumer<K, V> endlessConsumer;
61         Map<TopicPartition, Long> oldOffsets;
62         Map<TopicPartition, Long> seenOffsets;
63         Set<ConsumerRecord<K, V>> receivedRecords;
64
65
66         final RecordGenerator recordGenerator;
67         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
68
69         public GenericApplicationTests(RecordGenerator recordGenerator)
70         {
71                 this.recordGenerator = recordGenerator;
72                 this.messageSender = (record) -> sendMessage(record);
73         }
74
75
76         /** Tests methods */
77
78         @Test
79         void commitsCurrentOffsetsOnSuccess()
80         {
81                 int numberOfGeneratedMessages =
82                                 recordGenerator.generate(false, false, messageSender);
83
84                 await(numberOfGeneratedMessages + " records received")
85                                 .atMost(Duration.ofSeconds(30))
86                                 .pollInterval(Duration.ofSeconds(1))
87                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
88
89                 await("Offsets committed")
90                                 .atMost(Duration.ofSeconds(10))
91                                 .pollInterval(Duration.ofSeconds(1))
92                                 .untilAsserted(() ->
93                                 {
94                                         checkSeenOffsetsForProgress();
95                                         assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
96                                 });
97
98                 assertThatExceptionOfType(IllegalStateException.class)
99                                 .isThrownBy(() -> endlessConsumer.exitStatus())
100                                 .describedAs("Consumer should still be running");
101
102                 recordGenerator.assertBusinessLogic();
103         }
104
105         @Test
106         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
107         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
108         {
109                 int numberOfGeneratedMessages =
110                                 recordGenerator.generate(true, false, messageSender);
111
112                 await("Consumer failed")
113                                 .atMost(Duration.ofSeconds(30))
114                                 .pollInterval(Duration.ofSeconds(1))
115                                 .until(() -> !endlessConsumer.running());
116
117                 checkSeenOffsetsForProgress();
118                 assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
119
120                 endlessConsumer.start();
121                 await("Consumer failed")
122                                 .atMost(Duration.ofSeconds(30))
123                                 .pollInterval(Duration.ofSeconds(1))
124                                 .until(() -> !endlessConsumer.running());
125
126                 checkSeenOffsetsForProgress();
127                 assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
128                 assertThat(receivedRecords.size())
129                                 .describedAs("Received not all sent events")
130                                 .isLessThan(numberOfGeneratedMessages);
131
132                 assertThatNoException()
133                                 .describedAs("Consumer should not be running")
134                                 .isThrownBy(() -> endlessConsumer.exitStatus());
135                 assertThat(endlessConsumer.exitStatus())
136                                 .describedAs("Consumer should have exited abnormally")
137                                 .containsInstanceOf(RecordDeserializationException.class);
138
139                 recordGenerator.assertBusinessLogic();
140         }
141
142         @Test
143         @SkipWhenErrorCannotBeGenerated(logicError = true)
144         void doesNotCommitOffsetsOnLogicError()
145         {
146                 int numberOfGeneratedMessages =
147                                 recordGenerator.generate(false, true, messageSender);
148
149                 await("Consumer failed")
150                                 .atMost(Duration.ofSeconds(30))
151                                 .pollInterval(Duration.ofSeconds(1))
152                                 .until(() -> !endlessConsumer.running());
153
154                 checkSeenOffsetsForProgress();
155                 assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
156
157                 endlessConsumer.start();
158                 await("Consumer failed")
159                                 .atMost(Duration.ofSeconds(30))
160                                 .pollInterval(Duration.ofSeconds(1))
161                                 .until(() -> !endlessConsumer.running());
162
163                 assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
164
165                 assertThatNoException()
166                                 .describedAs("Consumer should not be running")
167                                 .isThrownBy(() -> endlessConsumer.exitStatus());
168                 assertThat(endlessConsumer.exitStatus())
169                                 .describedAs("Consumer should have exited abnormally")
170                                 .containsInstanceOf(RuntimeException.class);
171
172                 recordGenerator.assertBusinessLogic();
173         }
174
175
176         /** Helper methods for the verification of expectations */
177
178         void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
179         {
180                 doForCurrentOffsets((tp, offset) ->
181                 {
182                         Long expected = offsetsToCheck.get(tp) + 1;
183                         log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
184                         assertThat(offset)
185                                         .describedAs("Committed offset corresponds to the offset of the consumer")
186                                         .isEqualTo(expected);
187                 });
188         }
189
190         void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
191         {
192                 List<Boolean> isOffsetBehindSeen = new LinkedList<>();
193
194                 doForCurrentOffsets((tp, offset) ->
195                 {
196                         Long expected = offsetsToCheck.get(tp) + 1;
197                         log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
198                         assertThat(offset)
199                                         .describedAs("Committed offset corresponds to the offset of the consumer")
200                                         .isLessThanOrEqualTo(expected);
201                         isOffsetBehindSeen.add(offset < expected);
202                 });
203
204                 assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
205                                 .describedAs("Committed offsets are behind seen offsets")
206                                 .isTrue();
207         }
208
209         void checkSeenOffsetsForProgress()
210         {
211                 // Be sure, that some messages were consumed...!
212                 Set<TopicPartition> withProgress = new HashSet<>();
213                 partitions().forEach(tp ->
214                 {
215                         Long oldOffset = oldOffsets.get(tp) + 1;
216                         Long newOffset = seenOffsets.get(tp) + 1;
217                         if (!oldOffset.equals(newOffset))
218                         {
219                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
220                                 withProgress.add(tp);
221                         }
222                 });
223                 assertThat(withProgress)
224                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
225                                 .isNotEmpty();
226         }
227
228
229         /** Helper methods for setting up and running the tests */
230
231         void seekToEnd()
232         {
233                 offsetConsumer.assign(partitions());
234                 offsetConsumer.seekToEnd(partitions());
235                 partitions().forEach(tp ->
236                 {
237                         // seekToEnd() works lazily: it only takes effect on poll()/position()
238                         Long offset = offsetConsumer.position(tp);
239                         log.info("New position for {}: {}", tp, offset);
240                 });
241                 // The new positions must be commited!
242                 offsetConsumer.commitSync();
243                 offsetConsumer.unsubscribe();
244         }
245
246         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
247         {
248                 offsetConsumer.assign(partitions());
249                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
250                 offsetConsumer.unsubscribe();
251         }
252
253         List<TopicPartition> partitions()
254         {
255                 return
256                                 IntStream
257                                                 .range(0, PARTITIONS)
258                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
259                                                 .collect(Collectors.toList());
260         }
261
262
263         public interface RecordGenerator
264         {
265                 int generate(
266                                 boolean poisonPills,
267                                 boolean logicErrors,
268                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
269
270                 default boolean canGeneratePoisonPill()
271                 {
272                         return true;
273                 }
274
275                 default boolean canGenerateLogicError()
276                 {
277                         return true;
278                 }
279
280                 default void assertBusinessLogic()
281                 {
282                         log.debug("No business-logic to assert");
283                 }
284         }
285
286         void sendMessage(ProducerRecord<Bytes, Bytes> record)
287         {
288                 testRecordProducer.send(record, (metadata, e) ->
289                 {
290                         if (metadata != null)
291                         {
292                                 log.debug(
293                                                 "{}|{} - {}={}",
294                                                 metadata.partition(),
295                                                 metadata.offset(),
296                                                 record.key(),
297                                                 record.value());
298                         }
299                         else
300                         {
301                                 log.warn(
302                                                 "Exception for {}={}: {}",
303                                                 record.key(),
304                                                 record.value(),
305                                                 e.toString());
306                         }
307                 });
308         }
309
310
311         @BeforeEach
312         public void init()
313         {
314                 Properties props;
315                 props = new Properties();
316                 props.put("bootstrap.servers", properties.getBootstrapServer());
317                 props.put("linger.ms", 100);
318                 props.put("key.serializer", BytesSerializer.class.getName());
319                 props.put("value.serializer", BytesSerializer.class.getName());
320                 testRecordProducer = new KafkaProducer<>(props);
321
322                 props = new Properties();
323                 props.put("bootstrap.servers", properties.getBootstrapServer());
324                 props.put("client.id", "OFFSET-CONSUMER");
325                 props.put("group.id", properties.getGroupId());
326                 props.put("key.deserializer", BytesDeserializer.class.getName());
327                 props.put("value.deserializer", BytesDeserializer.class.getName());
328                 offsetConsumer = new KafkaConsumer<>(props);
329
330                 seekToEnd();
331
332                 oldOffsets = new HashMap<>();
333                 seenOffsets = new HashMap<>();
334                 receivedRecords = new HashSet<>();
335
336                 doForCurrentOffsets((tp, offset) ->
337                 {
338                         oldOffsets.put(tp, offset - 1);
339                         seenOffsets.put(tp, offset - 1);
340                 });
341
342                 Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
343                                 record ->
344                                 {
345                                         seenOffsets.put(
346                                                         new TopicPartition(record.topic(), record.partition()),
347                                                         record.offset());
348                                         receivedRecords.add(record);
349                                         consumer.accept(record);
350                                 };
351
352                 endlessConsumer =
353                                 new EndlessConsumer<>(
354                                                 executor,
355                                                 properties.getClientId(),
356                                                 properties.getTopic(),
357                                                 kafkaConsumer,
358                                                 captureOffsetAndExecuteTestHandler);
359
360                 endlessConsumer.start();
361         }
362
363         @AfterEach
364         public void deinit()
365         {
366                 try
367                 {
368                         endlessConsumer.stop();
369                         testRecordProducer.close();
370                         offsetConsumer.close();
371                 }
372                 catch (Exception e)
373                 {
374                         log.info("Exception while stopping the consumer: {}", e.toString());
375                 }
376         }
377
378
379         @TestConfiguration
380         @Import(ApplicationConfiguration.class)
381         public static class Configuration
382         {
383         }
384 }