ROT: Fehler in Test-Logik aufgedeckt
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.KafkaConsumer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.RecordDeserializationException;
10 import org.apache.kafka.common.serialization.*;
11 import org.apache.kafka.common.utils.Bytes;
12 import org.junit.jupiter.api.*;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
15 import org.springframework.boot.test.context.TestConfiguration;
16 import org.springframework.context.annotation.Import;
17 import org.springframework.kafka.test.context.EmbeddedKafka;
18 import org.springframework.test.context.TestPropertySource;
19 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
20
21 import java.time.Duration;
22 import java.util.*;
23 import java.util.concurrent.ExecutorService;
24 import java.util.function.BiConsumer;
25 import java.util.function.Consumer;
26 import java.util.stream.Collectors;
27 import java.util.stream.IntStream;
28
29 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
30 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
31 import static org.assertj.core.api.Assertions.*;
32 import static org.awaitility.Awaitility.*;
33
34
35 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
36 @TestPropertySource(
37                 properties = {
38                                 "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
39                                 "consumer.topic=" + TOPIC,
40                                 "consumer.commit-interval=500ms" })
41 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
42 @Slf4j
43 abstract class GenericApplicationTests<K, V>
44 {
45         public static final String TOPIC = "FOO";
46         public static final int PARTITIONS = 10;
47
48
49         @Autowired
50         KafkaConsumer<K, V> kafkaConsumer;
51         @Autowired
52         Consumer<ConsumerRecord<K, V>> consumer;
53         @Autowired
54         ApplicationProperties properties;
55         @Autowired
56         ExecutorService executor;
57
58         KafkaProducer<Bytes, Bytes> testRecordProducer;
59         KafkaConsumer<Bytes, Bytes> offsetConsumer;
60         EndlessConsumer<K, V> endlessConsumer;
61         Map<TopicPartition, Long> oldOffsets;
62         Map<TopicPartition, Long> newOffsets;
63         Set<ConsumerRecord<K, V>> receivedRecords;
64
65
66         final RecordGenerator recordGenerator;
67         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
68
69         public GenericApplicationTests(RecordGenerator recordGenerator)
70         {
71                 this.recordGenerator = recordGenerator;
72                 this.messageSender = (record) -> sendMessage(record);
73         }
74
75
        /** Test methods */
77
78         @Test
79         void commitsCurrentOffsetsOnSuccess()
80         {
81                 int numberOfGeneratedMessages =
82                                 recordGenerator.generate(false, false, messageSender);
83
84                 await(numberOfGeneratedMessages + " records received")
85                                 .atMost(Duration.ofSeconds(30))
86                                 .pollInterval(Duration.ofSeconds(1))
87                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
88
89                 await("Offsets committed")
90                                 .atMost(Duration.ofSeconds(10))
91                                 .pollInterval(Duration.ofSeconds(1))
92                                 .untilAsserted(() ->
93                                 {
94                                         checkSeenOffsetsForProgress();
95                                         compareToCommitedOffsets(newOffsets);
96                                 });
97
98                 assertThatExceptionOfType(IllegalStateException.class)
99                                 .isThrownBy(() -> endlessConsumer.exitStatus())
100                                 .describedAs("Consumer should still be running");
101
102                 recordGenerator.assertBusinessLogic();
103         }
104
105         @Test
106         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
107         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
108         {
109                 int numberOfGeneratedMessages =
110                                 recordGenerator.generate(true, false, messageSender);
111
112                 await("Consumer failed")
113                                 .atMost(Duration.ofSeconds(30))
114                                 .pollInterval(Duration.ofSeconds(1))
115                                 .until(() -> !endlessConsumer.running());
116
117                 checkSeenOffsetsForProgress();
118                 compareToCommitedOffsets(newOffsets);
119
120                 endlessConsumer.start();
121                 await("Consumer failed")
122                                 .atMost(Duration.ofSeconds(30))
123                                 .pollInterval(Duration.ofSeconds(1))
124                                 .until(() -> !endlessConsumer.running());
125
126                 checkSeenOffsetsForProgress();
127                 compareToCommitedOffsets(newOffsets);
128                 assertThat(receivedRecords.size())
129                                 .describedAs("Received not all sent events")
130                                 .isLessThan(numberOfGeneratedMessages);
131
132                 assertThatNoException()
133                                 .describedAs("Consumer should not be running")
134                                 .isThrownBy(() -> endlessConsumer.exitStatus());
135                 assertThat(endlessConsumer.exitStatus())
136                                 .describedAs("Consumer should have exited abnormally")
137                                 .containsInstanceOf(RecordDeserializationException.class);
138
139                 recordGenerator.assertBusinessLogic();
140         }
141
142         @Test
143         @SkipWhenErrorCannotBeGenerated(logicError = true)
144         void doesNotCommitOffsetsOnLogicError()
145         {
146                 int numberOfGeneratedMessages =
147                                 recordGenerator.generate(false, true, messageSender);
148
149                 await("Consumer failed")
150                                 .atMost(Duration.ofSeconds(30))
151                                 .pollInterval(Duration.ofSeconds(1))
152                                 .until(() -> !endlessConsumer.running());
153
154                 checkSeenOffsetsForProgress();
155                 compareToCommitedOffsets(oldOffsets);
156
157                 endlessConsumer.start();
158                 await("Consumer failed")
159                                 .atMost(Duration.ofSeconds(30))
160                                 .pollInterval(Duration.ofSeconds(1))
161                                 .until(() -> !endlessConsumer.running());
162
163                 checkSeenOffsetsForProgress();
164                 compareToCommitedOffsets(oldOffsets);
165                 assertThat(receivedRecords.size())
166                                 .describedAs("Received not all sent events")
167                                 .isLessThan(numberOfGeneratedMessages);
168
169                 assertThatNoException()
170                                 .describedAs("Consumer should not be running")
171                                 .isThrownBy(() -> endlessConsumer.exitStatus());
172                 assertThat(endlessConsumer.exitStatus())
173                                 .describedAs("Consumer should have exited abnormally")
174                                 .containsInstanceOf(RuntimeException.class);
175
176                 recordGenerator.assertBusinessLogic();
177         }
178
179
180         /** Helper methods for the verification of expectations */
181
182         void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
183         {
184                 doForCurrentOffsets((tp, offset) ->
185                 {
186                         Long expected = offsetsToCheck.get(tp) + 1;
187                         log.debug("Checking, if the offset for {} is {}", tp, expected);
188                         assertThat(offset)
189                                         .describedAs("Committed offset corresponds to the offset of the consumer")
190                                         .isEqualTo(expected);
191                 });
192         }
193
194         void checkSeenOffsetsForProgress()
195         {
196                 // Be sure, that some messages were consumed...!
197                 Set<TopicPartition> withProgress = new HashSet<>();
198                 partitions().forEach(tp ->
199                 {
200                         Long oldOffset = oldOffsets.get(tp) + 1;
201                         Long newOffset = newOffsets.get(tp) + 1;
202                         if (!oldOffset.equals(newOffset))
203                         {
204                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
205                                 withProgress.add(tp);
206                         }
207                 });
208                 assertThat(withProgress)
209                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
210                                 .isNotEmpty();
211         }
212
213
214         /** Helper methods for setting up and running the tests */
215
216         void seekToEnd()
217         {
218                 offsetConsumer.assign(partitions());
219                 offsetConsumer.seekToEnd(partitions());
220                 partitions().forEach(tp ->
221                 {
222                         // seekToEnd() works lazily: it only takes effect on poll()/position()
223                         Long offset = offsetConsumer.position(tp);
224                         log.info("New position for {}: {}", tp, offset);
225                 });
226                 // The new positions must be commited!
227                 offsetConsumer.commitSync();
228                 offsetConsumer.unsubscribe();
229         }
230
231         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
232         {
233                 offsetConsumer.assign(partitions());
234                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
235                 offsetConsumer.unsubscribe();
236         }
237
238         List<TopicPartition> partitions()
239         {
240                 return
241                                 IntStream
242                                                 .range(0, PARTITIONS)
243                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
244                                                 .collect(Collectors.toList());
245         }
246
247
248         public interface RecordGenerator
249         {
250                 int generate(
251                                 boolean poisonPills,
252                                 boolean logicErrors,
253                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
254
255                 default boolean canGeneratePoisonPill()
256                 {
257                         return true;
258                 }
259
260                 default boolean canGenerateLogicError()
261                 {
262                         return true;
263                 }
264
265                 default void assertBusinessLogic()
266                 {
267                         log.debug("No business-logic to assert");
268                 }
269         }
270
271         void sendMessage(ProducerRecord<Bytes, Bytes> record)
272         {
273                 testRecordProducer.send(record, (metadata, e) ->
274                 {
275                         if (metadata != null)
276                         {
277                                 log.debug(
278                                                 "{}|{} - {}={}",
279                                                 metadata.partition(),
280                                                 metadata.offset(),
281                                                 record.key(),
282                                                 record.value());
283                         }
284                         else
285                         {
286                                 log.warn(
287                                                 "Exception for {}={}: {}",
288                                                 record.key(),
289                                                 record.value(),
290                                                 e.toString());
291                         }
292                 });
293         }
294
295
296         @BeforeEach
297         public void init()
298         {
299                 Properties props;
300                 props = new Properties();
301                 props.put("bootstrap.servers", properties.getBootstrapServer());
302                 props.put("linger.ms", 100);
303                 props.put("key.serializer", BytesSerializer.class.getName());
304                 props.put("value.serializer", BytesSerializer.class.getName());
305                 testRecordProducer = new KafkaProducer<>(props);
306
307                 props = new Properties();
308                 props.put("bootstrap.servers", properties.getBootstrapServer());
309                 props.put("client.id", "OFFSET-CONSUMER");
310                 props.put("group.id", properties.getGroupId());
311                 props.put("key.deserializer", BytesDeserializer.class.getName());
312                 props.put("value.deserializer", BytesDeserializer.class.getName());
313                 offsetConsumer = new KafkaConsumer<>(props);
314
315                 seekToEnd();
316
317                 oldOffsets = new HashMap<>();
318                 newOffsets = new HashMap<>();
319                 receivedRecords = new HashSet<>();
320
321                 doForCurrentOffsets((tp, offset) ->
322                 {
323                         oldOffsets.put(tp, offset - 1);
324                         newOffsets.put(tp, offset - 1);
325                 });
326
327                 Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
328                                 record ->
329                                 {
330                                         newOffsets.put(
331                                                         new TopicPartition(record.topic(), record.partition()),
332                                                         record.offset());
333                                         receivedRecords.add(record);
334                                         consumer.accept(record);
335                                 };
336
337                 endlessConsumer =
338                                 new EndlessConsumer<>(
339                                                 executor,
340                                                 properties.getClientId(),
341                                                 properties.getTopic(),
342                                                 kafkaConsumer,
343                                                 captureOffsetAndExecuteTestHandler);
344
345                 endlessConsumer.start();
346         }
347
348         @AfterEach
349         public void deinit()
350         {
351                 try
352                 {
353                         endlessConsumer.stop();
354                         testRecordProducer.close();
355                         offsetConsumer.close();
356                 }
357                 catch (Exception e)
358                 {
359                         log.info("Exception while stopping the consumer: {}", e.toString());
360                 }
361         }
362
363
364         @TestConfiguration
365         @Import(ApplicationConfiguration.class)
366         public static class Configuration
367         {
368         }
369 }