a8fa7ea22b9c21d7c16fad357cc1eff27a130daa
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import com.mongodb.client.MongoClient;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.clients.producer.KafkaProducer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import org.apache.kafka.common.errors.RecordDeserializationException;
11 import org.apache.kafka.common.serialization.*;
12 import org.apache.kafka.common.utils.Bytes;
13 import org.junit.jupiter.api.*;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
16 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
17 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
18 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
19 import org.springframework.boot.test.context.TestConfiguration;
20 import org.springframework.context.annotation.Import;
21 import org.springframework.kafka.test.context.EmbeddedKafka;
22 import org.springframework.test.context.TestPropertySource;
23 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
24
25 import java.time.Duration;
26 import java.util.*;
27 import java.util.concurrent.ExecutorService;
28 import java.util.function.BiConsumer;
29 import java.util.function.Consumer;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
32
33 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
34 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
35 import static org.assertj.core.api.Assertions.*;
36 import static org.awaitility.Awaitility.*;
37
38
39 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
40 @TestPropertySource(
41                 properties = {
42                                 "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
43                                 "sumup.adder.topic=" + TOPIC,
44                                 "sumup.adder.commit-interval=500ms",
45                                 "spring.mongodb.embedded.version=4.4.13" })
46 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
47 @EnableAutoConfiguration
48 @AutoConfigureDataMongo
49 @Slf4j
50 abstract class GenericApplicationTests<K, V>
51 {
	public static final String TOPIC = "FOO";
	public static final int PARTITIONS = 10;


	// Beans wired from the application-context under test
	@Autowired
	KafkaConsumer<K, V> kafkaConsumer;
	@Autowired
	Consumer<ConsumerRecord<K, V>> consumer;
	@Autowired
	ApplicationProperties properties;
	@Autowired
	ExecutorService executor;
	@Autowired
	StateRepository stateRepository;
	@Autowired
	MongoClient mongoClient;
	@Autowired
	MongoProperties mongoProperties;
	@Autowired
	RebalanceListener rebalanceListener;
	@Autowired
	RecordHandler<K, V> recordHandler;

	// Test infrastructure, (re-)created per test in init()
	KafkaProducer<Bytes, Bytes> testRecordProducer;
	KafkaConsumer<Bytes, Bytes> offsetConsumer;
	EndlessConsumer<K, V> endlessConsumer;
	// Offset positions captured before the test started (offset of the last
	// seen record per partition, i.e. next-offset - 1)
	Map<TopicPartition, Long> oldOffsets;
	// Offset positions updated live while the test runs
	Map<TopicPartition, Long> seenOffsets;
	Set<ConsumerRecord<K, V>> receivedRecords;


	// Supplied by the concrete subclass: generates the test records and
	// knows how (and whether) errors can be injected
	final RecordGenerator recordGenerator;
	final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
86         public GenericApplicationTests(RecordGenerator recordGenerator)
87         {
88                 this.recordGenerator = recordGenerator;
89                 this.messageSender = (record) -> sendMessage(record);
90         }
91
92
93         /** Tests methods */
94
95         @Test
96         void commitsCurrentOffsetsOnSuccess() throws Exception
97         {
98                 int numberOfGeneratedMessages =
99                                 recordGenerator.generate(false, false, messageSender);
100
101                 await(numberOfGeneratedMessages + " records received")
102                                 .atMost(Duration.ofSeconds(30))
103                                 .pollInterval(Duration.ofSeconds(1))
104                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
105
106                 await("Offsets committed")
107                                 .atMost(Duration.ofSeconds(10))
108                                 .pollInterval(Duration.ofSeconds(1))
109                                 .untilAsserted(() ->
110                                 {
111                                         checkSeenOffsetsForProgress();
112                                         assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
113                                 });
114
115                 assertThatExceptionOfType(IllegalStateException.class)
116                                 .isThrownBy(() -> endlessConsumer.exitStatus())
117                                 .describedAs("Consumer should still be running");
118
119                 endlessConsumer.stop();
120                 recordGenerator.assertBusinessLogic();
121         }
122
	/**
	 * Poison-pill scenario: a record that cannot be deserialized is injected.
	 * The consumer must die, the offsets up to (but excluding) the poison
	 * pill must be committed, and a restart must fail on the very same
	 * record again — i.e. the offset of the error is kept for reprocessing.
	 */
	@Test
	@SkipWhenErrorCannotBeGenerated(poisonPill = true)
	void commitsOffsetOfErrorForReprocessingOnDeserializationError()
	{
		int numberOfGeneratedMessages =
				recordGenerator.generate(true, false, messageSender);

		// The poison pill must terminate the consumer
		await("Consumer failed")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		checkSeenOffsetsForProgress();
		assertSeenOffsetsEqualCommittedOffsets(seenOffsets);

		// Restart: the consumer resumes at the committed offset and must hit
		// the same poison pill again
		endlessConsumer.start();
		await("Consumer failed")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		checkSeenOffsetsForProgress();
		assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
		// Records after the poison pill were never processed
		assertThat(receivedRecords.size())
				.describedAs("Received not all sent events")
				.isLessThan(numberOfGeneratedMessages);

		// exitStatus() must be available (consumer stopped) and must carry
		// the deserialization error
		assertThatNoException()
				.describedAs("Consumer should not be running")
				.isThrownBy(() -> endlessConsumer.exitStatus());
		assertThat(endlessConsumer.exitStatus())
				.describedAs("Consumer should have exited abnormally")
				.containsInstanceOf(RecordDeserializationException.class);

		recordGenerator.assertBusinessLogic();
	}
159
	/**
	 * Logic-error scenario: a record that triggers a failure in the business
	 * logic is injected. The consumer must die _without_ committing the
	 * offsets of the poisoned batch, so the committed offsets stay behind
	 * the seen offsets — both after the first failure and after a restart.
	 */
	@Test
	@SkipWhenErrorCannotBeGenerated(logicError = true)
	void doesNotCommitOffsetsOnLogicError()
	{
		int numberOfGeneratedMessages =
				recordGenerator.generate(false, true, messageSender);

		// The logic error must terminate the consumer
		await("Consumer failed")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		checkSeenOffsetsForProgress();
		assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);

		// Restart: the uncommitted records are reprocessed and fail again
		endlessConsumer.start();
		await("Consumer failed")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);

		// exitStatus() must be available (consumer stopped) and must carry
		// the business-logic error
		assertThatNoException()
				.describedAs("Consumer should not be running")
				.isThrownBy(() -> endlessConsumer.exitStatus());
		assertThat(endlessConsumer.exitStatus())
				.describedAs("Consumer should have exited abnormally")
				.containsInstanceOf(RuntimeException.class);

		recordGenerator.assertBusinessLogic();
	}
