Der Test verwendet die `@Bean`-Definition von `EndlessConsumer`
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.KafkaConsumer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.RecordDeserializationException;
10 import org.apache.kafka.common.serialization.*;
11 import org.apache.kafka.common.utils.Bytes;
12 import org.junit.jupiter.api.*;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
15 import org.springframework.boot.test.context.TestConfiguration;
16 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Import;
18 import org.springframework.kafka.test.context.EmbeddedKafka;
19 import org.springframework.test.context.TestPropertySource;
20 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
21
22 import java.time.Duration;
23 import java.util.*;
24 import java.util.function.BiConsumer;
25 import java.util.function.Consumer;
26 import java.util.stream.Collectors;
27 import java.util.stream.IntStream;
28
29 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
30 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
31 import static org.assertj.core.api.Assertions.*;
32 import static org.awaitility.Awaitility.*;
33
34
35 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
36 @TestPropertySource(
37                 properties = {
38                                 "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
39                                 "consumer.topic=" + TOPIC,
40                                 "consumer.commit-interval=500ms" })
41 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
42 @Slf4j
43 abstract class GenericApplicationTests<K, V>
44 {
45         public static final String TOPIC = "FOO";
46         public static final int PARTITIONS = 10;
47
48
49         @Autowired
50         org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
51         @Autowired
52         Consumer<ConsumerRecord<K, V>> consumer;
53         @Autowired
54         ApplicationProperties applicationProperties;
55         @Autowired
56         TestRecordHandler<K, V> recordHandler;
57         @Autowired
58         EndlessConsumer<K, V> endlessConsumer;
59
60
61         KafkaProducer<Bytes, Bytes> testRecordProducer;
62         KafkaConsumer<Bytes, Bytes> offsetConsumer;
63         Map<TopicPartition, Long> oldOffsets;
64
65
66         final RecordGenerator recordGenerator;
67         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
68
69         public GenericApplicationTests(RecordGenerator recordGenerator)
70         {
71                 this.recordGenerator = recordGenerator;
72                 this.messageSender = (record) -> sendMessage(record);
73         }
74
75
76         /** Tests methods */
77
78         @Test
79         void commitsCurrentOffsetsOnSuccess() throws Exception
80         {
81                 int numberOfGeneratedMessages =
82                                 recordGenerator.generate(false, false, messageSender);
83
84                 await(numberOfGeneratedMessages + " records received")
85                                 .atMost(Duration.ofSeconds(30))
86                                 .pollInterval(Duration.ofSeconds(1))
87                                 .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
88
89                 await("Offsets committed")
90                                 .atMost(Duration.ofSeconds(10))
91                                 .pollInterval(Duration.ofSeconds(1))
92                                 .untilAsserted(() ->
93                                 {
94                                         checkSeenOffsetsForProgress();
95                                         assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
96                                 });
97
98                 assertThatExceptionOfType(IllegalStateException.class)
99                                 .isThrownBy(() -> endlessConsumer.exitStatus())
100                                 .describedAs("Consumer should still be running");
101
102                 endlessConsumer.stop();
103                 recordGenerator.assertBusinessLogic();
104         }
105
106         @Test
107         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
108         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
109         {
110                 int numberOfGeneratedMessages =
111                                 recordGenerator.generate(true, false, messageSender);
112
113                 await("Consumer failed")
114                                 .atMost(Duration.ofSeconds(30))
115                                 .pollInterval(Duration.ofSeconds(1))
116                                 .until(() -> !endlessConsumer.running());
117
118                 checkSeenOffsetsForProgress();
119                 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
120
121                 endlessConsumer.start();
122                 await("Consumer failed")
123                                 .atMost(Duration.ofSeconds(30))
124                                 .pollInterval(Duration.ofSeconds(1))
125                                 .until(() -> !endlessConsumer.running());
126
127                 checkSeenOffsetsForProgress();
128                 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
129                 assertThat(recordHandler.receivedRecords.size())
130                                 .describedAs("Received not all sent events")
131                                 .isLessThan(numberOfGeneratedMessages);
132
133                 assertThatNoException()
134                                 .describedAs("Consumer should not be running")
135                                 .isThrownBy(() -> endlessConsumer.exitStatus());
136                 assertThat(endlessConsumer.exitStatus())
137                                 .describedAs("Consumer should have exited abnormally")
138                                 .containsInstanceOf(RecordDeserializationException.class);
139
140                 recordGenerator.assertBusinessLogic();
141         }
142
143         @Test
144         @SkipWhenErrorCannotBeGenerated(logicError = true)
145         void doesNotCommitOffsetsOnLogicError()
146         {
147                 int numberOfGeneratedMessages =
148                                 recordGenerator.generate(false, true, messageSender);
149
150                 await("Consumer failed")
151                                 .atMost(Duration.ofSeconds(30))
152                                 .pollInterval(Duration.ofSeconds(1))
153                                 .until(() -> !endlessConsumer.running());
154
155                 checkSeenOffsetsForProgress();
156                 assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
157
158                 endlessConsumer.start();
159                 await("Consumer failed")
160                                 .atMost(Duration.ofSeconds(30))
161                                 .pollInterval(Duration.ofSeconds(1))
162                                 .until(() -> !endlessConsumer.running());
163
164                 assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
165
166                 assertThatNoException()
167                                 .describedAs("Consumer should not be running")
168                                 .isThrownBy(() -> endlessConsumer.exitStatus());
169                 assertThat(endlessConsumer.exitStatus())
170                                 .describedAs("Consumer should have exited abnormally")
171                                 .containsInstanceOf(RuntimeException.class);
172
173                 recordGenerator.assertBusinessLogic();
174         }
175
176
177         /** Helper methods for the verification of expectations */
178
179         void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
180         {
181                 doForCurrentOffsets((tp, offset) ->
182                 {
183                         Long expected = offsetsToCheck.get(tp) + 1;
184                         log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
185                         assertThat(offset)
186                                         .describedAs("Committed offset corresponds to the offset of the consumer")
187                                         .isEqualTo(expected);
188                 });
189         }
190
191         void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
192         {
193                 List<Boolean> isOffsetBehindSeen = new LinkedList<>();
194
195                 doForCurrentOffsets((tp, offset) ->
196                 {
197                         Long expected = offsetsToCheck.get(tp) + 1;
198                         log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
199                         assertThat(offset)
200                                         .describedAs("Committed offset must be at most equal to the offset of the consumer")
201                                         .isLessThanOrEqualTo(expected);
202                         isOffsetBehindSeen.add(offset < expected);
203                 });
204
205                 assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
206                                 .describedAs("Committed offsets are behind seen offsets")
207                                 .isTrue();
208         }
209
210         void checkSeenOffsetsForProgress()
211         {
212                 // Be sure, that some messages were consumed...!
213                 Set<TopicPartition> withProgress = new HashSet<>();
214                 partitions().forEach(tp ->
215                 {
216                         Long oldOffset = oldOffsets.get(tp) + 1;
217                         Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
218                         if (!oldOffset.equals(newOffset))
219                         {
220                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
221                                 withProgress.add(tp);
222                         }
223                 });
224                 assertThat(withProgress)
225                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
226                                 .isNotEmpty();
227         }
228
229
230         /** Helper methods for setting up and running the tests */
231
232         void seekToEnd()
233         {
234                 offsetConsumer.assign(partitions());
235                 offsetConsumer.seekToEnd(partitions());
236                 partitions().forEach(tp ->
237                 {
238                         // seekToEnd() works lazily: it only takes effect on poll()/position()
239                         Long offset = offsetConsumer.position(tp);
240                         log.info("New position for {}: {}", tp, offset);
241                 });
242                 // The new positions must be commited!
243                 offsetConsumer.commitSync();
244                 offsetConsumer.unsubscribe();
245         }
246
247         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
248         {
249                 offsetConsumer.assign(partitions());
250                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
251                 offsetConsumer.unsubscribe();
252         }
253
254         List<TopicPartition> partitions()
255         {
256                 return
257                                 IntStream
258                                                 .range(0, PARTITIONS)
259                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
260                                                 .collect(Collectors.toList());
261         }
262
263
264         public interface RecordGenerator
265         {
266                 int generate(
267                                 boolean poisonPills,
268                                 boolean logicErrors,
269                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
270
271                 default boolean canGeneratePoisonPill()
272                 {
273                         return true;
274                 }
275
276                 default boolean canGenerateLogicError()
277                 {
278                         return true;
279                 }
280
281                 default void assertBusinessLogic()
282                 {
283                         log.debug("No business-logic to assert");
284                 }
285         }
286
287         void sendMessage(ProducerRecord<Bytes, Bytes> record)
288         {
289                 testRecordProducer.send(record, (metadata, e) ->
290                 {
291                         if (metadata != null)
292                         {
293                                 log.debug(
294                                                 "{}|{} - {}={}",
295                                                 metadata.partition(),
296                                                 metadata.offset(),
297                                                 record.key(),
298                                                 record.value());
299                         }
300                         else
301                         {
302                                 log.warn(
303                                                 "Exception for {}={}: {}",
304                                                 record.key(),
305                                                 record.value(),
306                                                 e.toString());
307                         }
308                 });
309         }
310
311
312         @BeforeEach
313         public void init()
314         {
315                 Properties props;
316                 props = new Properties();
317                 props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
318                 props.put("linger.ms", 100);
319                 props.put("key.serializer", BytesSerializer.class.getName());
320                 props.put("value.serializer", BytesSerializer.class.getName());
321                 testRecordProducer = new KafkaProducer<>(props);
322
323                 props = new Properties();
324                 props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
325                 props.put("client.id", "OFFSET-CONSUMER");
326                 props.put("group.id", applicationProperties.getGroupId());
327                 props.put("key.deserializer", BytesDeserializer.class.getName());
328                 props.put("value.deserializer", BytesDeserializer.class.getName());
329                 offsetConsumer = new KafkaConsumer<>(props);
330
331                 seekToEnd();
332
333                 oldOffsets = new HashMap<>();
334                 recordHandler.seenOffsets = new HashMap<>();
335                 recordHandler.receivedRecords = new HashSet<>();
336
337                 doForCurrentOffsets((tp, offset) ->
338                 {
339                         oldOffsets.put(tp, offset - 1);
340                         recordHandler.seenOffsets.put(tp, offset - 1);
341                 });
342
343                 endlessConsumer.start();
344         }
345
346         @AfterEach
347         public void deinit()
348         {
349                 try
350                 {
351                         endlessConsumer.stop();
352                 }
353                 catch (Exception e)
354                 {
355                         log.debug("{}", e.toString());
356                 }
357
358                 try
359                 {
360                         testRecordProducer.close();
361                         offsetConsumer.close();
362                 }
363                 catch (Exception e)
364                 {
365                         log.info("Exception while stopping the consumer: {}", e.toString());
366                 }
367         }
368
369
370         @TestConfiguration
371         @Import(ApplicationConfiguration.class)
372         public static class Configuration
373         {
374                 @Bean
375                 public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
376                 {
377                         return new TestRecordHandler(applicationRecordHandler);
378                 }
379         }
380 }