package de.juplo.kafka;

import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
import static de.juplo.kafka.GenericApplicationTests.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;


@SpringJUnitConfig(
    initializers = ConfigDataApplicationContextInitializer.class,
    classes = {
        KafkaAutoConfiguration.class,
        ApplicationTests.Configuration.class })
@TestPropertySource(
    properties = {
        "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
        "sumup.adder.topic=" + TOPIC,
        "spring.kafka.consumer.auto-commit-interval=500ms",
        "spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@EnableAutoConfiguration
@AutoConfigureDataMongo
@Slf4j
abstract class GenericApplicationTests<K, V>
{
  public static final String TOPIC = "FOO";
  public static final int PARTITIONS = 10;
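
  // All tests run against an embedded Kafka cluster (topic FOO with 10
  // partitions, see @EmbeddedKafka) and an embedded MongoDB instance
  // (see @AutoConfigureDataMongo).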

  @Autowired
  org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
  @Autowired
  Consumer<ConsumerRecord<K, V>> consumer;
  @Autowired
  ApplicationProperties applicationProperties;
  @Autowired
  KafkaProperties kafkaProperties;
  @Autowired
  ExecutorService executor;
  @Autowired
  MongoClient mongoClient;
  @Autowired
  MongoProperties mongoProperties;
  @Autowired
  ConsumerRebalanceListener rebalanceListener;
  @Autowired
  RecordHandler<K, V> recordHandler;

  KafkaProducer<Bytes, Bytes> testRecordProducer;
  KafkaConsumer<Bytes, Bytes> offsetConsumer;
  EndlessConsumer<K, V> endlessConsumer;
  Map<TopicPartition, Long> oldOffsets;
  Map<TopicPartition, Long> seenOffsets;
  Set<ConsumerRecord<K, V>> receivedRecords;

  final RecordGenerator recordGenerator;
  final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
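
  // Concrete subclasses inject the test scenario through a RecordGenerator;
  // messageSender simply forwards each generated record to sendMessage().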
  public GenericApplicationTests(RecordGenerator recordGenerator)
  {
    this.recordGenerator = recordGenerator;
    this.messageSender = (record) -> sendMessage(record);
  }
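

  /** Test methods */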

  @Test
  void commitsCurrentOffsetsOnSuccess() throws Exception
  {
    int numberOfGeneratedMessages =
        recordGenerator.generate(false, false, messageSender);

    await(numberOfGeneratedMessages + " records received")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);

    await("Offsets committed")
        .atMost(Duration.ofSeconds(10))
        .pollInterval(Duration.ofSeconds(1))
        .untilAsserted(() ->
        {
          checkSeenOffsetsForProgress();
          assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
        });

    assertThatExceptionOfType(IllegalStateException.class)
        .describedAs("Consumer should still be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());

    endlessConsumer.stop();
    recordGenerator.assertBusinessLogic();
  }
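
  // A poison pill is a record that cannot be deserialized: the consumer dies,
  // but the offset of the broken record stays committed, so a restart runs
  // into the same error again; this is exactly what the test verifies.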
  @Test
  @SkipWhenErrorCannotBeGenerated(poisonPill = true)
  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
  {
    int numberOfGeneratedMessages =
        recordGenerator.generate(true, false, messageSender);

    await("Consumer failed")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);

    endlessConsumer.start();
    await("Consumer failed again")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
    assertThat(receivedRecords.size())
        .describedAs("Not all sent events should have been received")
        .isLessThan(numberOfGeneratedMessages);

    assertThatNoException()
        .describedAs("Consumer should not be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
    assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RecordDeserializationException.class);

    recordGenerator.assertBusinessLogic();
  }
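
  // On an error in the business logic, offsets must not be committed past
  // records that were seen but not successfully processed; otherwise those
  // records would be lost after a restart.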
  @Test
  @SkipWhenErrorCannotBeGenerated(logicError = true)
  void doesNotCommitOffsetsOnLogicError()
  {
    int numberOfGeneratedMessages =
        recordGenerator.generate(false, true, messageSender);

    await("Consumer failed")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);

    endlessConsumer.start();
    await("Consumer failed again")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);

    assertThatNoException()
        .describedAs("Consumer should not be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
    assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RuntimeException.class);

    recordGenerator.assertBusinessLogic();
  }


  /** Helper methods for the verification of expectations */

  void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
  {
    doForCurrentOffsets((tp, offset) ->
    {
      Long expected = offsetsToCheck.get(tp) + 1;
      log.debug("Checking if the offset {} for {} is exactly {}", offset, tp, expected);
      assertThat(offset)
          .describedAs("Committed offset corresponds to the offset of the consumer")
          .isEqualTo(expected);
    });
  }
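
  // Passes only if, on at least one partition, the committed offset lags
  // behind the offset seen by the test handler.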
  void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
  {
    List<Boolean> isOffsetBehindSeen = new LinkedList<>();

    doForCurrentOffsets((tp, offset) ->
    {
      Long expected = offsetsToCheck.get(tp) + 1;
      log.debug("Checking if the offset {} for {} is at most {}", offset, tp, expected);
      assertThat(offset)
          .describedAs("Committed offset must be at most equal to the offset of the consumer")
          .isLessThanOrEqualTo(expected);
      isOffsetBehindSeen.add(offset < expected);
    });

    assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
        .describedAs("Committed offsets are behind seen offsets")
        .isTrue();
  }

  void checkSeenOffsetsForProgress()
  {
    // Be sure that some messages were consumed...!
    Set<TopicPartition> withProgress = new HashSet<>();
    partitions().forEach(tp ->
    {
      Long oldOffset = oldOffsets.get(tp) + 1;
      Long newOffset = seenOffsets.get(tp) + 1;
      if (!oldOffset.equals(newOffset))
      {
        log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
        withProgress.add(tp);
      }
    });
    assertThat(withProgress)
        .describedAs("Some offsets must have changed, compared to the old offset-positions")
        .isNotEmpty();
  }


  /** Helper methods for setting up and running the tests */
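
  // Resets the test scenario: moves the committed offsets of the consumer
  // group to the current end of all partitions, so that records left over
  // from earlier test runs are not consumed again.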
  void seekToEnd()
  {
    offsetConsumer.assign(partitions());
    offsetConsumer.seekToEnd(partitions());
    partitions().forEach(tp ->
    {
      // seekToEnd() works lazily: it only takes effect on poll()/position()
      Long offset = offsetConsumer.position(tp);
      log.info("New position for {}: {}", tp, offset);
    });
    // The new positions must be committed!
    offsetConsumer.commitSync();
    offsetConsumer.unsubscribe();
  }

  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
  {
    offsetConsumer.assign(partitions());
    partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
    offsetConsumer.unsubscribe();
  }

  List<TopicPartition> partitions()
  {
    return
        IntStream
            .range(0, PARTITIONS)
            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
            .collect(Collectors.toList());
  }
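

  // Contract for the scenario-specific test data: implementations send their
  // records through the supplied sender and return the number of generated
  // messages; they may opt out of error scenarios they cannot produce.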
  public interface RecordGenerator
  {
    int generate(
        boolean poisonPills,
        boolean logicErrors,
        Consumer<ProducerRecord<Bytes, Bytes>> messageSender);

    default boolean canGeneratePoisonPill()
    {
      return true;
    }

    default boolean canGenerateLogicError()
    {
      return true;
    }

    default void assertBusinessLogic()
    {
      log.debug("No business-logic to assert");
    }
  }
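
  // Sends asynchronously; the outcome is only logged, because the tests
  // assert on consumption, not on the success of the sending itself.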
  void sendMessage(ProducerRecord<Bytes, Bytes> record)
  {
    testRecordProducer.send(record, (metadata, e) ->
    {
      if (metadata != null)
      {
        log.debug(
            "{}|{} - {}={}",
            metadata.partition(),
            metadata.offset(),
            record.key(),
            record.value());
      }
      else
      {
        log.warn(
            "Exception for {}={}: {}",
            record.key(),
            record.value(),
            e.toString());
      }
    });
  }
  @BeforeEach
  public void init()
  {
    Properties props;
    props = new Properties();
    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
    props.put("linger.ms", 100);
    props.put("key.serializer", BytesSerializer.class.getName());
    props.put("value.serializer", BytesSerializer.class.getName());
    testRecordProducer = new KafkaProducer<>(props);

    props = new Properties();
    props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
    props.put("client.id", "OFFSET-CONSUMER");
    props.put("group.id", kafkaProperties.getConsumer().getGroupId());
    props.put("key.deserializer", BytesDeserializer.class.getName());
    props.put("value.deserializer", BytesDeserializer.class.getName());
    offsetConsumer = new KafkaConsumer<>(props);

    mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
    seekToEnd();

    oldOffsets = new HashMap<>();
    seenOffsets = new HashMap<>();
    receivedRecords = new HashSet<>();

    // The offset maps store the offset of the last consumed record per
    // partition; positions point at the next record, hence the -1
    doForCurrentOffsets((tp, offset) ->
    {
      oldOffsets.put(tp, offset - 1);
      seenOffsets.put(tp, offset - 1);
    });

    TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
        new TestRecordHandler<K, V>(recordHandler)
        {
          @Override
          public void onNewRecord(ConsumerRecord<K, V> record)
          {
            seenOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                record.offset());
            receivedRecords.add(record);
          }
        };

    endlessConsumer =
        new EndlessConsumer<>(
            executor,
            kafkaProperties.getClientId(),
            applicationProperties.getTopic(),
            kafkaConsumer,
            rebalanceListener,
            captureOffsetAndExecuteTestHandler);

    endlessConsumer.start();
  }
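
  // Tear down: close the Kafka clients; failures during shutdown are only
  // logged, so that they do not mask the actual test result.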
  @AfterEach
  public void deinit()
  {
    try
    {
      testRecordProducer.close();
      offsetConsumer.close();
    }
    catch (Exception e)
    {
      log.info("Exception while stopping the consumer: {}", e.toString());
    }
  }


  @TestConfiguration
  @Import(ApplicationConfiguration.class)
  public static class Configuration
  {
  }
}
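
// A concrete test class plugs in its RecordGenerator via the constructor,
// roughly like this (sketch only; type parameters and generator name are
// assumed):
//
//   class ApplicationTests extends GenericApplicationTests<String, Long>
//   {
//     public ApplicationTests()
//     {
//       super(new MyRecordGenerator());
//     }
//   }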