Merge branch 'deserialization' into sumup-adder--ohne--stored-offsets
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import com.mongodb.client.MongoClient;
4 import lombok.extern.slf4j.Slf4j;
5 import org.apache.kafka.clients.consumer.ConsumerRecord;
6 import org.apache.kafka.clients.consumer.KafkaConsumer;
7 import org.apache.kafka.clients.producer.KafkaProducer;
8 import org.apache.kafka.clients.producer.ProducerRecord;
9 import org.apache.kafka.common.TopicPartition;
10 import org.apache.kafka.common.errors.RecordDeserializationException;
11 import org.apache.kafka.common.serialization.*;
12 import org.apache.kafka.common.utils.Bytes;
13 import org.junit.jupiter.api.*;
14 import org.springframework.beans.factory.annotation.Autowired;
15 import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
16 import org.springframework.boot.autoconfigure.mongo.MongoProperties;
17 import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
18 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
19 import org.springframework.boot.test.context.TestConfiguration;
20 import org.springframework.context.annotation.Bean;
21 import org.springframework.context.annotation.Import;
22 import org.springframework.kafka.test.context.EmbeddedKafka;
23 import org.springframework.test.context.TestPropertySource;
24 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
25
26 import java.time.Duration;
27 import java.util.*;
28 import java.util.function.BiConsumer;
29 import java.util.function.Consumer;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
32
33 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
34 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
35 import static org.assertj.core.api.Assertions.*;
36 import static org.awaitility.Awaitility.*;
37
38
@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestPropertySource(
		properties = {
				"sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
				"sumup.adder.topic=" + TOPIC,
				"sumup.adder.commit-interval=500ms",
				"spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@EnableAutoConfiguration
@AutoConfigureDataMongo
@Slf4j
abstract class GenericApplicationTests<K, V>
{
	public static final String TOPIC = "FOO";
	public static final int PARTITIONS = 10;


	// Consumer instance used by the application under test
	@Autowired
	org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
	// Record-handling callback of the application under test
	@Autowired
	Consumer<ConsumerRecord<K, V>> consumer;
	@Autowired
	ApplicationProperties applicationProperties;
	@Autowired
	MongoClient mongoClient;
	@Autowired
	MongoProperties mongoProperties;
	@Autowired
	RebalanceListener rebalanceListener;
	// Test wrapper that records seen offsets and received records (see Configuration below)
	@Autowired
	TestRecordHandler<K, V> recordHandler;
	@Autowired
	EndlessConsumer<K, V> endlessConsumer;

	// Producer used to inject test records (created/closed per test in init()/deinit())
	KafkaProducer<Bytes, Bytes> testRecordProducer;
	// Extra consumer used only to inspect and manipulate committed offsets
	KafkaConsumer<Bytes, Bytes> offsetConsumer;
	// Offset positions captured before each test, for progress checks
	Map<TopicPartition, Long> oldOffsets;


	// Strategy supplied by concrete subclasses: generates (de)serializable test records
	final RecordGenerator recordGenerator;
	// Callback handed to the generator for actually sending the records
	final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;

	/**
	 * @param recordGenerator strategy that produces the test records for the
	 *                        concrete key/value types of the subclass
	 */
	public GenericApplicationTests(RecordGenerator recordGenerator)
	{
		this.recordGenerator = recordGenerator;
		this.messageSender = (record) -> sendMessage(record);
	}
86
87
	/* Test methods */
89
90         @Test
91         void commitsCurrentOffsetsOnSuccess() throws Exception
92         {
93                 int numberOfGeneratedMessages =
94                                 recordGenerator.generate(false, false, messageSender);
95
96                 await(numberOfGeneratedMessages + " records received")
97                                 .atMost(Duration.ofSeconds(30))
98                                 .pollInterval(Duration.ofSeconds(1))
99                                 .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
100
101                 await("Offsets committed")
102                                 .atMost(Duration.ofSeconds(10))
103                                 .pollInterval(Duration.ofSeconds(1))
104                                 .untilAsserted(() ->
105                                 {
106                                         checkSeenOffsetsForProgress();
107                                         assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
108                                 });
109
110                 assertThatExceptionOfType(IllegalStateException.class)
111                                 .isThrownBy(() -> endlessConsumer.exitStatus())
112                                 .describedAs("Consumer should still be running");
113
114                 endlessConsumer.stop();
115                 recordGenerator.assertBusinessLogic();
116         }
117
118         @Test
119         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
120         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
121         {
122                 int numberOfGeneratedMessages =
123                                 recordGenerator.generate(true, false, messageSender);
124
125                 await("Consumer failed")
126                                 .atMost(Duration.ofSeconds(30))
127                                 .pollInterval(Duration.ofSeconds(1))
128                                 .until(() -> !endlessConsumer.running());
129
130                 checkSeenOffsetsForProgress();
131                 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
132
133                 endlessConsumer.start();
134                 await("Consumer failed")
135                                 .atMost(Duration.ofSeconds(30))
136                                 .pollInterval(Duration.ofSeconds(1))
137                                 .until(() -> !endlessConsumer.running());
138
139                 checkSeenOffsetsForProgress();
140                 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
141                 assertThat(recordHandler.receivedRecords.size())
142                                 .describedAs("Received not all sent events")
143                                 .isLessThan(numberOfGeneratedMessages);
144
145                 assertThatNoException()
146                                 .describedAs("Consumer should not be running")
147                                 .isThrownBy(() -> endlessConsumer.exitStatus());
148                 assertThat(endlessConsumer.exitStatus())
149                                 .describedAs("Consumer should have exited abnormally")
150                                 .containsInstanceOf(RecordDeserializationException.class);
151
152                 recordGenerator.assertBusinessLogic();
153         }
154
	/**
	 * A business-logic error must stop the consumer WITHOUT committing the
	 * offsets of the records seen since the last commit, so that they are
	 * reprocessed after a restart.
	 */
	@Test
	@SkipWhenErrorCannotBeGenerated(logicError = true)
	void doesNotCommitOffsetsOnLogicError()
	{
		int numberOfGeneratedMessages =
				recordGenerator.generate(false, true, messageSender);

		// Phase 1: wait until the logic error has stopped the consumer
		await("Consumer failed")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		checkSeenOffsetsForProgress();
		// Committed offsets must lag behind the seen offsets (no commit on error)
		assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);

		// Phase 2: a restart must fail on the same error again
		endlessConsumer.start();
		await("Consumer failed")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);

		assertThatNoException()
				.describedAs("Consumer should not be running")
				.isThrownBy(() -> endlessConsumer.exitStatus());
		assertThat(endlessConsumer.exitStatus())
				.describedAs("Consumer should have exited abnormally")
				.containsInstanceOf(RuntimeException.class);

		recordGenerator.assertBusinessLogic();
	}
187
188
189         /** Helper methods for the verification of expectations */
190
191         void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
192         {
193                 doForCurrentOffsets((tp, offset) ->
194                 {
195                         Long expected = offsetsToCheck.get(tp) + 1;
196                         log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
197                         assertThat(offset)
198                                         .describedAs("Committed offset corresponds to the offset of the consumer")
199                                         .isEqualTo(expected);
200                 });
201         }
202
203         void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
204         {
205                 List<Boolean> isOffsetBehindSeen = new LinkedList<>();
206
207                 doForCurrentOffsets((tp, offset) ->
208                 {
209                         Long expected = offsetsToCheck.get(tp) + 1;
210                         log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
211                         assertThat(offset)
212                                         .describedAs("Committed offset must be at most equal to the offset of the consumer")
213                                         .isLessThanOrEqualTo(expected);
214                         isOffsetBehindSeen.add(offset < expected);
215                 });
216
217                 assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
218                                 .describedAs("Committed offsets are behind seen offsets")
219                                 .isTrue();
220         }
221
222         void checkSeenOffsetsForProgress()
223         {
224                 // Be sure, that some messages were consumed...!
225                 Set<TopicPartition> withProgress = new HashSet<>();
226                 partitions().forEach(tp ->
227                 {
228                         Long oldOffset = oldOffsets.get(tp) + 1;
229                         Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
230                         if (!oldOffset.equals(newOffset))
231                         {
232                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
233                                 withProgress.add(tp);
234                         }
235                 });
236                 assertThat(withProgress)
237                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
238                                 .isNotEmpty();
239         }
240
241
242         /** Helper methods for setting up and running the tests */
243
244         void seekToEnd()
245         {
246                 offsetConsumer.assign(partitions());
247                 offsetConsumer.seekToEnd(partitions());
248                 partitions().forEach(tp ->
249                 {
250                         // seekToEnd() works lazily: it only takes effect on poll()/position()
251                         Long offset = offsetConsumer.position(tp);
252                         log.info("New position for {}: {}", tp, offset);
253                 });
254                 // The new positions must be commited!
255                 offsetConsumer.commitSync();
256                 offsetConsumer.unsubscribe();
257         }
258
259         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
260         {
261                 offsetConsumer.assign(partitions());
262                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
263                 offsetConsumer.unsubscribe();
264         }
265
266         List<TopicPartition> partitions()
267         {
268                 return
269                                 IntStream
270                                                 .range(0, PARTITIONS)
271                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
272                                                 .collect(Collectors.toList());
273         }
274
275
	/**
	 * Strategy implemented by concrete test subclasses: generates the test
	 * records (optionally with deliberate errors) and verifies the resulting
	 * business state.
	 */
	public interface RecordGenerator
	{
		/**
		 * Generates and sends test records via the given sender.
		 *
		 * @param poisonPills  if {@code true}, include records that cannot be deserialized
		 * @param logicErrors  if {@code true}, include records that trigger a business-logic error
		 * @param messageSender callback that actually sends each record
		 * @return the number of generated messages
		 */
		int generate(
				boolean poisonPills,
				boolean logicErrors,
				Consumer<ProducerRecord<Bytes, Bytes>> messageSender);

		// Override and return false to skip the deserialization-error test
		default boolean canGeneratePoisonPill()
		{
			return true;
		}

		// Override and return false to skip the logic-error test
		default boolean canGenerateLogicError()
		{
			return true;
		}

		// Override to assert generator-specific business state after a test
		default void assertBusinessLogic()
		{
			log.debug("No business-logic to assert");
		}
	}
298
299         void sendMessage(ProducerRecord<Bytes, Bytes> record)
300         {
301                 testRecordProducer.send(record, (metadata, e) ->
302                 {
303                         if (metadata != null)
304                         {
305                                 log.debug(
306                                                 "{}|{} - {}={}",
307                                                 metadata.partition(),
308                                                 metadata.offset(),
309                                                 record.key(),
310                                                 record.value());
311                         }
312                         else
313                         {
314                                 log.warn(
315                                                 "Exception for {}={}: {}",
316                                                 record.key(),
317                                                 record.value(),
318                                                 e.toString());
319                         }
320                 });
321         }
322
323
	/**
	 * Per-test setup: creates the helper producer/consumer, wipes the mongo
	 * database, fast-forwards all committed offsets to the log end, records
	 * the starting positions and finally starts the consumer under test.
	 */
	@BeforeEach
	public void init()
	{
		Properties props;
		props = new Properties();
		props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
		props.put("linger.ms", 100);
		props.put("key.serializer", BytesSerializer.class.getName());
		props.put("value.serializer", BytesSerializer.class.getName());
		testRecordProducer = new KafkaProducer<>(props);

		props = new Properties();
		props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
		props.put("client.id", "OFFSET-CONSUMER");
		// NOTE(review): same group-id as the application — presumably so that
		// offsets committed in seekToEnd() apply to the consumer under test; verify
		props.put("group.id", applicationProperties.getGroupId());
		props.put("key.deserializer", BytesDeserializer.class.getName());
		props.put("value.deserializer", BytesDeserializer.class.getName());
		offsetConsumer = new KafkaConsumer<>(props);

		// Clean state: empty database, committed offsets at the log end
		mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
		seekToEnd();

		oldOffsets = new HashMap<>();
		recordHandler.seenOffsets = new HashMap<>();
		recordHandler.receivedRecords = new HashSet<>();

		// Seed both maps with position - 1: the "last seen" offset just
		// before the current position, so any consumption shows as progress
		doForCurrentOffsets((tp, offset) ->
		{
			oldOffsets.put(tp, offset - 1);
			recordHandler.seenOffsets.put(tp, offset - 1);
		});

		// Start the consumer under test last, after all bookkeeping is in place
		endlessConsumer.start();
	}
358
359         @AfterEach
360         public void deinit()
361         {
362                 try
363                 {
364                         endlessConsumer.stop();
365                 }
366                 catch (Exception e)
367                 {
368                         log.debug("{}", e.toString());
369                 }
370
371                 try
372                 {
373                         testRecordProducer.close();
374                         offsetConsumer.close();
375                 }
376                 catch (Exception e)
377                 {
378                         log.info("Exception while stopping the consumer: {}", e.toString());
379                 }
380         }
381
382
	/**
	 * Test configuration: imports the regular application configuration and
	 * replaces the application's {@code RecordHandler} with a wrapping
	 * {@link TestRecordHandler} that records seen offsets and received records.
	 */
	@TestConfiguration
	@Import(ApplicationConfiguration.class)
	public static class Configuration
	{
		// NOTE(review): raw TestRecordHandler — presumably generic as
		// TestRecordHandler<K, V>; consider adding the diamond operator
		@Bean
		public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
		{
			return new TestRecordHandler(applicationRecordHandler);
		}
	}
393 }