package de.juplo.kafka;

import com.mongodb.client.MongoClient;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.autoconfigure.mongo.MongoProperties;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.util.*;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
import static de.juplo.kafka.GenericApplicationTests.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;
42 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
45 "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
46 "sumup.adder.topic=" + TOPIC,
47 "spring.kafka.consumer.auto-commit-interval=500ms",
48 "spring.mongodb.embedded.version=4.4.13" })
49 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
50 @EnableAutoConfiguration
51 @AutoConfigureDataMongo
53 abstract class GenericApplicationTests<K, V>
55 public static final String TOPIC = "FOO";
56 public static final int PARTITIONS = 10;
60 org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
62 KafkaProperties kafkaProperties;
64 ApplicationProperties applicationProperties;
66 MongoClient mongoClient;
68 MongoProperties mongoProperties;
70 KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
72 TestRecordHandler<K, V> recordHandler;
74 EndlessConsumer<K, V> endlessConsumer;
76 KafkaProducer<Bytes, Bytes> testRecordProducer;
77 KafkaConsumer<Bytes, Bytes> offsetConsumer;
78 Map<TopicPartition, Long> oldOffsets;
81 final RecordGenerator recordGenerator;
82 final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
/**
 * @param recordGenerator subclass-supplied strategy that generates the test records;
 *                        the generator sends them through {@link #sendMessage}.
 */
