A different group ID should be preset
[demos/kafka/training] src/test/java/de/juplo/kafka/ApplicationTests.java
package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.ApplicationTests.*;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;


@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
        properties = {
                "sumup.requests.bootstrap-server=${spring.embedded.kafka.brokers}",
                "sumup.requests.topic-in=" + INPUT_TOPIC,
                "sumup.requests.commit-interval=1s" })
@EmbeddedKafka(topics = { INPUT_TOPIC, OUTPUT_TOPIC }, partitions = PARTITIONS)
@EnableAutoConfiguration
@Slf4j
class ApplicationTests
{
    public static final String INPUT_TOPIC = "FOO";
    public static final String OUTPUT_TOPIC = "BAR";
    public static final int PARTITIONS = 10;


    StringSerializer stringSerializer = new StringSerializer();

    @Autowired
    Serializer<Integer> valueSerializer;
    @Autowired
    KafkaProducer<String, Bytes> testProducer;
    @Autowired
    KafkaConsumer<String, Integer> kafkaConsumer;
    @Autowired
    KafkaConsumer<Bytes, Bytes> offsetConsumer;
    @Autowired
    ApplicationProperties properties;
    @Autowired
    ExecutorService executor;
    @Autowired
    RecordHandler noopRecordHandler;

    EndlessConsumer<String, Integer> endlessConsumer;
    Map<TopicPartition, Long> oldOffsets;
    Map<TopicPartition, Long> newOffsets;
    Set<ConsumerRecord<String, Integer>> receivedRecords;


    /** Test methods */

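    // Happy path: all 100 records are valid; the consumer must see progress,
    // the committed offsets must match the offsets seen, and the consumer
    // must still be running afterwards.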
    @Test
    void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
    {
        send100Messages((partition, key, counter) ->
        {
            Bytes value = new Bytes(valueSerializer.serialize(INPUT_TOPIC, counter));
            return new ProducerRecord<>(INPUT_TOPIC, partition, key, value);
        });

        await("100 records received")
                .atMost(Duration.ofSeconds(30))
                .pollInterval(Duration.ofSeconds(1))
                .until(() -> receivedRecords.size() >= 100);

        await("Offsets committed")
                .atMost(Duration.ofSeconds(10))
                .pollInterval(Duration.ofSeconds(1))
                .untilAsserted(() ->
                {
                    checkSeenOffsetsForProgress();
                    compareToCommittedOffsets(newOffsets);
                });

        // describedAs() must precede the actual assertion; otherwise the
        // description is silently ignored
        assertThatExceptionOfType(IllegalStateException.class)
                .describedAs("Consumer should still be running")
                .isThrownBy(() -> endlessConsumer.exitStatus());
    }

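    // Poison pill: record no. 77 carries a string where an integer is
    // expected. The consumer must fail, and after a restart it must fail
    // again at the same position, because the offset of the faulty record
    // is committed so that the record is reprocessed.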
    @Test
    void commitsOffsetOfErrorForReprocessingOnDeserializationError()
    {
        send100Messages((partition, key, counter) ->
        {
            // Record no. 77 is a plain string that cannot be deserialized as an integer
            Bytes value = counter == 77
                    ? new Bytes(stringSerializer.serialize(INPUT_TOPIC, "BOOM!"))
                    : new Bytes(valueSerializer.serialize(INPUT_TOPIC, counter));
            return new ProducerRecord<>(INPUT_TOPIC, partition, key, value);
        });

        await("Consumer failed")
                .atMost(Duration.ofSeconds(30))
                .pollInterval(Duration.ofSeconds(1))
                .until(() -> !endlessConsumer.running());

        checkSeenOffsetsForProgress();
        compareToCommittedOffsets(newOffsets);

        endlessConsumer.start();
        await("Consumer failed again")
                .atMost(Duration.ofSeconds(30))
                .pollInterval(Duration.ofSeconds(1))
                .until(() -> !endlessConsumer.running());

        checkSeenOffsetsForProgress();
        compareToCommittedOffsets(newOffsets);
        assertThat(receivedRecords.size())
                .describedAs("Not all sent events should have been received")
                .isLessThan(100);

        assertThatNoException()
                .describedAs("Consumer should not be running")
                .isThrownBy(() -> endlessConsumer.exitStatus());
        assertThat(endlessConsumer.exitStatus())
                .describedAs("Consumer should have exited abnormally")
                .containsInstanceOf(RecordDeserializationException.class);
    }


    /** Helper methods for verifying expectations */

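    // Asserts that, for each partition, the committed offset is exactly one
    // ahead of the offset recorded in the given map.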
    void compareToCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
    {
        doForCurrentOffsets((tp, offset) ->
        {
            Long expected = offsetsToCheck.get(tp) + 1;
            log.debug("Checking if the offset for {} is {}", tp, expected);
            assertThat(offset)
                    .describedAs("Committed offset corresponds to the offset of the consumer")
                    .isEqualTo(expected);
        });
    }

    void checkSeenOffsetsForProgress()
    {
        // Make sure that some messages were actually consumed!
        Set<TopicPartition> withProgress = new HashSet<>();
        partitions().forEach(tp ->
        {
            Long oldOffset = oldOffsets.get(tp) + 1;
            Long newOffset = newOffsets.get(tp) + 1;
            if (!oldOffset.equals(newOffset))
            {
                log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
                withProgress.add(tp);
            }
        });
        assertThat(withProgress)
                .describedAs("Some offsets must have changed, compared to the old offset positions")
                .isNotEmpty();
    }


    /** Helper methods for setting up and running the tests */

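    // Positions the consumer group at the end of all partitions and commits
    // these positions, so that every test starts from a clean state.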
    void seekToEnd()
    {
        offsetConsumer.assign(partitions());
        offsetConsumer.seekToEnd(partitions());
        partitions().forEach(tp ->
        {
            // seekToEnd() works lazily: it only takes effect on poll()/position()
            Long offset = offsetConsumer.position(tp);
            log.info("New position for {}: {}", tp, offset);
        });
        // The new positions must be committed!
        offsetConsumer.commitSync();
        offsetConsumer.unsubscribe();
    }

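    // Looks up the current position (the next offset that will be read) for
    // every partition and hands it to the given callback.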
    void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
    {
        offsetConsumer.assign(partitions());
        partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
        offsetConsumer.unsubscribe();
    }

    List<TopicPartition> partitions()
    {
        return IntStream
                .range(0, PARTITIONS)
                .mapToObj(partition -> new TopicPartition(INPUT_TOPIC, partition))
                .collect(Collectors.toList());
    }


    @FunctionalInterface
    public interface RecordGenerator
    {
        ProducerRecord<String, Bytes> generate(int partition, String key, int counter);
    }

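    // Sends 10 records to each of the 10 partitions; the RecordGenerator
    // decides how the value for the running counter (1..100) is built.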
    void send100Messages(RecordGenerator recordGenerator)
    {
        int i = 0;

        for (int partition = 0; partition < PARTITIONS; partition++)
        {
            for (int key = 0; key < 10; key++)
            {
                // Beware the precedence: '%' binds tighter than '+', so only the
                // keys partition*10 and partition*10+1 are actually generated
                ProducerRecord<String, Bytes> record =
                        recordGenerator.generate(partition, Integer.toString(partition*10+key%2), ++i);

                testProducer.send(record, (metadata, e) ->
                {
                    if (metadata != null)
                    {
                        log.debug(
                                "{}|{} - {}={}",
                                metadata.partition(),
                                metadata.offset(),
                                record.key(),
                                record.value());
                    }
                    else
                    {
                        log.warn(
                                "Exception for {}={}: {}",
                                record.key(),
                                record.value(),
                                e.toString());
                    }
                });
            }
        }
    }


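    // Resets all offsets, captures the offsets seen by the consumer through
    // a TestRecordHandler, and starts the EndlessConsumer under test.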
    @BeforeEach
    public void init()
    {
        seekToEnd();

        oldOffsets = new HashMap<>();
        newOffsets = new HashMap<>();
        receivedRecords = new HashSet<>();

        doForCurrentOffsets((tp, offset) ->
        {
            // position() names the next offset to be read; the last seen
            // offset is therefore one less
            oldOffsets.put(tp, offset - 1);
            newOffsets.put(tp, offset - 1);
        });

        TestRecordHandler<String, Integer> captureOffsetAndExecuteTestHandler =
                new TestRecordHandler<String, Integer>(noopRecordHandler)
                {
                    @Override
                    public void onNewRecord(ConsumerRecord<String, Integer> record)
                    {
                        newOffsets.put(
                                new TopicPartition(record.topic(), record.partition()),
                                record.offset());
                        receivedRecords.add(record);
                    }
                };

        endlessConsumer =
                new EndlessConsumer<>(
                        executor,
                        properties.getClientId(),
                        properties.getTopicIn(),
                        kafkaConsumer,
                        captureOffsetAndExecuteTestHandler);

        endlessConsumer.start();
    }

    @AfterEach
    public void deinit()
    {
        try
        {
            endlessConsumer.stop();
        }
        catch (Exception e)
        {
            log.info("Exception while stopping the consumer: {}", e.toString());
        }
    }


    @TestConfiguration
    @Import(ApplicationConfiguration.class)
    public static class Configuration
    {
        @Bean
        Serializer<Integer> valueSerializer()
        {
            return new IntegerSerializer();
        }

        @Bean
        KafkaProducer<String, Bytes> testProducer(ApplicationProperties properties)
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", properties.getBootstrapServer());
            props.put("linger.ms", 100);
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", BytesSerializer.class.getName());

            return new KafkaProducer<>(props);
        }

        @Bean
        KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", properties.getBootstrapServer());
            props.put("client.id", "OFFSET-CONSUMER");
            // Intentionally joins the group of the application under test, so
            // that it can read and commit the offsets of that group
            props.put("group.id", properties.getGroupId());
            props.put("key.deserializer", BytesDeserializer.class.getName());
            props.put("value.deserializer", BytesDeserializer.class.getName());

            return new KafkaConsumer<>(props);
        }

        @Bean
        KafkaProducer<String, String> kafkaProducer(ApplicationProperties properties)
        {
            Properties props = new Properties();
            props.put("bootstrap.servers", properties.getBootstrapServer());
            props.put("key.serializer", StringSerializer.class.getName());
            props.put("value.serializer", StringSerializer.class.getName());

            return new KafkaProducer<>(props);
        }
    }
}
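
The test captures offsets through a TestRecordHandler, which is not part of this listing. A minimal sketch of what the usage in init() implies, assuming RecordHandler is a plain Consumer-style callback interface; the names, the field, and the delegation are inferred from the test above, not confirmed by this listing:

import java.util.function.Consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;

// Assumed shape of the application's handler interface
interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K, V>> {}

// Hypothetical TestRecordHandler: invokes the onNewRecord() hook for every
// record and then delegates to the wrapped handler (here: the application's
// no-op handler)
public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
{
    private final RecordHandler<K, V> handler;

    public TestRecordHandler(RecordHandler<K, V> handler)
    {
        this.handler = handler;
    }

    // Overridden by the test to record offsets and received records
    public abstract void onNewRecord(ConsumerRecord<K, V> record);

    @Override
    public void accept(ConsumerRecord<K, V> record)
    {
        onNewRecord(record);
        this.handler.accept(record);
    }
}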