test: Added verification of the business logic
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
import static de.juplo.kafka.GenericApplicationTests.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;

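/**
 * Abstract integration test that runs the consumer against an embedded Kafka
 * broker and an embedded MongoDB instance. Concrete subclasses plug in a
 * {@link RecordGenerator}, which produces the test fixtures and afterwards
 * asserts the expectations of the business logic.
 */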
@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestPropertySource(
    properties = {
        "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
        "sumup.adder.topic=" + TOPIC,
        "sumup.adder.commit-interval=1s",
        "spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@EnableAutoConfiguration
@AutoConfigureDataMongo
@Slf4j
abstract class GenericApplicationTests<K, V>
{
  public static final String TOPIC = "FOO";
  public static final int PARTITIONS = 10;

  @Autowired
  KafkaConsumer<K, V> kafkaConsumer;
  @Autowired
  Consumer<ConsumerRecord<K, V>> consumer;
  @Autowired
  ApplicationProperties properties;
  @Autowired
  ExecutorService executor;
  @Autowired
  StateRepository stateRepository;
  @Autowired
  PollIntervalAwareConsumerRebalanceListener rebalanceListener;
  @Autowired
  RecordHandler<K, V> recordHandler;

  KafkaProducer<Bytes, Bytes> testRecordProducer;
  KafkaConsumer<Bytes, Bytes> offsetConsumer;
  EndlessConsumer<K, V> endlessConsumer;
  Map<TopicPartition, Long> oldOffsets;
  Map<TopicPartition, Long> newOffsets;
  Set<ConsumerRecord<K, V>> receivedRecords;

  final RecordGenerator recordGenerator;
  final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;

  public GenericApplicationTests(RecordGenerator recordGenerator)
  {
    this.recordGenerator = recordGenerator;
    this.messageSender = (record) -> sendMessage(record);
  }


  /** Test methods */

  @Test
  void commitsCurrentOffsetsOnSuccess()
  {
    int numberOfGeneratedMessages =
        recordGenerator.generate(false, false, messageSender);

    await(numberOfGeneratedMessages + " records received")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);

    await("Offsets committed")
        .atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(() ->
        {
          checkSeenOffsetsForProgress();
          compareToCommittedOffsets(newOffsets);
        });

    // The description has to be set before the assertion runs; otherwise,
    // it would not show up in the failure message
    assertThatThrownBy(() -> endlessConsumer.exitStatus())
        .describedAs("Consumer should still be running")
        .isInstanceOf(IllegalStateException.class);

    recordGenerator.assertBusinessLogic();
  }

  @Test
  @SkipWhenErrorCannotBeGenerated(poisonPill = true)
  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
  {
    int numberOfGeneratedMessages =
        recordGenerator.generate(true, false, messageSender);

    await("Consumer failed")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommittedOffsets(newOffsets);

    endlessConsumer.start();
    await("Consumer failed again")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommittedOffsets(newOffsets);
    assertThat(receivedRecords.size())
        .describedAs("Not all sent events should have been received")
        .isLessThan(numberOfGeneratedMessages);

    assertThatNoException()
        .describedAs("Consumer should not be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
    assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RecordDeserializationException.class);

    recordGenerator.assertBusinessLogic();
  }

  @Test
  @SkipWhenErrorCannotBeGenerated(logicError = true)
  void doesNotCommitOffsetsOnLogicError()
  {
    int numberOfGeneratedMessages =
        recordGenerator.generate(false, true, messageSender);

    await("Consumer failed")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommittedOffsets(oldOffsets);

    endlessConsumer.start();
    await("Consumer failed again")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommittedOffsets(oldOffsets);
    assertThat(receivedRecords.size())
        .describedAs("Not all sent events should have been received")
        .isLessThan(numberOfGeneratedMessages);

    assertThatNoException()
        .describedAs("Consumer should not be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
    assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RuntimeException.class);

    recordGenerator.assertBusinessLogic();
  }


  /** Helper methods for the verification of expectations */

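  /*
   * Note: the application under test stores its offsets in MongoDB (via the
   * StateRepository), not in Kafka's __consumer_offsets topic. A
   * StateDocument holds the position, i.e. the offset of the next record to
   * be consumed, while oldOffsets/newOffsets track the offset of the last
   * record that was seen; hence the "+ 1" in the comparisons below.
   */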
  void compareToCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
  {
    doForCurrentOffsets((tp, offset) ->
    {
      Long expected = offsetsToCheck.get(tp) + 1;
      log.debug("Checking if the offset for {} is {}", tp, expected);
      assertThat(offset)
          .describedAs("Committed offset corresponds to the offset of the consumer")
          .isEqualTo(expected);
    });
  }

  void checkSeenOffsetsForProgress()
  {
    // Make sure that some messages were actually consumed...!
    Set<TopicPartition> withProgress = new HashSet<>();
    partitions().forEach(tp ->
    {
      Long oldOffset = oldOffsets.get(tp) + 1;
      Long newOffset = newOffsets.get(tp) + 1;
      if (!oldOffset.equals(newOffset))
      {
        log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
        withProgress.add(tp);
      }
    });
    assertThat(withProgress)
        .describedAs("Some offsets must have changed, compared to the old offset positions")
        .isNotEmpty();
  }


  /** Helper methods for setting up and running the tests */

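  /*
   * Writes the consumer's current position on the broker for each partition
   * into the state stored in MongoDB, so that each test method starts behind
   * the records that earlier test methods have written to the shared
   * embedded broker.
   */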
  void seekToEnd()
  {
    offsetConsumer.assign(partitions());
    partitions().forEach(tp ->
    {
      Long offset = offsetConsumer.position(tp);
      log.info("New position for {}: {}", tp, offset);
      Integer partition = tp.partition();
      StateDocument document =
          stateRepository
              .findById(partition.toString())
              .orElse(new StateDocument(partition));
      document.offset = offset;
      stateRepository.save(document);
    });
    offsetConsumer.unsubscribe();
  }

  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
  {
    partitions().forEach(tp ->
    {
      String partition = Integer.toString(tp.partition());
      Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
      consumer.accept(tp, offset.orElse(0L));
    });
  }

  List<TopicPartition> partitions()
  {
    return
        IntStream
            .range(0, PARTITIONS)
            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
            .collect(Collectors.toList());
  }


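  /**
   * Supplies the test fixtures: implementations send their generated records
   * through the given messageSender and return the number of generated
   * messages. The flags request the injection of a poison pill (a record
   * that cannot be deserialized) or a record that triggers an error in the
   * business logic; tests that need an error are skipped via
   * SkipWhenErrorCannotBeGenerated, if the generator cannot produce it.
   */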
  public interface RecordGenerator
  {
    int generate(
        boolean poisonPills,
        boolean logicErrors,
        Consumer<ProducerRecord<Bytes, Bytes>> messageSender);

    default boolean canGeneratePoisonPill()
    {
      return true;
    }

    default boolean canGenerateLogicError()
    {
      return true;
    }

    default void assertBusinessLogic()
    {
      log.debug("No business logic to assert");
    }
  }

  void sendMessage(ProducerRecord<Bytes, Bytes> record)
  {
    testRecordProducer.send(record, (metadata, e) ->
    {
      if (metadata != null)
      {
        log.debug(
            "{}|{} - {}={}",
            metadata.partition(),
            metadata.offset(),
            record.key(),
            record.value());
      }
      else
      {
        log.warn(
            "Exception for {}={}: {}",
            record.key(),
            record.value(),
            e.toString());
      }
    });
  }


  @BeforeEach
  public void init()
  {
    // Producer that is used to send the generated test fixtures
    Properties props = new Properties();
    props.put("bootstrap.servers", properties.getBootstrapServer());
    props.put("linger.ms", 100);
    props.put("key.serializer", BytesSerializer.class.getName());
    props.put("value.serializer", BytesSerializer.class.getName());
    testRecordProducer = new KafkaProducer<>(props);

    // Consumer that is only used to look up the current positions
    props = new Properties();
    props.put("bootstrap.servers", properties.getBootstrapServer());
    props.put("client.id", "OFFSET-CONSUMER");
    props.put("group.id", properties.getGroupId());
    props.put("key.deserializer", BytesDeserializer.class.getName());
    props.put("value.deserializer", BytesDeserializer.class.getName());
    offsetConsumer = new KafkaConsumer<>(props);

    seekToEnd();

    oldOffsets = new HashMap<>();
    newOffsets = new HashMap<>();
    receivedRecords = new HashSet<>();

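    // Seed the bookkeeping with "position - 1", i.e. the offset of the
    // last record that was consumed before this test was started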
    doForCurrentOffsets((tp, offset) ->
    {
      oldOffsets.put(tp, offset - 1);
      newOffsets.put(tp, offset - 1);
    });

    // Wraps the record handler of the application, so that the test can
    // track the offsets and records that the consumer has seen
    TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
        new TestRecordHandler<K, V>(recordHandler)
        {
          @Override
          public void onNewRecord(ConsumerRecord<K, V> record)
          {
            newOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                record.offset());
            receivedRecords.add(record);
          }
        };

    endlessConsumer =
        new EndlessConsumer<>(
            executor,
            properties.getClientId(),
            properties.getTopic(),
            kafkaConsumer,
            rebalanceListener,
            captureOffsetAndExecuteTestHandler);

    endlessConsumer.start();
  }

  @AfterEach
  public void deinit()
  {
    try
    {
      endlessConsumer.stop();
      testRecordProducer.close();
      offsetConsumer.close();
    }
    catch (Exception e)
    {
      log.info("Exception while stopping the consumer: {}", e.toString());
    }
  }


  @TestConfiguration
  @Import(ApplicationConfiguration.class)
  public static class Configuration
  {
  }
}
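
For illustration, a minimal sketch of how a concrete subclass could wire up a RecordGenerator. The class name and the fixed message count are hypothetical and not part of this commit; the error-injection branches are only hinted at in the comments:

package de.juplo.kafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;

import java.util.function.Consumer;


// Hypothetical example subclass, for illustration only
class ExampleApplicationTests extends GenericApplicationTests<String, String>
{
  public ExampleApplicationTests()
  {
    super(new RecordGenerator()
    {
      final StringSerializer serializer = new StringSerializer();

      @Override
      public int generate(
          boolean poisonPills,
          boolean logicErrors,
          Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
      {
        int numberOfMessages = 100;
        for (int i = 0; i < numberOfMessages; i++)
        {
          // A real generator would replace some values with a poison pill
          // (an un-deserializable payload) or a value that triggers an
          // error in the business logic, if the corresponding flag is set
          Bytes key = new Bytes(serializer.serialize(TOPIC, Integer.toString(i % 10)));
          Bytes value = new Bytes(serializer.serialize(TOPIC, Integer.toString(i)));
          messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
        }
        return numberOfMessages;
      }
    });
  }
}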