public GenericApplicationTests(RecordGenerator recordGenerator)
{
  this.recordGenerator = recordGenerator;
  this.messageSender = (record) -> sendMessage(record);
}
94 void commitsCurrentOffsetsOnSuccess() throws Exception
96 int numberOfGeneratedMessages =
97 recordGenerator.generate(false, false, messageSender);
99 await(numberOfGeneratedMessages + " records received")
100 .atMost(Duration.ofSeconds(30))
101 .pollInterval(Duration.ofSeconds(1))
102 .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
104 await("Offsets committed")
105 .atMost(Duration.ofSeconds(10))
106 .pollInterval(Duration.ofSeconds(1))
109 checkSeenOffsetsForProgress();
110 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
113 assertThatExceptionOfType(IllegalStateException.class)
114 .isThrownBy(() -> endlessConsumer.exitStatus())
115 .describedAs("Consumer should still be running");
117 endlessConsumer.stop();
118 recordGenerator.assertBusinessLogic();
122 @SkipWhenErrorCannotBeGenerated(poisonPill = true)
123 void commitsOffsetOfErrorForReprocessingOnDeserializationError()
125 int numberOfGeneratedMessages =
126 recordGenerator.generate(true, false, messageSender);
128 await("Consumer failed")
129 .atMost(Duration.ofSeconds(30))
130 .pollInterval(Duration.ofSeconds(1))
131 .until(() -> !endlessConsumer.running());
133 checkSeenOffsetsForProgress();
134 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
136 endlessConsumer.start();
137 await("Consumer failed")
138 .atMost(Duration.ofSeconds(30))
139 .pollInterval(Duration.ofSeconds(1))
140 .until(() -> !endlessConsumer.running());
142 checkSeenOffsetsForProgress();
143 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
144 assertThat(recordHandler.receivedRecords.size())
145 .describedAs("Received not all sent events")
146 .isLessThan(numberOfGeneratedMessages);
148 assertThatNoException()
149 .describedAs("Consumer should not be running")
150 .isThrownBy(() -> endlessConsumer.exitStatus());
151 assertThat(endlessConsumer.exitStatus())
152 .describedAs("Consumer should have exited abnormally")
153 .containsInstanceOf(RecordDeserializationException.class);
155 recordGenerator.assertBusinessLogic();
159 @SkipWhenErrorCannotBeGenerated(logicError = true)
160 void doesNotCommitOffsetsOnLogicError()
162 int numberOfGeneratedMessages =
163 recordGenerator.generate(false, true, messageSender);
165 await("Consumer failed")
166 .atMost(Duration.ofSeconds(30))
167 .pollInterval(Duration.ofSeconds(1))
168 .until(() -> !endlessConsumer.running());
170 checkSeenOffsetsForProgress();
171 assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
173 endlessConsumer.start();
174 await("Consumer failed")
175 .atMost(Duration.ofSeconds(30))
176 .pollInterval(Duration.ofSeconds(1))
177 .until(() -> !endlessConsumer.running());
179 assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
181 assertThatNoException()
182 .describedAs("Consumer should not be running")
183 .isThrownBy(() -> endlessConsumer.exitStatus());
184 assertThat(endlessConsumer.exitStatus())
185 .describedAs("Consumer should have exited abnormally")
186 .containsInstanceOf(RuntimeException.class);
188 recordGenerator.assertBusinessLogic();
/** Helper methods for the verification of expectations */
194 void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
196 doForCurrentOffsets((tp, offset) ->
198 Long expected = offsetsToCheck.get(tp) + 1;
199 log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
201 .describedAs("Committed offset corresponds to the offset of the consumer")
202 .isEqualTo(expected);
206 void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
208 List<Boolean> isOffsetBehindSeen = new LinkedList<>();
210 doForCurrentOffsets((tp, offset) ->
212 Long expected = offsetsToCheck.get(tp) + 1;
213 log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
215 .describedAs("Committed offset must be at most equal to the offset of the consumer")
216 .isLessThanOrEqualTo(expected);
217 isOffsetBehindSeen.add(offset < expected);
220 assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
221 .describedAs("Committed offsets are behind seen offsets")
225 void checkSeenOffsetsForProgress()
227 // Be sure, that some messages were consumed...!
228 Set<TopicPartition> withProgress = new HashSet<>();
229 partitions().forEach(tp ->
231 Long oldOffset = oldOffsets.get(tp) + 1;
232 Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
233 if (!oldOffset.equals(newOffset))
235 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
236 withProgress.add(tp);
239 assertThat(withProgress)
240 .describedAs("Some offsets must have changed, compared to the old offset-positions")
/** Helper methods for setting up and running the tests */
249 offsetConsumer.assign(partitions());
250 offsetConsumer.seekToEnd(partitions());
251 partitions().forEach(tp ->
253 // seekToEnd() works lazily: it only takes effect on poll()/position()
254 Long offset = offsetConsumer.position(tp);
255 log.info("New position for {}: {}", tp, offset);
257 // The new positions must be commited!
258 offsetConsumer.commitSync();
259 offsetConsumer.unsubscribe();
262 void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
264 offsetConsumer.assign(partitions());
265 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
266 offsetConsumer.unsubscribe();
269 List<TopicPartition> partitions()
273 .range(0, PARTITIONS)
274 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
275 .collect(Collectors.toList());
279 public interface RecordGenerator
284 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
286 default boolean canGeneratePoisonPill()
291 default boolean canGenerateLogicError()
296 default void assertBusinessLogic()
298 log.debug("No business-logic to assert");
302 void sendMessage(ProducerRecord<Bytes, Bytes> record)
304 testRecordProducer.send(record, (metadata, e) ->
306 if (metadata != null)
310 metadata.partition(),
318 "Exception for {}={}: {}",
331 props = new Properties();
332 props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
333 props.put("linger.ms", 100);
334 props.put("key.serializer", BytesSerializer.class.getName());
335 props.put("value.serializer", BytesSerializer.class.getName());
336 testRecordProducer = new KafkaProducer<>(props);
338 props = new Properties();
339 props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
340 props.put("client.id", "OFFSET-CONSUMER");
341 props.put("group.id", kafkaProperties.getConsumer().getGroupId());
342 props.put("key.deserializer", BytesDeserializer.class.getName());
343 props.put("value.deserializer", BytesDeserializer.class.getName());
344 offsetConsumer = new KafkaConsumer<>(props);
346 mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
349 oldOffsets = new HashMap<>();
350 recordHandler.seenOffsets = new HashMap<>();
351 recordHandler.receivedRecords = new HashSet<>();
353 doForCurrentOffsets((tp, offset) ->
355 oldOffsets.put(tp, offset - 1);
356 recordHandler.seenOffsets.put(tp, offset - 1);
359 endlessConsumer.start();
367 endlessConsumer.stop();
371 log.debug("{}", e.toString());
376 testRecordProducer.close();
377 offsetConsumer.close();
381 log.info("Exception while stopping the consumer: {}", e.toString());
387 @Import(ApplicationConfiguration.class)
388 public static class Configuration
391 public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
393 return new TestRecordHandler(applicationRecordHandler);
396 @Bean(destroyMethod = "close")
397 public org.apache.kafka.clients.consumer.Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
399 return factory.createConsumer();