ROT: (Ohne stored-offsets) Überprüfung der Fachlogik korrigiert
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import com.mongodb.client.MongoClient;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.clients.producer.KafkaProducer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import org.apache.kafka.common.errors.RecordDeserializationException;
11 import org.apache.kafka.common.serialization.*;
12 import org.apache.kafka.common.utils.Bytes;
13 import org.junit.jupiter.api.*;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
16 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
17 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
18 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
19 import org.springframework.boot.test.context.TestConfiguration;
20 import org.springframework.context.annotation.Import;
21 import org.springframework.kafka.test.context.EmbeddedKafka;
22 import org.springframework.test.context.TestPropertySource;
23 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
24
25 import java.time.Duration;
26 import java.util.*;
27 import java.util.concurrent.ExecutorService;
28 import java.util.function.BiConsumer;
29 import java.util.function.Consumer;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
32
33 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
34 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
35 import static org.assertj.core.api.Assertions.*;
36 import static org.awaitility.Awaitility.*;
37
38
39 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
40 @TestPropertySource(
41                 properties = {
42                                 "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
43                                 "sumup.adder.topic=" + TOPIC,
44                                 "sumup.adder.commit-interval=500ms",
45                                 "spring.mongodb.embedded.version=4.4.13" })
46 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
47 @EnableAutoConfiguration
48 @AutoConfigureDataMongo
49 @Slf4j
50 abstract class GenericApplicationTests<K, V>
51 {
52         public static final String TOPIC = "FOO";
53         public static final int PARTITIONS = 10;
54
55
56         @Autowired
57         KafkaConsumer<K, V> kafkaConsumer;
58         @Autowired
59         Consumer<ConsumerRecord<K, V>> consumer;
60         @Autowired
61         ApplicationProperties properties;
62         @Autowired
63         ExecutorService executor;
64         @Autowired
65         MongoClient mongoClient;
66         @Autowired
67         MongoProperties mongoProperties;
68         @Autowired
69         PollIntervalAwareConsumerRebalanceListener rebalanceListener;
70         @Autowired
71         RecordHandler<K, V> recordHandler;
72
73         KafkaProducer<Bytes, Bytes> testRecordProducer;
74         KafkaConsumer<Bytes, Bytes> offsetConsumer;
75         EndlessConsumer<K, V> endlessConsumer;
76         Map<TopicPartition, Long> oldOffsets;
77         Map<TopicPartition, Long> newOffsets;
78         Set<ConsumerRecord<K, V>> receivedRecords;
79
80
81         final RecordGenerator recordGenerator;
82         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
83
84         public GenericApplicationTests(RecordGenerator recordGenerator)
85         {
86                 this.recordGenerator = recordGenerator;
87                 this.messageSender = (record) -> sendMessage(record);
88         }
89
90
        /** Test methods */
92
93         @Test
94         void commitsCurrentOffsetsOnSuccess()
95         {
96                 int numberOfGeneratedMessages =
97                                 recordGenerator.generate(false, false, messageSender);
98
99                 await(numberOfGeneratedMessages + " records received")
100                                 .atMost(Duration.ofSeconds(30))
101                                 .pollInterval(Duration.ofSeconds(1))
102                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
103
104                 await("Offsets committed")
105                                 .atMost(Duration.ofSeconds(10))
106                                 .pollInterval(Duration.ofSeconds(1))
107                                 .untilAsserted(() ->
108                                 {
109                                         checkSeenOffsetsForProgress();
110                                         compareToCommitedOffsets(newOffsets);
111                                 });
112
113                 assertThatExceptionOfType(IllegalStateException.class)
114                                 .isThrownBy(() -> endlessConsumer.exitStatus())
115                                 .describedAs("Consumer should still be running");
116
117                 recordGenerator.assertBusinessLogic();
118         }
119
120         @Test
121         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
122         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
123         {
124                 int numberOfGeneratedMessages =
125                                 recordGenerator.generate(true, false, messageSender);
126
127                 await("Consumer failed")
128                                 .atMost(Duration.ofSeconds(30))
129                                 .pollInterval(Duration.ofSeconds(1))
130                                 .until(() -> !endlessConsumer.running());
131
132                 checkSeenOffsetsForProgress();
133                 compareToCommitedOffsets(newOffsets);
134
135                 endlessConsumer.start();
136                 await("Consumer failed")
137                                 .atMost(Duration.ofSeconds(30))
138                                 .pollInterval(Duration.ofSeconds(1))
139                                 .until(() -> !endlessConsumer.running());
140
141                 checkSeenOffsetsForProgress();
142                 compareToCommitedOffsets(newOffsets);
143                 assertThat(receivedRecords.size())
144                                 .describedAs("Received not all sent events")
145                                 .isLessThan(numberOfGeneratedMessages);
146
147                 assertThatNoException()
148                                 .describedAs("Consumer should not be running")
149                                 .isThrownBy(() -> endlessConsumer.exitStatus());
150                 assertThat(endlessConsumer.exitStatus())
151                                 .describedAs("Consumer should have exited abnormally")
152                                 .containsInstanceOf(RecordDeserializationException.class);
153
154                 recordGenerator.assertBusinessLogic();
155         }
156
157         @Test
158         @SkipWhenErrorCannotBeGenerated(logicError = true)
159         void doesNotCommitOffsetsOnLogicError()
160         {
161                 int numberOfGeneratedMessages =
162                                 recordGenerator.generate(false, true, messageSender);
163
164                 await("Consumer failed")
165                                 .atMost(Duration.ofSeconds(30))
166                                 .pollInterval(Duration.ofSeconds(1))
167                                 .until(() -> !endlessConsumer.running());
168
169                 checkSeenOffsetsForProgress();
170                 compareToCommitedOffsets(oldOffsets);
171
172                 endlessConsumer.start();
173                 await("Consumer failed")
174                                 .atMost(Duration.ofSeconds(30))
175                                 .pollInterval(Duration.ofSeconds(1))
176                                 .until(() -> !endlessConsumer.running());
177
178                 checkSeenOffsetsForProgress();
179                 compareToCommitedOffsets(oldOffsets);
180
181                 assertThatNoException()
182                                 .describedAs("Consumer should not be running")
183                                 .isThrownBy(() -> endlessConsumer.exitStatus());
184                 assertThat(endlessConsumer.exitStatus())
185                                 .describedAs("Consumer should have exited abnormally")
186                                 .containsInstanceOf(RuntimeException.class);
187
188                 recordGenerator.assertBusinessLogic();
189         }
190
191
192         /** Helper methods for the verification of expectations */
193
194         void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
195         {
196                 doForCurrentOffsets((tp, offset) ->
197                 {
198                         Long expected = offsetsToCheck.get(tp) + 1;
199                         log.debug("Checking, if the offset for {} is {}", tp, expected);
200                         assertThat(offset)
201                                         .describedAs("Committed offset corresponds to the offset of the consumer")
202                                         .isEqualTo(expected);
203                 });
204         }
205
206         void checkSeenOffsetsForProgress()
207         {
208                 // Be sure, that some messages were consumed...!
209                 Set<TopicPartition> withProgress = new HashSet<>();
210                 partitions().forEach(tp ->
211                 {
212                         Long oldOffset = oldOffsets.get(tp) + 1;
213                         Long newOffset = newOffsets.get(tp) + 1;
214                         if (!oldOffset.equals(newOffset))
215                         {
216                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
217                                 withProgress.add(tp);
218                         }
219                 });
220                 assertThat(withProgress)
221                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
222                                 .isNotEmpty();
223         }
224
225
226         /** Helper methods for setting up and running the tests */
227
228         void seekToEnd()
229         {
230                 offsetConsumer.assign(partitions());
231                 offsetConsumer.seekToEnd(partitions());
232                 partitions().forEach(tp ->
233                 {
234                         // seekToEnd() works lazily: it only takes effect on poll()/position()
235                         Long offset = offsetConsumer.position(tp);
236                         log.info("New position for {}: {}", tp, offset);
237                 });
238                 // The new positions must be commited!
239                 offsetConsumer.commitSync();
240                 offsetConsumer.unsubscribe();
241         }
242
243         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
244         {
245                 offsetConsumer.assign(partitions());
246                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
247                 offsetConsumer.unsubscribe();
248         }
249
250         List<TopicPartition> partitions()
251         {
252                 return
253                                 IntStream
254                                                 .range(0, PARTITIONS)
255                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
256                                                 .collect(Collectors.toList());
257         }
258
259
260         public interface RecordGenerator
261         {
262                 int generate(
263                                 boolean poisonPills,
264                                 boolean logicErrors,
265                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
266
267                 default boolean canGeneratePoisonPill()
268                 {
269                         return true;
270                 }
271
272                 default boolean canGenerateLogicError()
273                 {
274                         return true;
275                 }
276
277                 default void assertBusinessLogic()
278                 {
279                         log.debug("No business-logic to assert");
280                 }
281         }
282
283         void sendMessage(ProducerRecord<Bytes, Bytes> record)
284         {
285                 testRecordProducer.send(record, (metadata, e) ->
286                 {
287                         if (metadata != null)
288                         {
289                                 log.debug(
290                                                 "{}|{} - {}={}",
291                                                 metadata.partition(),
292                                                 metadata.offset(),
293                                                 record.key(),
294                                                 record.value());
295                         }
296                         else
297                         {
298                                 log.warn(
299                                                 "Exception for {}={}: {}",
300                                                 record.key(),
301                                                 record.value(),
302                                                 e.toString());
303                         }
304                 });
305         }
306
307
308         @BeforeEach
309         public void init()
310         {
311                 Properties props;
312                 props = new Properties();
313                 props.put("bootstrap.servers", properties.getBootstrapServer());
314                 props.put("linger.ms", 100);
315                 props.put("key.serializer", BytesSerializer.class.getName());
316                 props.put("value.serializer", BytesSerializer.class.getName());
317                 testRecordProducer = new KafkaProducer<>(props);
318
319                 props = new Properties();
320                 props.put("bootstrap.servers", properties.getBootstrapServer());
321                 props.put("client.id", "OFFSET-CONSUMER");
322                 props.put("group.id", properties.getGroupId());
323                 props.put("key.deserializer", BytesDeserializer.class.getName());
324                 props.put("value.deserializer", BytesDeserializer.class.getName());
325                 offsetConsumer = new KafkaConsumer<>(props);
326
327                 seekToEnd();
328
329                 oldOffsets = new HashMap<>();
330                 newOffsets = new HashMap<>();
331                 receivedRecords = new HashSet<>();
332
333                 doForCurrentOffsets((tp, offset) ->
334                 {
335                         oldOffsets.put(tp, offset - 1);
336                         newOffsets.put(tp, offset - 1);
337                 });
338
339                 TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
340                                 new TestRecordHandler<K, V>(recordHandler)
341                                 {
342                                         @Override
343                                         public void onNewRecord(ConsumerRecord<K, V> record)
344                                         {
345                                                 newOffsets.put(
346                                                                 new TopicPartition(record.topic(), record.partition()),
347                                                                 record.offset());
348                                                 receivedRecords.add(record);
349                                         }
350                                 };
351
352                 mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
353
354                 endlessConsumer =
355                                 new EndlessConsumer<>(
356                                                 executor,
357                                                 properties.getClientId(),
358                                                 properties.getTopic(),
359                                                 kafkaConsumer,
360                                                 rebalanceListener,
361                                                 captureOffsetAndExecuteTestHandler);
362
363                 endlessConsumer.start();
364         }
365
366         @AfterEach
367         public void deinit()
368         {
369                 try
370                 {
371                         endlessConsumer.stop();
372                         testRecordProducer.close();
373                         offsetConsumer.close();
374                 }
375                 catch (Exception e)
376                 {
377                         log.info("Exception while stopping the consumer: {}", e.toString());
378                 }
379         }
380
381
382         @TestConfiguration
383         @Import(ApplicationConfiguration.class)
384         public static class Configuration
385         {
386         }
387 }