demos/kafka/training: src/test/java/de/juplo/kafka/ApplicationTests.java (commit 24d3a9e0c07649282d3107c999cf49cffd94f029)
package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.support.serializer.JsonSerializer;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;

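/**
 * Integration test for the endless consumer, run against an embedded Kafka broker.
 *
 * The two test cases verify the offset-commit behaviour of EndlessConsumer (defined in the
 * main sources, not shown here): on success the offsets of all processed records are
 * committed; on a deserialization error the offset of the failed record stays uncommitted,
 * so the record is picked up again for reprocessing after a restart.
 */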
@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
        properties = {
                "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
                "consumer.topic=" + TOPIC,
                "consumer.commit-interval=1s" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@Slf4j
class ApplicationTests
{
    public static final String TOPIC = "FOO";
    public static final int PARTITIONS = 10;

    StringSerializer stringSerializer = new StringSerializer();

    // Deliberately injected as a raw type: besides ValidMessage instances, the test also
    // serializes TestFooMessage (the poison pill below), which presumably is no ValidMessage.
    @Autowired
    Serializer valueSerializer;
    @Autowired
    KafkaProducer<String, Bytes> kafkaProducer;
    @Autowired
    KafkaConsumer<String, ValidMessage> kafkaConsumer;
    @Autowired
    KafkaConsumer<Bytes, Bytes> offsetConsumer;
    @Autowired
    ApplicationProperties properties;
    @Autowired
    ExecutorService executor;

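    // Per-test state: the offset positions before the test (oldOffsets), the highest offsets
    // seen while consuming (newOffsets), and every record that was received. Judging by its
    // use below, EndlessConsumer exposes start(), stop(), running() and exitStatus(), the
    // latter returning an Optional that holds the exception the consumer died with.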
    Consumer<ConsumerRecord<String, ValidMessage>> testHandler;
    EndlessConsumer<String, ValidMessage> endlessConsumer;
    Map<TopicPartition, Long> oldOffsets;
    Map<TopicPartition, Long> newOffsets;
    Set<ConsumerRecord<String, ValidMessage>> receivedRecords;


    /** Test methods */

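    // Happy path: every record can be deserialized (counters not divisible by 3 become
    // client messages, the rest greetings), so the consumer keeps running and the offsets
    // of all 100 records end up committed.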
    @Test
    void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
    {
        send100Messages((partition, key, counter) ->
        {
            Bytes value;
            String type;

            if (counter%3 != 0)
            {
                value = serializeClientMessage(key, counter);
                type = "message";
            }
            else
            {
                value = serializeGreeting(key, counter);
                type = "greeting";
            }

            return toRecord(partition, key, value, type);
        });

        await("100 records received")
                .atMost(Duration.ofSeconds(30))
                .pollInterval(Duration.ofSeconds(1))
                .until(() -> receivedRecords.size() >= 100);

        await("Offsets committed")
                .atMost(Duration.ofSeconds(10))
                .pollInterval(Duration.ofSeconds(1))
                .untilAsserted(() ->
                {
                    checkSeenOffsetsForProgress();
                    compareToCommittedOffsets(newOffsets);
                });

        // describedAs() must come before isThrownBy(); otherwise the description is ignored
        assertThatExceptionOfType(IllegalStateException.class)
                .describedAs("Consumer should still be running")
                .isThrownBy(() -> endlessConsumer.exitStatus());
    }

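    // Poison pill: record no. 77 is serialized as a TestFooMessage and flagged with the type
    // "foo", for which the consuming side presumably has no mapping, so deserialization
    // fails. The consumer must die, commit only the offsets before the failed record, and
    // fail at the very same record again after a restart.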
    @Test
    void commitsOffsetOfErrorForReprocessingOnDeserializationError()
    {
        send100Messages((partition, key, counter) ->
        {
            Bytes value;
            String type;

            if (counter == 77)
            {
                value = serializeFooMessage(key, counter);
                type = "foo";
            }
            else
            {
                if (counter%3 != 0)
                {
                    value = serializeClientMessage(key, counter);
                    type = "message";
                }
                else
                {
                    value = serializeGreeting(key, counter);
                    type = "greeting";
                }
            }

            return toRecord(partition, key, value, type);
        });

        await("Consumer failed")
                .atMost(Duration.ofSeconds(30))
                .pollInterval(Duration.ofSeconds(1))
                .until(() -> !endlessConsumer.running());

        checkSeenOffsetsForProgress();
        compareToCommittedOffsets(newOffsets);

        endlessConsumer.start();
        await("Consumer failed again")
                .atMost(Duration.ofSeconds(30))
                .pollInterval(Duration.ofSeconds(1))
                .until(() -> !endlessConsumer.running());

        checkSeenOffsetsForProgress();
        compareToCommittedOffsets(newOffsets);
        assertThat(receivedRecords.size())
                .describedAs("Not all sent events should have been received")
                .isLessThan(100);

        assertThatNoException()
                .describedAs("Consumer should not be running")
                .isThrownBy(() -> endlessConsumer.exitStatus());
        assertThat(endlessConsumer.exitStatus())
                .describedAs("Consumer should have exited abnormally")
                .containsInstanceOf(RecordDeserializationException.class);
    }


    /** Helper methods for the verification of expectations */

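    // In Kafka, the committed offset is the offset of the *next* record to be consumed.
    // Hence, the committed offset must be exactly one above the offset of the last record
    // that the test handler has seen for that partition.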
    void compareToCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
    {
        doForCurrentOffsets((tp, offset) ->
        {
            Long expected = offsetsToCheck.get(tp) + 1;
            log.debug("Checking if the offset for {} is {}", tp, expected);
            assertThat(offset)
                    .describedAs("Committed offset corresponds to the offset of the consumer")
                    .isEqualTo(expected);
        });
    }

    void checkSeenOffsetsForProgress()
    {
        // Make sure that some messages were actually consumed!
        Set<TopicPartition> withProgress = new HashSet<>();
        partitions().forEach(tp ->
        {
            Long oldOffset = oldOffsets.get(tp) + 1;
            Long newOffset = newOffsets.get(tp) + 1;
            if (!oldOffset.equals(newOffset))
            {
                log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
                withProgress.add(tp);
            }
        });
        assertThat(withProgress)
                .describedAs("Some offsets must have changed, compared to the old offset positions")
                .isNotEmpty();
    }


    /** Helper methods for setting up and running the tests */

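    // Resets the consumer group to the current end of every partition and commits these
    // positions, so that each test case starts from a clean slate without records or
    // committed offsets left over from previous tests.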
    void seekToEnd()
    {
        offsetConsumer.assign(partitions());
        offsetConsumer.seekToEnd(partitions());
        partitions().forEach(tp ->
        {
            // seekToEnd() works lazily: it only takes effect on poll()/position()
            Long offset = offsetConsumer.position(tp);
            log.info("New position for {}: {}", tp, offset);
        });
        // The new positions must be committed!
        offsetConsumer.commitSync();
        offsetConsumer.unsubscribe();
    }

    void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
    {
        offsetConsumer.assign(partitions());
        partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
        offsetConsumer.unsubscribe();
    }

    List<TopicPartition> partitions()
    {
        return
                IntStream
                        .range(0, PARTITIONS)
                        .mapToObj(partition -> new TopicPartition(TOPIC, partition))
                        .collect(Collectors.toList());
    }

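    // Strategy callback that lets each test decide how the value for a given partition,
    // key and counter is serialized and which type header it is sent with.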
    public interface RecordGenerator
    {
        ProducerRecord<String, Bytes> generate(int partition, String key, long counter);
    }

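    // Sends ten records to each of the ten partitions. Note that, due to operator precedence
    // in partition*10+key%2, only two distinct keys per partition are actually generated, so
    // each key carries several records.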
    void send100Messages(RecordGenerator recordGenerator)
    {
        long i = 0;

        for (int partition = 0; partition < 10; partition++)
        {
            for (int key = 0; key < 10; key++)
            {
                ProducerRecord<String, Bytes> record =
                        recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);

                kafkaProducer.send(record, (metadata, e) ->
                {
                    if (metadata != null)
                    {
                        log.debug(
                                "{}|{} - {}={}",
                                metadata.partition(),
                                metadata.offset(),
                                record.key(),
                                record.value());
                    }
                    else
                    {
                        log.warn(
                                "Exception for {}={}: {}",
                                record.key(),
                                record.value(),
                                e.toString());
                    }
                });
            }
        }
    }

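    // "__TypeId__" is the default type-id header of Spring Kafka's JSON (de)serializers;
    // setting it here tells the consuming JsonDeserializer which class the payload maps to.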
    ProducerRecord<String, Bytes> toRecord(int partition, String key, Bytes value, String type)
    {
        ProducerRecord<String, Bytes> record =
                new ProducerRecord<>(TOPIC, partition, key, value);
        record.headers().add("__TypeId__", type.getBytes());
        return record;
    }

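    // The Test* payload classes are fixtures from the test sources. All three helpers share
    // the same signature so they can be swapped freely inside a RecordGenerator; note that
    // serializeGreeting() ignores the passed value and uses the current time instead.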
    Bytes serializeClientMessage(String key, Long value)
    {
        TestClientMessage message = new TestClientMessage(key, value.toString());
        return new Bytes(valueSerializer.serialize(TOPIC, message));
    }

    Bytes serializeGreeting(String key, Long value)
    {
        TestGreeting message = new TestGreeting(key, LocalDateTime.now());
        return new Bytes(valueSerializer.serialize(TOPIC, message));
    }

    Bytes serializeFooMessage(String key, Long value)
    {
        TestFooMessage message = new TestFooMessage(key, value);
        return new Bytes(valueSerializer.serialize(TOPIC, message));
    }

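    // Before each test: reset the consumer group to the log end, snapshot these positions
    // into oldOffsets/newOffsets, wrap the test handler so that every received record and
    // its offset are captured, and start the consumer under test.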
    @BeforeEach
    public void init()
    {
        testHandler = record -> {};

        seekToEnd();

        oldOffsets = new HashMap<>();
        newOffsets = new HashMap<>();
        receivedRecords = new HashSet<>();

        doForCurrentOffsets((tp, offset) ->
        {
            oldOffsets.put(tp, offset - 1);
            newOffsets.put(tp, offset - 1);
        });

        Consumer<ConsumerRecord<String, ValidMessage>> captureOffsetAndExecuteTestHandler =
                record ->
                {
                    newOffsets.put(
                            new TopicPartition(record.topic(), record.partition()),
                            record.offset());
                    receivedRecords.add(record);
                    testHandler.accept(record);
                };

        endlessConsumer =
                new EndlessConsumer<>(
                        executor,
                        properties.getClientId(),
                        properties.getTopic(),
                        kafkaConsumer,
                        captureOffsetAndExecuteTestHandler);

        endlessConsumer.start();
    }

    @AfterEach
    public void deinit()
    {
        try
        {
            endlessConsumer.stop();
        }
        catch (Exception e)
        {
            log.info("Exception while stopping the consumer: {}", e.toString());
        }
    }

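    // Test wiring: reuses the production configuration and adds the beans the test needs on
    // top, i.e. a JSON serializer for the test payloads, a plain producer that sends
    // pre-serialized Bytes values, and an extra consumer for inspecting committed offsets.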
    @TestConfiguration
    @Import(ApplicationConfiguration.class)
    public static class Configuration
    {
        @Bean
        Serializer<ValidMessage> serializer()
        {
            return new JsonSerializer<>();
        }

        @Bean
        KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", properties.getBootstrapServer());
            props.put("linger.ms", 100);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", BytesSerializer.class.getName());

            return new KafkaProducer<>(props);
        }

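        // Joins the same consumer group as the application, so that position() starts from,
        // and commitSync() overwrites, the offsets committed by the consumer under test.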
        @Bean
        KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", properties.getBootstrapServer());
            props.put("client.id", "OFFSET-CONSUMER");
            props.put("group.id", properties.getGroupId());
            props.put("key.deserializer", BytesDeserializer.class.getName());
            props.put("value.deserializer", BytesDeserializer.class.getName());

            return new KafkaConsumer<>(props);
        }
    }
}