Tests merged over from springified-consumer--serialization -> deserialization
[demos/kafka/training] src/test/java/de/juplo/kafka/GenericApplicationTest.java
package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.GenericApplicationTest.PARTITIONS;
import static de.juplo.kafka.GenericApplicationTest.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;


@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestPropertySource(
    properties = {
        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
        "consumer.topic=" + TOPIC,
        "consumer.commit-interval=1s" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@Slf4j
abstract class GenericApplicationTest<K, V>
{
  public static final String TOPIC = "FOO";
  public static final int PARTITIONS = 10;


  @Autowired
  KafkaConsumer<K, V> kafkaConsumer;
  @Autowired
  Consumer<ConsumerRecord<K, V>> consumer;
  @Autowired
  ApplicationProperties properties;
  @Autowired
  ExecutorService executor;

  KafkaProducer<Bytes, Bytes> testRecordProducer;
  KafkaConsumer<Bytes, Bytes> offsetConsumer;
  EndlessConsumer<K, V> endlessConsumer;
  Map<TopicPartition, Long> oldOffsets;
  Map<TopicPartition, Long> newOffsets;
  Set<ConsumerRecord<K, V>> receivedRecords;


  final RecordGenerator recordGenerator;
  final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;

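  // Concrete subclasses plug in a RecordGenerator that writes the test
  // records as raw Bytes, so this generic suite can be run against any
  // combination of key/value (de)serializers.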
  public GenericApplicationTest(RecordGenerator recordGenerator)
  {
    this.recordGenerator = recordGenerator;
    this.messageSender = this::sendMessage;
  }


  /** Test methods */

  @Test
  void commitsCurrentOffsetsOnSuccess()
  {
    recordGenerator.generate(100, Set.of(), Set.of(), messageSender);

    await("100 records received")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> receivedRecords.size() >= 100);

    await("Offsets committed")
        .atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(() ->
        {
          checkSeenOffsetsForProgress();
          compareToCommitedOffsets(newOffsets);
        });

    // describedAs() must be called before isThrownBy(); otherwise, the
    // description is silently ignored by AssertJ.
    assertThatExceptionOfType(IllegalStateException.class)
        .describedAs("Consumer should still be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
  }

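  // Message no. 77 is sent as a poison pill that cannot be deserialized.
  // The consumer is expected to fail, but to commit the offsets seen so far,
  // so that a restart retries (and fails on) the poisoned record again.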
  @Test
  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
  {
    recordGenerator.generate(100, Set.of(77), Set.of(), messageSender);

    await("Consumer failed")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommitedOffsets(newOffsets);

    endlessConsumer.start();
    await("Consumer failed again")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommitedOffsets(newOffsets);
    assertThat(receivedRecords.size())
        .describedAs("Not all sent events should have been received")
        .isLessThan(100);

    assertThatNoException()
        .describedAs("Consumer should not be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
    assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RecordDeserializationException.class);
  }

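  // Message no. 77 triggers an exception in the business logic instead.
  // In this case, no offsets may be committed: after a restart, the consumer
  // must re-read all records since the last commit and fail again.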
  @Test
  void doesNotCommitOffsetsOnLogicError()
  {
    recordGenerator.generate(100, Set.of(), Set.of(77), messageSender);

    await("Consumer failed")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommitedOffsets(oldOffsets);

    endlessConsumer.start();
    await("Consumer failed again")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommitedOffsets(oldOffsets);
    assertThat(receivedRecords.size())
        .describedAs("Not all sent events should have been received")
        .isLessThan(100);

    assertThatNoException()
        .describedAs("Consumer should not be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
    assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RuntimeException.class);
  }


  /** Helper methods for the verification of expectations */

  void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
  {
    doForCurrentOffsets((tp, offset) ->
    {
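      // Kafka commits the offset of the *next* record to be consumed,
      // hence the expected value is the last seen offset plus one.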
      Long expected = offsetsToCheck.get(tp) + 1;
      log.debug("Checking if the committed offset for {} is {}", tp, expected);
      assertThat(offset)
          .describedAs("Committed offset corresponds to the offset of the consumer")
          .isEqualTo(expected);
    });
  }

  void checkSeenOffsetsForProgress()
  {
    // Make sure that some records were actually consumed...!
    Set<TopicPartition> withProgress = new HashSet<>();
    partitions().forEach(tp ->
    {
      Long oldOffset = oldOffsets.get(tp) + 1;
      Long newOffset = newOffsets.get(tp) + 1;
      if (!oldOffset.equals(newOffset))
      {
        log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
        withProgress.add(tp);
      }
    });
    assertThat(withProgress)
        .describedAs("Some offsets must have changed, compared to the old offset-positions")
        .isNotEmpty();
  }


  /** Helper methods for setting up and running the tests */

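  // Fast-forwards the offsetConsumer to the end of all partitions and commits
  // these positions, so that every test starts from a clean, known state.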
  void seekToEnd()
  {
    offsetConsumer.assign(partitions());
    offsetConsumer.seekToEnd(partitions());
    partitions().forEach(tp ->
    {
      // seekToEnd() works lazily: it only takes effect on poll()/position()
      Long offset = offsetConsumer.position(tp);
      log.info("New position for {}: {}", tp, offset);
    });
    // The new positions must be committed!
    offsetConsumer.commitSync();
    offsetConsumer.unsubscribe();
  }

  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
  {
    offsetConsumer.assign(partitions());
    partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
    offsetConsumer.unsubscribe();
  }

  List<TopicPartition> partitions()
  {
    return
        IntStream
            .range(0, PARTITIONS)
            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
            .collect(Collectors.toList());
  }


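  /**
   * Seam for the concrete test classes: generates the requested number of
   * records, turning the messages at the positions listed in poisonPills into
   * records that cannot be deserialized, and the ones listed in logicErrors
   * into records that make the business logic of the consumer fail.
   */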
  public interface RecordGenerator
  {
    void generate(
        int numberOfMessagesToGenerate,
        Set<Integer> poisonPills,
        Set<Integer> logicErrors,
        Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
  }

  void sendMessage(ProducerRecord<Bytes, Bytes> record)
  {
    testRecordProducer.send(record, (metadata, e) ->
    {
      if (metadata != null)
      {
        log.debug(
            "{}|{} - {}={}",
            metadata.partition(),
            metadata.offset(),
            record.key(),
            record.value());
      }
      else
      {
        log.warn(
            "Exception for {}={}: {}",
            record.key(),
            record.value(),
            e.toString());
      }
    });
  }


  @BeforeEach
  public void init()
  {
    Properties props;
    props = new Properties();
    props.put("bootstrap.servers", properties.getBootstrapServer());
    props.put("linger.ms", 100);
    props.put("key.serializer", BytesSerializer.class.getName());
    props.put("value.serializer", BytesSerializer.class.getName());
    testRecordProducer = new KafkaProducer<>(props);

    props = new Properties();
    props.put("bootstrap.servers", properties.getBootstrapServer());
    props.put("client.id", "OFFSET-CONSUMER");
    props.put("group.id", properties.getGroupId());
    props.put("key.deserializer", BytesDeserializer.class.getName());
    props.put("value.deserializer", BytesDeserializer.class.getName());
    offsetConsumer = new KafkaConsumer<>(props);

    seekToEnd();

    oldOffsets = new HashMap<>();
    newOffsets = new HashMap<>();
    receivedRecords = new HashSet<>();

    doForCurrentOffsets((tp, offset) ->
    {
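      // position() returns the offset of the *next* record to be read;
      // the maps track the last record that was seen, hence the -1.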
      oldOffsets.put(tp, offset - 1);
      newOffsets.put(tp, offset - 1);
    });

    Consumer<ConsumerRecord<K, V>> captureOffsetAndExecuteTestHandler =
        record ->
        {
          newOffsets.put(
              new TopicPartition(record.topic(), record.partition()),
              record.offset());
          receivedRecords.add(record);
          consumer.accept(record);
        };

    endlessConsumer =
        new EndlessConsumer<>(
            executor,
            properties.getClientId(),
            properties.getTopic(),
            kafkaConsumer,
            captureOffsetAndExecuteTestHandler);

    endlessConsumer.start();
  }

  @AfterEach
  public void deinit()
  {
    try
    {
      endlessConsumer.stop();
      testRecordProducer.close();
      offsetConsumer.close();
    }
    catch (Exception e)
    {
      log.info("Exception while stopping the consumer: {}", e.toString());
    }
  }


  @TestConfiguration
  @Import(ApplicationConfiguration.class)
  public static class Configuration
  {
  }
}
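
For reference, a concrete test class only has to bind the type parameters and supply a RecordGenerator. The following is a minimal sketch, not part of the file above; the class name, the chosen serializers, and the values used as poison pill / logic error are illustrative assumptions:

package de.juplo.kafka;

import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;

import java.util.Set;
import java.util.function.Consumer;


public class ExampleApplicationTest extends GenericApplicationTest<String, Long>
{
  public ExampleApplicationTest()
  {
    super(new RecordGenerator()
    {
      final StringSerializer stringSerializer = new StringSerializer();
      final LongSerializer longSerializer = new LongSerializer();

      @Override
      public void generate(
          int numberOfMessagesToGenerate,
          Set<Integer> poisonPills,
          Set<Integer> logicErrors,
          Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
      {
        for (int i = 1; i <= numberOfMessagesToGenerate; i++)
        {
          // Spread the records over the partitions via ten distinct keys
          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, Integer.toString(i % 10)));
          Bytes value;
          if (poisonPills.contains(i))
          {
            // A String is no valid Long: the value-deserializer will choke on it
            value = new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"));
          }
          else
          {
            // Assumption: the business logic under test rejects negative values
            long payload = logicErrors.contains(i) ? -1L : i;
            value = new Bytes(longSerializer.serialize(TOPIC, payload));
          }
          messageSender.accept(new ProducerRecord<>(TOPIC, key, value));
        }
      }
    });
  }
}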