192
193
194         /** Helper methods for the verification of expectations */
195
196         void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
197         {
198                 doForCurrentOffsets((tp, offset) ->
199                 {
200                         Long expected = offsetsToCheck.get(tp) + 1;
201                         log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
202                         assertThat(offset)
203                                         .describedAs("Committed offset corresponds to the offset of the consumer")
204                                         .isEqualTo(expected);
205                 });
206         }
207
208         void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
209         {
210                 List<Boolean> isOffsetBehindSeen = new LinkedList<>();
211
212                 doForCurrentOffsets((tp, offset) ->
213                 {
214                         Long expected = offsetsToCheck.get(tp) + 1;
215                         log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
216                         assertThat(offset)
217                                         .describedAs("Committed offset must be at most equal to the offset of the consumer")
218                                         .isLessThanOrEqualTo(expected);
219                         isOffsetBehindSeen.add(offset < expected);
220                 });
221
222                 assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
223                                 .describedAs("Committed offsets are behind seen offsets")
224                                 .isTrue();
225         }
226
227         void checkSeenOffsetsForProgress()
228         {
229                 // Be sure, that some messages were consumed...!
230                 Set<TopicPartition> withProgress = new HashSet<>();
231                 partitions().forEach(tp ->
232                 {
233                         Long oldOffset = oldOffsets.get(tp) + 1;
234                         Long newOffset = seenOffsets.get(tp) + 1;
235                         if (!oldOffset.equals(newOffset))
236                         {
237                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
238                                 withProgress.add(tp);
239                         }
240                 });
241                 assertThat(withProgress)
242                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
243                                 .isNotEmpty();
244         }
245
246
247         /** Helper methods for setting up and running the tests */
248
	/**
	 * Synchronizes the offsets stored in MongoDB with the current end of the
	 * topic: for every partition the broker-side position of the (otherwise
	 * unused) offsetConsumer is written into the corresponding StateDocument,
	 * so that each test starts from a well-defined offset state.
	 */
	void seekToEnd()
	{
		offsetConsumer.assign(partitions());
		partitions().forEach(tp ->
		{
			// position() fetches the current offset from the broker, since
			// the consumer has not polled anything yet
			Long offset = offsetConsumer.position(tp);
			log.info("New position for {}: {}", tp, offset);
			Integer partition = tp.partition();
			// Load the state-document for the partition, or create a fresh
			// one if none was stored yet
			StateDocument document =
					stateRepository
							.findById(partition.toString())
							.orElse(new StateDocument(partition));
			document.offset = offset;
			stateRepository.save(document);
		});
		offsetConsumer.unsubscribe();
	}
