GRÜN: (ungewollt!) - Unabhängigkeit der Tests wieder hergestellt
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import com.mongodb.client.MongoClient;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.clients.producer.KafkaProducer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import org.apache.kafka.common.errors.RecordDeserializationException;
11 import org.apache.kafka.common.serialization.*;
12 import org.apache.kafka.common.utils.Bytes;
13 import org.junit.jupiter.api.*;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
16 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
17 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
18 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
19 import org.springframework.boot.test.context.TestConfiguration;
20 import org.springframework.context.annotation.Import;
21 import org.springframework.kafka.test.context.EmbeddedKafka;
22 import org.springframework.test.context.TestPropertySource;
23 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
24
25 import java.time.Duration;
26 import java.util.*;
27 import java.util.concurrent.ExecutorService;
28 import java.util.function.BiConsumer;
29 import java.util.function.Consumer;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
32
33 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
34 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
35 import static org.assertj.core.api.Assertions.*;
36 import static org.awaitility.Awaitility.*;
37
38
39 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
40 @TestPropertySource(
41                 properties = {
42                                 "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
43                                 "sumup.adder.topic=" + TOPIC,
44                                 "sumup.adder.commit-interval=1s",
45                                 "spring.mongodb.embedded.version=4.4.13" })
46 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
47 @EnableAutoConfiguration
48 @AutoConfigureDataMongo
49 @Slf4j
50 abstract class GenericApplicationTests<K, V>
51 {
52         public static final String TOPIC = "FOO";
53         public static final int PARTITIONS = 10;
54
55
56         @Autowired
57         KafkaConsumer<K, V> kafkaConsumer;
58         @Autowired
59         Consumer<ConsumerRecord<K, V>> consumer;
60         @Autowired
61         ApplicationProperties properties;
62         @Autowired
63         ExecutorService executor;
64         @Autowired
65         MongoClient mongoClient;
66         @Autowired
67         MongoProperties mongoProperties;
68         @Autowired
69         PollIntervalAwareConsumerRebalanceListener rebalanceListener;
70         @Autowired
71         RecordHandler<K, V> recordHandler;
72
73         KafkaProducer<Bytes, Bytes> testRecordProducer;
74         KafkaConsumer<Bytes, Bytes> offsetConsumer;
75         EndlessConsumer<K, V> endlessConsumer;
76         Map<TopicPartition, Long> oldOffsets;
77         Map<TopicPartition, Long> newOffsets;
78         Set<ConsumerRecord<K, V>> receivedRecords;
79
80
81         final RecordGenerator recordGenerator;
82         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
83
84         public GenericApplicationTests(RecordGenerator recordGenerator)
85         {
86                 this.recordGenerator = recordGenerator;
87                 this.messageSender = (record) -> sendMessage(record);
88         }
89
90
        /** Test methods */
92
93         @Test
94         void commitsCurrentOffsetsOnSuccess()
95         {
96                 int numberOfGeneratedMessages =
97                                 recordGenerator.generate(false, false, messageSender);
98
99                 await(numberOfGeneratedMessages + " records received")
100                                 .atMost(Duration.ofSeconds(30))
101                                 .pollInterval(Duration.ofSeconds(1))
102                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
103
104                 await("Offsets committed")
105                                 .atMost(Duration.ofSeconds(10))
106                                 .pollInterval(Duration.ofSeconds(1))
107                                 .untilAsserted(() ->
108                                 {
109                                         checkSeenOffsetsForProgress();
110                                         compareToCommitedOffsets(newOffsets);
111                                 });
112
113                 assertThatExceptionOfType(IllegalStateException.class)
114                                 .isThrownBy(() -> endlessConsumer.exitStatus())
115                                 .describedAs("Consumer should still be running");
116
117                 recordGenerator.assertBusinessLogic();
118         }
119
120         @Test
121         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
122         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
123         {
124                 int numberOfGeneratedMessages =
125                                 recordGenerator.generate(true, false, messageSender);
126
127                 await("Consumer failed")
128                                 .atMost(Duration.ofSeconds(30))
129                                 .pollInterval(Duration.ofSeconds(1))
130                                 .until(() -> !endlessConsumer.running());
131
132                 checkSeenOffsetsForProgress();
133                 compareToCommitedOffsets(newOffsets);
134
135                 endlessConsumer.start();
136                 await("Consumer failed")
137                                 .atMost(Duration.ofSeconds(30))
138                                 .pollInterval(Duration.ofSeconds(1))
139                                 .until(() -> !endlessConsumer.running());
140
141                 checkSeenOffsetsForProgress();
142                 compareToCommitedOffsets(newOffsets);
143                 assertThat(receivedRecords.size())
144                                 .describedAs("Received not all sent events")
145                                 .isLessThan(numberOfGeneratedMessages);
146
147                 assertThatNoException()
148                                 .describedAs("Consumer should not be running")
149                                 .isThrownBy(() -> endlessConsumer.exitStatus());
150                 assertThat(endlessConsumer.exitStatus())
151                                 .describedAs("Consumer should have exited abnormally")
152                                 .containsInstanceOf(RecordDeserializationException.class);
153
154                 recordGenerator.assertBusinessLogic();
155         }
156
157         @Test
158         @SkipWhenErrorCannotBeGenerated(logicError = true)
159         void doesNotCommitOffsetsOnLogicError()
160         {
161                 int numberOfGeneratedMessages =
162                                 recordGenerator.generate(false, true, messageSender);
163
164                 await("Consumer failed")
165                                 .atMost(Duration.ofSeconds(30))
166                                 .pollInterval(Duration.ofSeconds(1))
167                                 .until(() -> !endlessConsumer.running());
168
169                 checkSeenOffsetsForProgress();
170                 compareToCommitedOffsets(oldOffsets);
171
172                 endlessConsumer.start();
173                 await("Consumer failed")
174                                 .atMost(Duration.ofSeconds(30))
175                                 .pollInterval(Duration.ofSeconds(1))
176                                 .until(() -> !endlessConsumer.running());
177
178                 checkSeenOffsetsForProgress();
179                 compareToCommitedOffsets(oldOffsets);
180                 assertThat(receivedRecords.size())
181                                 .describedAs("Received not all sent events")
182                                 .isLessThan(numberOfGeneratedMessages);
183
184                 assertThatNoException()
185                                 .describedAs("Consumer should not be running")
186                                 .isThrownBy(() -> endlessConsumer.exitStatus());
187                 assertThat(endlessConsumer.exitStatus())
188                                 .describedAs("Consumer should have exited abnormally")
189                                 .containsInstanceOf(RuntimeException.class);
190
191                 recordGenerator.assertBusinessLogic();
192         }
193
194
195         /** Helper methods for the verification of expectations */
196
197         void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
198         {
199                 doForCurrentOffsets((tp, offset) ->
200                 {
201                         Long expected = offsetsToCheck.get(tp) + 1;
202                         log.debug("Checking, if the offset for {} is {}", tp, expected);
203                         assertThat(offset)
204                                         .describedAs("Committed offset corresponds to the offset of the consumer")
205                                         .isEqualTo(expected);
206                 });
207         }
208
209         void checkSeenOffsetsForProgress()
210         {
211                 // Be sure, that some messages were consumed...!
212                 Set<TopicPartition> withProgress = new HashSet<>();
213                 partitions().forEach(tp ->
214                 {
215                         Long oldOffset = oldOffsets.get(tp) + 1;
216                         Long newOffset = newOffsets.get(tp) + 1;
217                         if (!oldOffset.equals(newOffset))
218                         {
219                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
220                                 withProgress.add(tp);
221                         }
222                 });
223                 assertThat(withProgress)
224                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
225                                 .isNotEmpty();
226         }
227
228
229         /** Helper methods for setting up and running the tests */
230
231         void seekToEnd()
232         {
233                 offsetConsumer.assign(partitions());
234                 offsetConsumer.seekToEnd(partitions());
235                 partitions().forEach(tp ->
236                 {
237                         // seekToEnd() works lazily: it only takes effect on poll()/position()
238                         Long offset = offsetConsumer.position(tp);
239                         log.info("New position for {}: {}", tp, offset);
240                 });
241                 // The new positions must be commited!
242                 offsetConsumer.commitSync();
243                 offsetConsumer.unsubscribe();
244         }
245
246         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
247         {
248                 offsetConsumer.assign(partitions());
249                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
250                 offsetConsumer.unsubscribe();
251         }
252
253         List<TopicPartition> partitions()
254         {
255                 return
256                                 IntStream
257                                                 .range(0, PARTITIONS)
258                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
259                                                 .collect(Collectors.toList());
260         }
261
262
263         public interface RecordGenerator
264         {
265                 int generate(
266                                 boolean poisonPills,
267                                 boolean logicErrors,
268                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
269
270                 default boolean canGeneratePoisonPill()
271                 {
272                         return true;
273                 }
274
275                 default boolean canGenerateLogicError()
276                 {
277                         return true;
278                 }
279
280                 default void assertBusinessLogic()
281                 {
282                         log.debug("No business-logic to assert");
283                 }
284         }
285
286         void sendMessage(ProducerRecord<Bytes, Bytes> record)
287         {
288                 testRecordProducer.send(record, (metadata, e) ->
289                 {
290                         if (metadata != null)
291                         {
292                                 log.debug(
293                                                 "{}|{} - {}={}",
294                                                 metadata.partition(),
295                                                 metadata.offset(),
296                                                 record.key(),
297                                                 record.value());
298                         }
299                         else
300                         {
301                                 log.warn(
302                                                 "Exception for {}={}: {}",
303                                                 record.key(),
304                                                 record.value(),
305                                                 e.toString());
306                         }
307                 });
308         }
309
310
311         @BeforeEach
312         public void init()
313         {
314                 Properties props;
315                 props = new Properties();
316                 props.put("bootstrap.servers", properties.getBootstrapServer());
317                 props.put("linger.ms", 100);
318                 props.put("key.serializer", BytesSerializer.class.getName());
319                 props.put("value.serializer", BytesSerializer.class.getName());
320                 testRecordProducer = new KafkaProducer<>(props);
321
322                 props = new Properties();
323                 props.put("bootstrap.servers", properties.getBootstrapServer());
324                 props.put("client.id", "OFFSET-CONSUMER");
325                 props.put("group.id", properties.getGroupId());
326                 props.put("key.deserializer", BytesDeserializer.class.getName());
327                 props.put("value.deserializer", BytesDeserializer.class.getName());
328                 offsetConsumer = new KafkaConsumer<>(props);
329
330                 seekToEnd();
331
332                 oldOffsets = new HashMap<>();
333                 newOffsets = new HashMap<>();
334                 receivedRecords = new HashSet<>();
335
336                 doForCurrentOffsets((tp, offset) ->
337                 {
338                         oldOffsets.put(tp, offset - 1);
339                         newOffsets.put(tp, offset - 1);
340                 });
341
342                 TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
343                                 new TestRecordHandler<K, V>(recordHandler)
344                                 {
345                                         @Override
346                                         public void onNewRecord(ConsumerRecord<K, V> record)
347                                         {
348                                                 newOffsets.put(
349                                                                 new TopicPartition(record.topic(), record.partition()),
350                                                                 record.offset());
351                                                 receivedRecords.add(record);
352                                         }
353                                 };
354
355                 mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
356
357                 endlessConsumer =
358                                 new EndlessConsumer<>(
359                                                 executor,
360                                                 properties.getClientId(),
361                                                 properties.getTopic(),
362                                                 kafkaConsumer,
363                                                 rebalanceListener,
364                                                 captureOffsetAndExecuteTestHandler);
365
366                 endlessConsumer.start();
367         }
368
369         @AfterEach
370         public void deinit()
371         {
372                 try
373                 {
374                         endlessConsumer.stop();
375                         testRecordProducer.close();
376                         offsetConsumer.close();
377                 }
378                 catch (Exception e)
379                 {
380                         log.info("Exception while stopping the consumer: {}", e.toString());
381                 }
382         }
383
384
385         @TestConfiguration
386         @Import(ApplicationConfiguration.class)
387         public static class Configuration
388         {
389         }
390 }