Signatur und Handling des `RecordGenerator` vereinfacht/überarbeitet
[demos/kafka/training] / src / test / java / de / juplo / kafka / GenericApplicationTests.java
1 package de.juplo.kafka;
2
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.KafkaConsumer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.RecordDeserializationException;
10 import org.apache.kafka.common.serialization.*;
11 import org.apache.kafka.common.utils.Bytes;
12 import org.junit.jupiter.api.*;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
15 import org.springframework.boot.test.context.TestConfiguration;
16 import org.springframework.context.annotation.Import;
17 import org.springframework.kafka.test.context.EmbeddedKafka;
18 import org.springframework.test.context.TestPropertySource;
19 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
20
21 import java.time.Duration;
22 import java.util.*;
23 import java.util.concurrent.ExecutorService;
24 import java.util.function.BiConsumer;
25 import java.util.function.Consumer;
26 import java.util.stream.Collectors;
27 import java.util.stream.IntStream;
28
29 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
30 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
31 import static org.assertj.core.api.Assertions.*;
32 import static org.awaitility.Awaitility.*;
33
34
35 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
36 @TestPropertySource(
37                 properties = {
38                                 "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
39                                 "consumer.topic=" + TOPIC,
40                                 "consumer.commit-interval=1s" })
41 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
42 @Slf4j
43 abstract class GenericApplicationTests<K, V>
44 {
45         public static final String TOPIC = "FOO";
46         public static final int PARTITIONS = 10;
47
48
49         @Autowired
50         KafkaConsumer<K, V> kafkaConsumer;
51         @Autowired
52         Consumer<ConsumerRecord<K, V>> consumer;
53         @Autowired
54         ApplicationProperties properties;
55         @Autowired
56         ExecutorService executor;
57
58         KafkaProducer<Bytes, Bytes> testRecordProducer;
59         KafkaConsumer<Bytes, Bytes> offsetConsumer;
60         EndlessConsumer<K, V> endlessConsumer;
61         Map<TopicPartition, Long> oldOffsets;
62         Map<TopicPartition, Long> newOffsets;
63         Set<ConsumerRecord<K, V>> receivedRecords;
64
65
66         final RecordGenerator recordGenerator;
67         final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
68
69         public GenericApplicationTests(RecordGenerator recordGenerator)
70         {
71                 this.recordGenerator = recordGenerator;
72                 this.messageSender = (record) -> sendMessage(record);
73         }
74
75
76         /** Tests methods */
77
78         @Test
79         void commitsCurrentOffsetsOnSuccess()
80         {
81                 int numberOfGeneratedMessages =
82                                 recordGenerator.generate(false, false, messageSender);
83
84                 await(numberOfGeneratedMessages + " records received")
85                                 .atMost(Duration.ofSeconds(30))
86                                 .pollInterval(Duration.ofSeconds(1))
87                                 .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
88
89                 await("Offsets committed")
90                                 .atMost(Duration.ofSeconds(10))
91                                 .pollInterval(Duration.ofSeconds(1))
92                                 .untilAsserted(() ->
93                                 {
94                                         checkSeenOffsetsForProgress();
95                                         compareToCommitedOffsets(newOffsets);
96                                 });
97
98                 assertThatExceptionOfType(IllegalStateException.class)
99                                 .isThrownBy(() -> endlessConsumer.exitStatus())
100                                 .describedAs("Consumer should still be running");
101         }
102
103         @Test
104         @SkipWhenErrorCannotBeGenerated(poisonPill = true)
105         void commitsOffsetOfErrorForReprocessingOnDeserializationError()
106         {
107                 int numberOfGeneratedMessages =
108                                 recordGenerator.generate(true, false, messageSender);
109
110                 await("Consumer failed")
111                                 .atMost(Duration.ofSeconds(30))
112                                 .pollInterval(Duration.ofSeconds(1))
113                                 .until(() -> !endlessConsumer.running());
114
115                 checkSeenOffsetsForProgress();
116                 compareToCommitedOffsets(newOffsets);
117
118                 endlessConsumer.start();
119                 await("Consumer failed")
120                                 .atMost(Duration.ofSeconds(30))
121                                 .pollInterval(Duration.ofSeconds(1))
122                                 .until(() -> !endlessConsumer.running());
123
124                 checkSeenOffsetsForProgress();
125                 compareToCommitedOffsets(newOffsets);
126                 assertThat(receivedRecords.size())
127                                 .describedAs("Received not all sent events")
128                                 .isLessThan(numberOfGeneratedMessages);
129
130                 assertThatNoException()
131                                 .describedAs("Consumer should not be running")
132                                 .isThrownBy(() -> endlessConsumer.exitStatus());
133                 assertThat(endlessConsumer.exitStatus())
134                                 .describedAs("Consumer should have exited abnormally")
135                                 .containsInstanceOf(RecordDeserializationException.class);
136         }
137
138         @Test
139         @SkipWhenErrorCannotBeGenerated(logicError = true)
140         void doesNotCommitOffsetsOnLogicError()
141         {
142                 int numberOfGeneratedMessages =
143                                 recordGenerator.generate(false, true, messageSender);
144
145                 await("Consumer failed")
146                                 .atMost(Duration.ofSeconds(30))
147                                 .pollInterval(Duration.ofSeconds(1))
148                                 .until(() -> !endlessConsumer.running());
149
150                 checkSeenOffsetsForProgress();
151                 compareToCommitedOffsets(oldOffsets);
152
153                 endlessConsumer.start();
154                 await("Consumer failed")
155                                 .atMost(Duration.ofSeconds(30))
156                                 .pollInterval(Duration.ofSeconds(1))
157                                 .until(() -> !endlessConsumer.running());
158
159                 checkSeenOffsetsForProgress();
160                 compareToCommitedOffsets(oldOffsets);
161                 assertThat(receivedRecords.size())
162                                 .describedAs("Received not all sent events")
163                                 .isLessThan(numberOfGeneratedMessages);
164
165                 assertThatNoException()
166                                 .describedAs("Consumer should not be running")
167                                 .isThrownBy(() -> endlessConsumer.exitStatus());
168                 assertThat(endlessConsumer.exitStatus())
169                                 .describedAs("Consumer should have exited abnormally")
170                                 .containsInstanceOf(RuntimeException.class);
171         }
172
173
174         /** Helper methods for the verification of expectations */
175
176         void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
177         {
178                 doForCurrentOffsets((tp, offset) ->
179                 {
180                         Long expected = offsetsToCheck.get(tp) + 1;
181                         log.debug("Checking, if the offset for {} is {}", tp, expected);
182                         assertThat(offset)
183                                         .describedAs("Committed offset corresponds to the offset of the consumer")
184                                         .isEqualTo(expected);
185                 });
186         }
187
188         void checkSeenOffsetsForProgress()
189         {
190                 // Be sure, that some messages were consumed...!
191                 Set<TopicPartition> withProgress = new HashSet<>();
192                 partitions().forEach(tp ->
193                 {
194                         Long oldOffset = oldOffsets.get(tp) + 1;
195                         Long newOffset = newOffsets.get(tp) + 1;
196                         if (!oldOffset.equals(newOffset))
197                         {
198                                 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
199                                 withProgress.add(tp);
200                         }
201                 });
202                 assertThat(withProgress)
203                                 .describedAs("Some offsets must have changed, compared to the old offset-positions")
204                                 .isNotEmpty();
205         }
206
207
208         /** Helper methods for setting up and running the tests */
209
210         void seekToEnd()
211         {
212                 offsetConsumer.assign(partitions());
213                 offsetConsumer.seekToEnd(partitions());
214                 partitions().forEach(tp ->
215                 {
216                         // seekToEnd() works lazily: it only takes effect on poll()/position()
217                         Long offset = offsetConsumer.position(tp);
218                         log.info("New position for {}: {}", tp, offset);
219                 });
220                 // The new positions must be commited!
221                 offsetConsumer.commitSync();
222                 offsetConsumer.unsubscribe();
223         }
224
225         void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
226         {
227                 offsetConsumer.assign(partitions());
228                 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
229                 offsetConsumer.unsubscribe();
230         }
231
232         List<TopicPartition> partitions()
233         {
234                 return
235                                 IntStream
236                                                 .range(0, PARTITIONS)
237                                                 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
238                                                 .collect(Collectors.toList());
239         }
240
241
242         public interface RecordGenerator
243         {
244                 int generate(
245                                 boolean poisonPills,
246                                 boolean logicErrors,
247                                 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
248
249                 default boolean canGeneratePoisonPill()
250                 {
251                         return true;
252                 }
253
254                 default boolean canGenerateLogicError()
255                 {
256                         return true;
257                 }
258         }
259
260         void sendMessage(ProducerRecord<Bytes, Bytes> record)
261         {
262                 testRecordProducer.send(record, (metadata, e) ->
263                 {
264                         if (metadata != null)
265                         {
266                                 log.debug(
267                                                 "{}|{} - {}={}",
268                                                 metadata.partition(),
269                                                 metadata.offset(),
270                                                 record.key(),
271                                                 record.value());
272                         }
273                         else
274                         {
275                                 log.warn(
276                                                 "Exception for {}={}: {}",
277                                                 record.key(),
278                                                 record.value(),
279                                                 e.toString());
280                         }
281                 });
282         }
283
284
285         @BeforeEach
286         public void init()
287         {
288                 Properties props;
289                 props = new Properties();
290                 props.put("bootstrap.servers", properties.getBootstrapServer());
291                 props.put("linger.ms", 100);
292                 props.put("key.serializer", BytesSerializer.class.getName());
293                 props.put("value.serializer", BytesSerializer.class.getName());
294                 testRecordProducer = new KafkaProducer<>(props);
295
296                 props = new Properties();
297                 props.put("bootstrap.servers", properties.getBootstrapServer());
298                 props.put("client.id", "OFFSET-CONSUMER");
299                 props.put("group.id", properties.getGroupId());
300                 props.put("key.deserializer", BytesDeserializer.class.getName());
301                 props.put("value.deserializer", BytesDeserializer.class.getName());
302                 offsetConsumer = new KafkaConsumer<>(props);
303
304                 seekToEnd();
305
306                 oldOffsets = new HashMap<>();
307                 newOffsets = new HashMap<>();
308                 receivedRecords = new HashSet<>();
309
310                 doForCurrentOffsets((tp, offset) ->
311                 {
312                         oldOffsets.put(tp, offset - 1);
313                         newOffsets.put(tp, offset - 1);
314                 });
315
316                 Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
317                                 record ->
318                                 {
319                                         newOffsets.put(
320                                                         new TopicPartition(record.topic(), record.partition()),
321                                                         record.offset());
322                                         receivedRecords.add(record);
323                                         consumer.accept(record);
324                                 };
325
326                 endlessConsumer =
327                                 new EndlessConsumer<>(
328                                                 executor,
329                                                 properties.getClientId(),
330                                                 properties.getTopic(),
331                                                 kafkaConsumer,
332                                                 captureOffsetAndExecuteTestHandler);
333
334                 endlessConsumer.start();
335         }
336
337         @AfterEach
338         public void deinit()
339         {
340                 try
341                 {
342                         endlessConsumer.stop();
343                         testRecordProducer.close();
344                         offsetConsumer.close();
345                 }
346                 catch (Exception e)
347                 {
348                         log.info("Exception while stopping the consumer: {}", e.toString());
349                 }
350         }
351
352
353         @TestConfiguration
354         @Import(ApplicationConfiguration.class)
355         public static class Configuration
356         {
357         }
358 }