266
267         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
268         {
269                 partitions().forEach(tp ->
270                 {
271                         String partition = Integer.toString(tp.partition());
272                         Optional<Long> offset = stateRepository.findById(partition).map(document -> document.offset);
273                         consumer.accept(tp, offset.orElse(0l));
274                 });
275         }
276
277         List<TopicPartition> partitions()
278         {
279                 return
280                                 IntStream
281                                                 .range(0, PARTITIONS)
282                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
283                                                 .collect(Collectors.toList());
284         }
285
286
	/**
	 * Strategy implemented by the concrete test-subclass: generates the test
	 * records (optionally including error-records) and verifies the outcome
	 * of the processed business-logic.
	 */
	public interface RecordGenerator
	{
		/**
		 * Generates and sends the test records.
		 *
		 * @param poisonPills if {@code true}, include records that cannot be
		 *                    deserialized
		 * @param logicErrors if {@code true}, include records that trigger an
		 *                    error in the business-logic
		 * @param messageSender callback used to actually send each record
		 * @return the number of generated messages
		 */
		int generate(
				boolean poisonPills,
				boolean logicErrors,
				Consumer<ProducerRecord<Bytes, Bytes>> messageSender);

		// Implementations that cannot produce a poison pill override this to
		// return false, which skips the corresponding test
		default boolean canGeneratePoisonPill()
		{
			return true;
		}

		// Implementations that cannot produce a logic error override this to
		// return false, which skips the corresponding test
		default boolean canGenerateLogicError()
		{
			return true;
		}

		// Hook for subclass-specific result verification; no-op by default
		default void assertBusinessLogic()
		{
			log.debug("No business-logic to assert");
		}
	}
309
310         void sendMessage(ProducerRecord<Bytes, Bytes> record)
311         {
312                 testRecordProducer.send(record, (metadata, e) ->
313                 {
314                         if (metadata != null)
315                         {
316                                 log.debug(
317                                                 "{}|{} - {}={}",
318                                                 metadata.partition(),
319                                                 metadata.offset(),
320                                                 record.key(),
321                                                 record.value());
322                         }
323                         else
324                         {
325                                 log.warn(
326                                                 "Exception for {}={}: {}",
327                                                 record.key(),
328                                                 record.value(),
329                                                 e.toString());
330                         }
331                 });
332         }
333
334
	/**
	 * Per-test setup: creates the raw producer/consumer used by the test
	 * harness, resets the MongoDB state to the current end of the topic,
	 * captures the starting offsets, wraps the record-handler so that seen
	 * offsets and received records are recorded, and starts the consumer.
	 */
	@BeforeEach
	public void init()
	{
		Properties props;
		// Producer for the raw (Bytes/Bytes) test records
		props = new Properties();
		props.put("bootstrap.servers", properties.getBootstrapServer());
		props.put("linger.ms", 100);
		props.put("key.serializer", BytesSerializer.class.getName());
		props.put("value.serializer", BytesSerializer.class.getName());
		testRecordProducer = new KafkaProducer<>(props);

		// Separate consumer, only used to query partition positions in
		// seekToEnd() — it never polls
		props = new Properties();
		props.put("bootstrap.servers", properties.getBootstrapServer());
		props.put("client.id", "OFFSET-CONSUMER");
		props.put("group.id", properties.getGroupId());
		props.put("key.deserializer", BytesDeserializer.class.getName());
		props.put("value.deserializer", BytesDeserializer.class.getName());
		offsetConsumer = new KafkaConsumer<>(props);

		// Wipe the state-store, then re-seed it with the current end-offsets
		mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
		seekToEnd();

		oldOffsets = new HashMap<>();
		seenOffsets = new HashMap<>();
		receivedRecords = new HashSet<>();

		// Record the starting positions; stored as last-seen offset, hence -1
		doForCurrentOffsets((tp, offset) ->
		{
			oldOffsets.put(tp, offset - 1);
			seenOffsets.put(tp, offset - 1);
		});

		// Decorates the application's record-handler: tracks every seen
		// offset and record before delegating
		TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
				new TestRecordHandler<K, V>(recordHandler)
				{
					@Override
					public void onNewRecord(ConsumerRecord<K, V> record)
					{
						seenOffsets.put(
								new TopicPartition(record.topic(), record.partition()),
								record.offset());
						receivedRecords.add(record);
					}
				};

		endlessConsumer =
				new EndlessConsumer<>(
						executor,
						properties.getClientId(),
						properties.getTopic(),
						kafkaConsumer,
						rebalanceListener,
						captureOffsetAndExecuteTestHandler);

		endlessConsumer.start();
	}
391
392         @AfterEach
393         public void deinit()
394         {
395                 try
396                 {
397                         testRecordProducer.close();
398                         offsetConsumer.close();
399                 }
400                 catch (Exception e)
401                 {
402                         log.info("Exception while stopping the consumer: {}", e.toString());
403                 }
404         }
405
406
	// Test-context configuration: pulls in the production wiring so the
	// tests run against the real application beans
	@TestConfiguration
	@Import(ApplicationConfiguration.class)
	public static class Configuration
	{
	}
412 }