The test checks invalid and unknown messages
[demos/kafka/training] src/test/java/de/juplo/kafka/ApplicationTests.java
package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;


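/**
 * Verifies the offset handling of the {@code EndlessConsumer} for valid
 * messages as well as for invalid (missing type header) and unknown
 * (unmapped type header) messages.
 */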
@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
		properties = {
				"consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
				"consumer.topic=" + TOPIC,
				"consumer.commit-interval=1s" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@Slf4j
class ApplicationTests
{
	public static final String TOPIC = "FOO";
	public static final int PARTITIONS = 10;


	StringSerializer stringSerializer = new StringSerializer();

	@Autowired
	Serializer valueSerializer;
	@Autowired
	KafkaProducer<String, Bytes> kafkaProducer;
	@Autowired
	KafkaConsumer<String, ValidMessage> kafkaConsumer;
	@Autowired
	KafkaConsumer<Bytes, Bytes> offsetConsumer;
	@Autowired
	ApplicationProperties properties;
	@Autowired
	ExecutorService executor;

	Consumer<ConsumerRecord<String, ValidMessage>> testHandler;
	EndlessConsumer<String, ValidMessage> endlessConsumer;
	Map<TopicPartition, Long> oldOffsets;
	Map<TopicPartition, Long> newOffsets;
	Set<ConsumerRecord<String, ValidMessage>> receivedRecords;


	/** Test methods */

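	/**
	 * Happy path: all 100 records carry a known type ("message" or
	 * "greeting"), so the consumer keeps running and the committed offsets
	 * eventually match the offsets of the received records.
	 */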
	@Test
	void commitsCurrentOffsetsOnSuccess()
	{
		send100Messages((partition, key, counter) ->
		{
			Bytes value;
			String type;

			if (counter%3 != 0)
			{
				value = serializeClientMessage(key, counter);
				type = "message";
			}
			else
			{
				value = serializeGreeting(key);
				type = "greeting";
			}

			return toRecord(partition, key, value, Optional.of(type));
		});

		await("100 records received")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> receivedRecords.size() >= 100);

		await("Offsets committed")
				.atMost(Duration.ofSeconds(10))
				.pollInterval(Duration.ofSeconds(1))
				.untilAsserted(() ->
				{
					checkSeenOffsetsForProgress();
					compareToCommittedOffsets(newOffsets);
				});

		assertThatExceptionOfType(IllegalStateException.class)
				.describedAs("Consumer should still be running")
				.isThrownBy(() -> endlessConsumer.exitStatus());
	}

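	/**
	 * Record number 77 is sent without a type header, so its deserialization
	 * is expected to fail: the consumer should stop, commit the offset of the
	 * poisoned record for reprocessing, and fail again after a restart.
	 */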
	@Test
	void commitsOffsetOfErrorForReprocessingOnDeserializationErrorInvalidMessage()
	{
		send100Messages((partition, key, counter) ->
		{
			Bytes value;
			String type;

			if (counter == 77)
			{
				value = serializeFooMessage(key, counter);
				type = null;
			}
			else
			{
				if (counter%3 != 0)
				{
					value = serializeClientMessage(key, counter);
					type = "message";
				}
				else
				{
					value = serializeGreeting(key);
					type = "greeting";
				}
			}

			return toRecord(partition, key, value, Optional.ofNullable(type));
		});

		await("Consumer failed")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		checkSeenOffsetsForProgress();
		compareToCommittedOffsets(newOffsets);

		endlessConsumer.start();
		await("Consumer failed again")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		checkSeenOffsetsForProgress();
		compareToCommittedOffsets(newOffsets);
		assertThat(receivedRecords.size())
				.describedAs("Not all sent events should have been received")
				.isLessThan(100);

		assertThatNoException()
				.describedAs("Consumer should not be running")
				.isThrownBy(() -> endlessConsumer.exitStatus());
		assertThat(endlessConsumer.exitStatus())
				.describedAs("Consumer should have exited abnormally")
				.containsInstanceOf(RecordDeserializationException.class);
	}

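	/**
	 * Record number 77 is sent with the unmapped type "foo", so its
	 * deserialization is expected to fail just like for the missing header:
	 * the consumer stops, commits the offset of the poisoned record for
	 * reprocessing, and fails again after a restart.
	 */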
	@Test
	void commitsOffsetOfErrorForReprocessingOnDeserializationErrorOnUnknownMessage()
	{
		send100Messages((partition, key, counter) ->
		{
			Bytes value;
			String type;

			if (counter == 77)
			{
				value = serializeFooMessage(key, counter);
				type = "foo";
			}
			else
			{
				if (counter%3 != 0)
				{
					value = serializeClientMessage(key, counter);
					type = "message";
				}
				else
				{
					value = serializeGreeting(key);
					type = "greeting";
				}
			}

			return toRecord(partition, key, value, Optional.of(type));
		});

		await("Consumer failed")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		checkSeenOffsetsForProgress();
		compareToCommittedOffsets(newOffsets);

		endlessConsumer.start();
		await("Consumer failed again")
				.atMost(Duration.ofSeconds(30))
				.pollInterval(Duration.ofSeconds(1))
				.until(() -> !endlessConsumer.running());

		checkSeenOffsetsForProgress();
		compareToCommittedOffsets(newOffsets);
		assertThat(receivedRecords.size())
				.describedAs("Not all sent events should have been received")
				.isLessThan(100);

		assertThatNoException()
				.describedAs("Consumer should not be running")
				.isThrownBy(() -> endlessConsumer.exitStatus());
		assertThat(endlessConsumer.exitStatus())
				.describedAs("Consumer should have exited abnormally")
				.containsInstanceOf(RecordDeserializationException.class);
	}


	/** Helper methods for the verification of expectations */

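	/**
	 * Note that a committed offset in Kafka denotes the position of the next
	 * record to be consumed, hence the {@code + 1} on the captured offsets.
	 */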
	void compareToCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
	{
		doForCurrentOffsets((tp, offset) ->
		{
			Long expected = offsetsToCheck.get(tp) + 1;
			log.debug("Checking if the offset for {} is {}", tp, expected);
			assertThat(offset)
					.describedAs("Committed offset corresponds to the offset of the consumer")
					.isEqualTo(expected);
		});
	}

	void checkSeenOffsetsForProgress()
	{
		// Make sure that some messages were actually consumed...!
		Set<TopicPartition> withProgress = new HashSet<>();
		partitions().forEach(tp ->
		{
			Long oldOffset = oldOffsets.get(tp) + 1;
			Long newOffset = newOffsets.get(tp) + 1;
			if (!oldOffset.equals(newOffset))
			{
				log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
				withProgress.add(tp);
			}
		});
		assertThat(withProgress)
				.describedAs("Some offsets must have changed, compared to the old offset-positions")
				.isNotEmpty();
	}


	/** Helper methods for setting up and running the tests */

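	/**
	 * Commits the current end of all partitions as offsets for the consumer
	 * group, so that every test starts from a clean slate.
	 */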
	void seekToEnd()
	{
		offsetConsumer.assign(partitions());
		offsetConsumer.seekToEnd(partitions());
		partitions().forEach(tp ->
		{
			// seekToEnd() works lazily: it only takes effect on poll()/position()
			Long offset = offsetConsumer.position(tp);
			log.info("New position for {}: {}", tp, offset);
		});
		// The new positions must be committed!
		offsetConsumer.commitSync();
		offsetConsumer.unsubscribe();
	}

	void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
	{
		offsetConsumer.assign(partitions());
		partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
		offsetConsumer.unsubscribe();
	}

	List<TopicPartition> partitions()
	{
		return
				IntStream
						.range(0, PARTITIONS)
						.mapToObj(partition -> new TopicPartition(TOPIC, partition))
						.collect(Collectors.toList());
	}


	public interface RecordGenerator
	{
		ProducerRecord<String, Bytes> generate(int partition, String key, int counter);
	}

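	/**
	 * Sends 10 records to each of the 10 partitions; payload and type header
	 * of every record are supplied by the given {@link RecordGenerator}.
	 */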
	void send100Messages(RecordGenerator recordGenerator)
	{
		int i = 0;

		for (int partition = 0; partition < PARTITIONS; partition++)
		{
			for (int key = 0; key < 10; key++)
			{
				ProducerRecord<String, Bytes> record =
						recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);

				kafkaProducer.send(record, (metadata, e) ->
				{
					if (metadata != null)
					{
						log.debug(
								"{}|{} - {}={}",
								metadata.partition(),
								metadata.offset(),
								record.key(),
								record.value());
					}
					else
					{
						log.warn(
								"Exception for {}={}: {}",
								record.key(),
								record.value(),
								e.toString());
					}
				});
			}
		}
	}

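	/**
	 * {@code __TypeId__} is the default type-mapping header that Spring
	 * Kafka's {@code JsonSerializer}/{@code JsonDeserializer} use to resolve
	 * the target class.
	 */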
	ProducerRecord<String, Bytes> toRecord(int partition, String key, Bytes value, Optional<String> type)
	{
		ProducerRecord<String, Bytes> record =
				new ProducerRecord<>(TOPIC, partition, key, value);

		type.ifPresent(typeId -> record.headers().add("__TypeId__", typeId.getBytes()));
		return record;
	}

	Bytes serializeClientMessage(String key, int value)
	{
		TestClientMessage message = new TestClientMessage(key, Integer.toString(value));
		return new Bytes(valueSerializer.serialize(TOPIC, message));
	}

	Bytes serializeGreeting(String key)
	{
		TestGreeting message = new TestGreeting(key, LocalDateTime.now());
		return new Bytes(valueSerializer.serialize(TOPIC, message));
	}

	Bytes serializeFooMessage(String key, int value)
	{
		TestFooMessage message = new TestFooMessage(key, (long)value);
		return new Bytes(valueSerializer.serialize(TOPIC, message));
	}

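	/**
	 * Starts a fresh EndlessConsumer for every test. The wrapped handler
	 * records the offset of each received record, so that the tests can
	 * compare the seen offsets with the committed ones.
	 */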
	@BeforeEach
	public void init()
	{
		testHandler = record -> {};

		seekToEnd();

		oldOffsets = new HashMap<>();
		newOffsets = new HashMap<>();
		receivedRecords = new HashSet<>();

		doForCurrentOffsets((tp, offset) ->
		{
			oldOffsets.put(tp, offset - 1);
			newOffsets.put(tp, offset - 1);
		});

		Consumer<ConsumerRecord<String, ValidMessage>> captureOffsetAndExecuteTestHandler =
				record ->
				{
					newOffsets.put(
							new TopicPartition(record.topic(), record.partition()),
							record.offset());
					receivedRecords.add(record);
					testHandler.accept(record);
				};

		endlessConsumer =
				new EndlessConsumer<>(
						executor,
						properties.getClientId(),
						properties.getTopic(),
						kafkaConsumer,
						captureOffsetAndExecuteTestHandler);

		endlessConsumer.start();
	}

	@AfterEach
	public void deinit()
	{
		try
		{
			endlessConsumer.stop();
		}
		catch (Exception e)
		{
			log.info("Exception while stopping the consumer: {}", e.toString());
		}
	}


	@TestConfiguration
	@Import(ApplicationConfiguration.class)
	public static class Configuration
	{
		@Bean
		Serializer<ValidMessage> serializer()
		{
			return new JsonSerializer<>();
		}

		@Bean
		KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
		{
			Properties props = new Properties();
			props.put("bootstrap.servers", properties.getBootstrapServer());
			props.put("linger.ms", 100);
			props.put("key.serializer", StringSerializer.class.getName());
			props.put("value.serializer", BytesSerializer.class.getName());

			return new KafkaProducer<>(props);
		}

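		/**
		 * Uses the same group.id as the application, so that the tests can
		 * inspect and reset the offsets committed by the consumer under test.
		 */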
		@Bean
		KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
		{
			Properties props = new Properties();
			props.put("bootstrap.servers", properties.getBootstrapServer());
			props.put("client.id", "OFFSET-CONSUMER");
			props.put("group.id", properties.getGroupId());
			props.put("key.deserializer", BytesDeserializer.class.getName());
			props.put("value.deserializer", BytesDeserializer.class.getName());

			return new KafkaConsumer<>(props);
		}
	}
}