1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.ConsumerRecord;
5 import org.apache.kafka.clients.consumer.KafkaConsumer;
6 import org.apache.kafka.clients.producer.KafkaProducer;
7 import org.apache.kafka.clients.producer.ProducerRecord;
8 import org.apache.kafka.common.TopicPartition;
9 import org.apache.kafka.common.errors.RecordDeserializationException;
10 import org.apache.kafka.common.serialization.*;
11 import org.apache.kafka.common.utils.Bytes;
12 import org.junit.jupiter.api.*;
13 import org.springframework.beans.factory.annotation.Autowired;
14 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
15 import org.springframework.boot.test.context.TestConfiguration;
16 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Import;
18 import org.springframework.kafka.test.context.EmbeddedKafka;
19 import org.springframework.test.context.TestPropertySource;
20 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
22 import java.time.Duration;
24 import java.util.function.BiConsumer;
25 import java.util.function.Consumer;
26 import java.util.stream.Collectors;
27 import java.util.stream.IntStream;
29 import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
30 import static de.juplo.kafka.GenericApplicationTests.TOPIC;
31 import static org.assertj.core.api.Assertions.*;
32 import static org.awaitility.Awaitility.*;
35 @SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
38 "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
39 "consumer.topic=" + TOPIC,
40 "consumer.commit-interval=500ms" })
41 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
// NOTE(review): this listing has gaps (the embedded original line numbers skip),
// so member annotations (@Autowired, @Slf4j, etc.) that presumably precede these
// declarations are not visible here — confirm against the original file.
43 abstract class GenericApplicationTests<K, V>
// Topic name and partition count for the embedded broker (referenced by @EmbeddedKafka above).
45 public static final String TOPIC = "FOO";
46 public static final int PARTITIONS = 10;
// Collaborators under test — presumably Spring-injected; annotations elided in this listing.
50 org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
52 Consumer<ConsumerRecord<K, V>> consumer;
54 ApplicationProperties applicationProperties;
56 TestRecordHandler<K, V> recordHandler;
58 EndlessConsumer<K, V> endlessConsumer;
// Test-local helpers: a raw-bytes producer for generating input records and a
// separate consumer used only to inspect committed offsets (see doForCurrentOffsets()).
61 KafkaProducer<Bytes, Bytes> testRecordProducer;
62 KafkaConsumer<Bytes, Bytes> offsetConsumer;
// Offset positions captured before each test run; used by checkSeenOffsetsForProgress().
63 Map<TopicPartition, Long> oldOffsets;
// Strategy supplied by the concrete subclass: generates test records (optionally
// with poison pills / logic errors) and hands them to messageSender.
66 final RecordGenerator recordGenerator;
67 final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
// Subclasses pass their specific RecordGenerator; records are routed through sendMessage().
69 public GenericApplicationTests(RecordGenerator recordGenerator)
71 this.recordGenerator = recordGenerator;
72 this.messageSender = (record) -> sendMessage(record);
79 void commitsCurrentOffsetsOnSuccess() throws Exception
81 int numberOfGeneratedMessages =
82 recordGenerator.generate(false, false, messageSender);
84 await(numberOfGeneratedMessages + " records received")
85 .atMost(Duration.ofSeconds(30))
86 .pollInterval(Duration.ofSeconds(1))
87 .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
89 await("Offsets committed")
90 .atMost(Duration.ofSeconds(10))
91 .pollInterval(Duration.ofSeconds(1))
94 checkSeenOffsetsForProgress();
95 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
98 assertThatExceptionOfType(IllegalStateException.class)
99 .isThrownBy(() -> endlessConsumer.exitStatus())
100 .describedAs("Consumer should still be running");
102 endlessConsumer.stop();
103 recordGenerator.assertBusinessLogic();
107 @SkipWhenErrorCannotBeGenerated(poisonPill = true)
108 void commitsOffsetOfErrorForReprocessingOnDeserializationError()
110 int numberOfGeneratedMessages =
111 recordGenerator.generate(true, false, messageSender);
113 await("Consumer failed")
114 .atMost(Duration.ofSeconds(30))
115 .pollInterval(Duration.ofSeconds(1))
116 .until(() -> !endlessConsumer.running());
118 checkSeenOffsetsForProgress();
119 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
121 endlessConsumer.start();
122 await("Consumer failed")
123 .atMost(Duration.ofSeconds(30))
124 .pollInterval(Duration.ofSeconds(1))
125 .until(() -> !endlessConsumer.running());
127 checkSeenOffsetsForProgress();
128 assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
129 assertThat(recordHandler.receivedRecords.size())
130 .describedAs("Received not all sent events")
131 .isLessThan(numberOfGeneratedMessages);
133 assertThatNoException()
134 .describedAs("Consumer should not be running")
135 .isThrownBy(() -> endlessConsumer.exitStatus());
136 assertThat(endlessConsumer.exitStatus())
137 .describedAs("Consumer should have exited abnormally")
138 .containsInstanceOf(RecordDeserializationException.class);
140 recordGenerator.assertBusinessLogic();
144 @SkipWhenErrorCannotBeGenerated(logicError = true)
145 void doesNotCommitOffsetsOnLogicError()
147 int numberOfGeneratedMessages =
148 recordGenerator.generate(false, true, messageSender);
150 await("Consumer failed")
151 .atMost(Duration.ofSeconds(30))
152 .pollInterval(Duration.ofSeconds(1))
153 .until(() -> !endlessConsumer.running());
155 checkSeenOffsetsForProgress();
156 assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
158 endlessConsumer.start();
159 await("Consumer failed")
160 .atMost(Duration.ofSeconds(30))
161 .pollInterval(Duration.ofSeconds(1))
162 .until(() -> !endlessConsumer.running());
164 assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
166 assertThatNoException()
167 .describedAs("Consumer should not be running")
168 .isThrownBy(() -> endlessConsumer.exitStatus());
169 assertThat(endlessConsumer.exitStatus())
170 .describedAs("Consumer should have exited abnormally")
171 .containsInstanceOf(RuntimeException.class);
173 recordGenerator.assertBusinessLogic();
177 /** Helper methods for the verification of expectations */
179 void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
181 doForCurrentOffsets((tp, offset) ->
183 Long expected = offsetsToCheck.get(tp) + 1;
184 log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
186 .describedAs("Committed offset corresponds to the offset of the consumer")
187 .isEqualTo(expected);
191 void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
193 List<Boolean> isOffsetBehindSeen = new LinkedList<>();
195 doForCurrentOffsets((tp, offset) ->
197 Long expected = offsetsToCheck.get(tp) + 1;
198 log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
200 .describedAs("Committed offset must be at most equal to the offset of the consumer")
201 .isLessThanOrEqualTo(expected);
202 isOffsetBehindSeen.add(offset < expected);
205 assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
206 .describedAs("Committed offsets are behind seen offsets")
210 void checkSeenOffsetsForProgress()
212 // Be sure, that some messages were consumed...!
213 Set<TopicPartition> withProgress = new HashSet<>();
214 partitions().forEach(tp ->
216 Long oldOffset = oldOffsets.get(tp) + 1;
217 Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
218 if (!oldOffset.equals(newOffset))
220 log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
221 withProgress.add(tp);
224 assertThat(withProgress)
225 .describedAs("Some offsets must have changed, compared to the old offset-positions")
230 /** Helper methods for setting up and running the tests */
// NOTE(review): the enclosing method's signature (original lines 231-233) is
// missing from this listing — presumably something like `void seekToEnd()`; confirm.
// Moves the offset-consumer to the current end of every partition and commits
// those positions, so each test starts from a defined committed-offset state.
234 offsetConsumer.assign(partitions());
235 offsetConsumer.seekToEnd(partitions());
236 partitions().forEach(tp ->
238 // seekToEnd() works lazily: it only takes effect on poll()/position()
239 Long offset = offsetConsumer.position(tp);
240 log.info("New position for {}: {}", tp, offset);
242 // The new positions must be committed!
243 offsetConsumer.commitSync();
244 offsetConsumer.unsubscribe();
247 void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
249 offsetConsumer.assign(partitions());
250 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
251 offsetConsumer.unsubscribe();
254 List<TopicPartition> partitions()
258 .range(0, PARTITIONS)
259 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
260 .collect(Collectors.toList());
264 public interface RecordGenerator
269 Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
271 default boolean canGeneratePoisonPill()
276 default boolean canGenerateLogicError()
281 default void assertBusinessLogic()
283 log.debug("No business-logic to assert");
287 void sendMessage(ProducerRecord<Bytes, Bytes> record)
// Sends the record asynchronously; the callback logs the outcome.
289 testRecordProducer.send(record, (metadata, e) ->
// metadata != null means the send succeeded; e carries the failure otherwise.
291 if (metadata != null)
295 metadata.partition(),
// NOTE(review): lines are elided from this listing here — the success branch
// presumably logs partition/offset/key/value, and the line below belongs to the
// error branch's log statement; confirm against the original file.
303 "Exception for {}={}: {}",
// NOTE(review): the enclosing method's signature is elided from this listing —
// presumably a @BeforeEach setup method; confirm against the original file.
// 1) Raw-bytes producer used to write the generated test records.
316 props = new Properties();
317 props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
318 props.put("linger.ms", 100);
319 props.put("key.serializer", BytesSerializer.class.getName());
320 props.put("value.serializer", BytesSerializer.class.getName());
321 testRecordProducer = new KafkaProducer<>(props);
// 2) Dedicated consumer (same group.id as the application) used only to read
//    committed offsets — it never polls records.
323 props = new Properties();
324 props.put("bootstrap.servers", applicationProperties.getBootstrapServer());
325 props.put("client.id", "OFFSET-CONSUMER");
326 props.put("group.id", applicationProperties.getGroupId());
327 props.put("key.deserializer", BytesDeserializer.class.getName());
328 props.put("value.deserializer", BytesDeserializer.class.getName());
329 offsetConsumer = new KafkaConsumer<>(props);
// 3) Reset the bookkeeping maps for this test run.
333 oldOffsets = new HashMap<>();
334 recordHandler.seenOffsets = new HashMap<>();
335 recordHandler.receivedRecords = new HashSet<>();
// Seed both maps with (committed position - 1), so that "seen == committed - 1"
// holds before any record has been consumed.
337 doForCurrentOffsets((tp, offset) ->
339 oldOffsets.put(tp, offset - 1);
340 recordHandler.seenOffsets.put(tp, offset - 1);
// 4) Start the consumer under test.
343 endlessConsumer.start();
// NOTE(review): the enclosing method's signature and its try/catch structure are
// elided from this listing — presumably an @AfterEach teardown that stops the
// consumer and closes the test clients, logging (not rethrowing) failures; confirm.
351 endlessConsumer.stop();
355 log.debug("{}", e.toString());
// Release the test-local Kafka clients created during setup.
360 testRecordProducer.close();
361 offsetConsumer.close();
365 log.info("Exception while stopping the consumer: {}", e.toString());
371 @Import(ApplicationConfiguration.class)
372 public static class Configuration
375 public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
377 return new TestRecordHandler(applicationRecordHandler);