ROT: Rückbau auf automatische Commits - Testfälle laufen nicht mehr
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.KafkaConsumer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.RecordDeserializationException;
10 import org.apache.kafka.common.serialization.*;
11 import org.apache.kafka.common.utils.Bytes;
12 import org.junit.jupiter.api.*;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
15 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
16 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
17 import org.springframework.boot.test.context.TestConfiguration;
18 import org.springframework.context.annotation.Import;
19 import org.springframework.kafka.test.context.EmbeddedKafka;
20 import org.springframework.test.context.TestPropertySource;
21 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
22
23 import java.time.Duration;
24 import java.util.*;
25 import java.util.concurrent.ExecutorService;
26 import java.util.function.BiConsumer;
27 import java.util.function.Consumer;
28 import java.util.stream.Collectors;
29 import java.util.stream.IntStream;
30
31 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
32 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
33 import static org.assertj.core.api.Assertions.*;
34 import static org.awaitility.Awaitility.*;
35
36
37 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
38 @TestPropertySource(
39                 properties = {
40                                 "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
41                                 "sumup.adder.topic=" + TOPIC,
42                                 "sumup.adder.commit-interval=1s",
43                                 "spring.mongodb.embedded.version=4.4.13" })
44 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
45 @EnableAutoConfiguration
46 @AutoConfigureDataMongo
47 @Slf4j
48 abstract class GenericApplicationTests<K, V>
49 {
50         public static final String TOPIC = "FOO";
51         public static final int PARTITIONS = 10;
52
53
54         @Autowired
55         KafkaConsumer<K, V> kafkaConsumer;
56         @Autowired
57         Consumer<ConsumerRecord<K, V>> consumer;
58         @Autowired
59         ApplicationProperties properties;
60         @Autowired
61         ExecutorService executor;
62         @Autowired
63         PollIntervalAwareConsumerRebalanceListener rebalanceListener;
64         @Autowired
65         RecordHandler<K, V> recordHandler;
66
67         KafkaProducer<Bytes, Bytes> testRecordProducer;
68         KafkaConsumer<Bytes, Bytes> offsetConsumer;
69         EndlessConsumer<K, V> endlessConsumer;
70         Map<TopicPartition, Long> oldOffsets;
71         Map<TopicPartition, Long> newOffsets;
72         Set<ConsumerRecord<K, V>> receivedRecords;
73
74
75         final RecordGenerator recordGenerator;
76         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
77
78         public GenericApplicationTests(RecordGenerator recordGenerator)
79         {
80                 this.recordGenerator = recordGenerator;
81                 this.messageSender = (record) -> sendMessage(record);
82         }
83
84
        /** Test methods */
86
87         @Test
88         void commitsCurrentOffsetsOnSuccess()
89         {
90                 int numberOfGeneratedMessages =
91                                 recordGenerator.generate(false, false, messageSender);
92
93                 await(numberOfGeneratedMessages + " records received")
94                                 .atMost(Duration.ofSeconds(30))
95                                 .pollInterval(Duration.ofSeconds(1))
96                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
97
98                 await("Offsets committed")
99                                 .atMost(Duration.ofSeconds(10))
100                                 .pollInterval(Duration.ofSeconds(1))
101                                 .untilAsserted(() ->
102                                 {
103                                         checkSeenOffsetsForProgress();
104                                         compareToCommitedOffsets(newOffsets);
105                                 });
106
107                 assertThatExceptionOfType(IllegalStateException.class)
108                                 .isThrownBy(() -> endlessConsumer.exitStatus())
109                                 .describedAs("Consumer should still be running");
110
111                 recordGenerator.assertBusinessLogic();
112         }
113
114         @Test
115         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
116         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
117         {
118                 int numberOfGeneratedMessages =
119                                 recordGenerator.generate(true, false, messageSender);
120
121                 await("Consumer failed")
122                                 .atMost(Duration.ofSeconds(30))
123                                 .pollInterval(Duration.ofSeconds(1))
124                                 .until(() -> !endlessConsumer.running());
125
126                 checkSeenOffsetsForProgress();
127                 compareToCommitedOffsets(newOffsets);
128
129                 endlessConsumer.start();
130                 await("Consumer failed")
131                                 .atMost(Duration.ofSeconds(30))
132                                 .pollInterval(Duration.ofSeconds(1))
133                                 .until(() -> !endlessConsumer.running());
134
135                 checkSeenOffsetsForProgress();
136                 compareToCommitedOffsets(newOffsets);
137                 assertThat(receivedRecords.size())
138                                 .describedAs("Received not all sent events")
139                                 .isLessThan(numberOfGeneratedMessages);
140
141                 assertThatNoException()
142                                 .describedAs("Consumer should not be running")
143                                 .isThrownBy(() -> endlessConsumer.exitStatus());
144                 assertThat(endlessConsumer.exitStatus())
145                                 .describedAs("Consumer should have exited abnormally")
146                                 .containsInstanceOf(RecordDeserializationException.class);
147
148                 recordGenerator.assertBusinessLogic();
149         }
150
151         @Test
152         @SkipWhenErrorCannotBeGenerated(logicError = true)
153         void doesNotCommitOffsetsOnLogicError()
154         {
155                 int numberOfGeneratedMessages =
156                                 recordGenerator.generate(false, true, messageSender);
157
158                 await("Consumer failed")
159                                 .atMost(Duration.ofSeconds(30))
160                                 .pollInterval(Duration.ofSeconds(1))
161                                 .until(() -> !endlessConsumer.running());
162
163                 checkSeenOffsetsForProgress();
164                 compareToCommitedOffsets(oldOffsets);
165
166                 endlessConsumer.start();
167                 await("Consumer failed")
168                                 .atMost(Duration.ofSeconds(30))
169                                 .pollInterval(Duration.ofSeconds(1))
170                                 .until(() -> !endlessConsumer.running());
171
172                 checkSeenOffsetsForProgress();
173                 compareToCommitedOffsets(oldOffsets);
174                 assertThat(receivedRecords.size())
175                                 .describedAs("Received not all sent events")
176                                 .isLessThan(numberOfGeneratedMessages);
177
178                 assertThatNoException()
179                                 .describedAs("Consumer should not be running")
180                                 .isThrownBy(() -> endlessConsumer.exitStatus());
181                 assertThat(endlessConsumer.exitStatus())
182                                 .describedAs("Consumer should have exited abnormally")
183                                 .containsInstanceOf(RuntimeException.class);
184
185                 recordGenerator.assertBusinessLogic();
186         }
187
188
189         /** Helper methods for the verification of expectations */
190
191         void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
192         {
193                 doForCurrentOffsets((tp, offset) ->
194                 {
195                         Long expected = offsetsToCheck.get(tp) + 1;
196                         log.debug("Checking, if the offset for {} is {}", tp, expected);
197                         assertThat(offset)
198                                         .describedAs("Committed offset corresponds to the offset of the consumer")
199                                         .isEqualTo(expected);
200                 });
201         }
202
203         void checkSeenOffsetsForProgress()
204         {
205                 // Be sure, that some messages were consumed...!
206                 Set<TopicPartition> withProgress = new HashSet<>();
207                 partitions().forEach(tp ->
208                 {
209                         Long oldOffset = oldOffsets.get(tp) + 1;
210                         Long newOffset = newOffsets.get(tp) + 1;
211                         if (!oldOffset.equals(newOffset))
212                         {
213                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
214                                 withProgress.add(tp);
215                         }
216                 });
217                 assertThat(withProgress)
218                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
219                                 .isNotEmpty();
220         }
221
222
223         /** Helper methods for setting up and running the tests */
224
225         void seekToEnd()
226         {
227                 offsetConsumer.assign(partitions());
228                 offsetConsumer.seekToEnd(partitions());
229                 partitions().forEach(tp ->
230                 {
231                         // seekToEnd() works lazily: it only takes effect on poll()/position()
232                         Long offset = offsetConsumer.position(tp);
233                         log.info("New position for {}: {}", tp, offset);
234                 });
235                 // The new positions must be commited!
236                 offsetConsumer.commitSync();
237                 offsetConsumer.unsubscribe();
238         }
239
240         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
241         {
242                 offsetConsumer.assign(partitions());
243                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
244                 offsetConsumer.unsubscribe();
245         }
246
247         List<TopicPartition> partitions()
248         {
249                 return
250                                 IntStream
251                                                 .range(0, PARTITIONS)
252                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
253                                                 .collect(Collectors.toList());
254         }
255
256
257         public interface RecordGenerator
258         {
259                 int generate(
260                                 boolean poisonPills,
261                                 boolean logicErrors,
262                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
263
264                 default boolean canGeneratePoisonPill()
265                 {
266                         return true;
267                 }
268
269                 default boolean canGenerateLogicError()
270                 {
271                         return true;
272                 }
273
274                 default void assertBusinessLogic()
275                 {
276                         log.debug("No business-logic to assert");
277                 }
278         }
279
280         void sendMessage(ProducerRecord<Bytes, Bytes> record)
281         {
282                 testRecordProducer.send(record, (metadata, e) ->
283                 {
284                         if (metadata != null)
285                         {
286                                 log.debug(
287                                                 "{}|{} - {}={}",
288                                                 metadata.partition(),
289                                                 metadata.offset(),
290                                                 record.key(),
291                                                 record.value());
292                         }
293                         else
294                         {
295                                 log.warn(
296                                                 "Exception for {}={}: {}",
297                                                 record.key(),
298                                                 record.value(),
299                                                 e.toString());
300                         }
301                 });
302         }
303
304
305         @BeforeEach
306         public void init()
307         {
308                 Properties props;
309                 props = new Properties();
310                 props.put("bootstrap.servers", properties.getBootstrapServer());
311                 props.put("linger.ms", 100);
312                 props.put("key.serializer", BytesSerializer.class.getName());
313                 props.put("value.serializer", BytesSerializer.class.getName());
314                 testRecordProducer = new KafkaProducer<>(props);
315
316                 props = new Properties();
317                 props.put("bootstrap.servers", properties.getBootstrapServer());
318                 props.put("client.id", "OFFSET-CONSUMER");
319                 props.put("group.id", properties.getGroupId());
320                 props.put("key.deserializer", BytesDeserializer.class.getName());
321                 props.put("value.deserializer", BytesDeserializer.class.getName());
322                 offsetConsumer = new KafkaConsumer<>(props);
323
324                 seekToEnd();
325
326                 oldOffsets = new HashMap<>();
327                 newOffsets = new HashMap<>();
328                 receivedRecords = new HashSet<>();
329
330                 doForCurrentOffsets((tp, offset) ->
331                 {
332                         oldOffsets.put(tp, offset - 1);
333                         newOffsets.put(tp, offset - 1);
334                 });
335
336                 TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
337                                 new TestRecordHandler<K, V>(recordHandler)
338                                 {
339                                         @Override
340                                         public void onNewRecord(ConsumerRecord<K, V> record)
341                                         {
342                                                 newOffsets.put(
343                                                                 new TopicPartition(record.topic(), record.partition()),
344                                                                 record.offset());
345                                                 receivedRecords.add(record);
346                                         }
347                                 };
348
349                 endlessConsumer =
350                                 new EndlessConsumer<>(
351                                                 executor,
352                                                 properties.getClientId(),
353                                                 properties.getTopic(),
354                                                 kafkaConsumer,
355                                                 rebalanceListener,
356                                                 captureOffsetAndExecuteTestHandler);
357
358                 endlessConsumer.start();
359         }
360
361         @AfterEach
362         public void deinit()
363         {
364                 try
365                 {
366                         endlessConsumer.stop();
367                         testRecordProducer.close();
368                         offsetConsumer.close();
369                 }
370                 catch (Exception e)
371                 {
372                         log.info("Exception while stopping the consumer: {}", e.toString());
373                 }
374         }
375
376
377         @TestConfiguration
378         @Import(ApplicationConfiguration.class)
379         public static class Configuration
380         {
381         }
382 }