package de.juplo.kafka;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordDeserializationException;
import org.apache.kafka.common.serialization.*;
import org.apache.kafka.common.utils.Bytes;
import org.junit.jupiter.api.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
import org.springframework.boot.test.context.TestConfiguration;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Import;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.test.context.TestPropertySource;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;

import java.time.Duration;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static de.juplo.kafka.ApplicationTests.PARTITIONS;
import static de.juplo.kafka.ApplicationTests.TOPIC;
import static org.assertj.core.api.Assertions.*;
import static org.awaitility.Awaitility.*;


@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@TestPropertySource(
    properties = {
        "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
        "consumer.topic=" + TOPIC,
        "consumer.commit-interval=1s",
        "spring.mongodb.embedded.version=4.4.13" })
@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
@EnableAutoConfiguration
@AutoConfigureDataMongo
@Slf4j
class ApplicationTests
{
  public static final String TOPIC = "FOO";
  public static final int PARTITIONS = 10;

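  /** Beans and state used to drive and observe the consumer under test */
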
  StringSerializer stringSerializer = new StringSerializer();

  @Autowired
  Serializer<Long> valueSerializer;
  @Autowired
  KafkaProducer<String, Bytes> kafkaProducer;
  @Autowired
  KafkaConsumer<String, Long> kafkaConsumer;
  @Autowired
  KafkaConsumer<Bytes, Bytes> offsetConsumer;
  @Autowired
  PartitionStatisticsRepository partitionStatisticsRepository;
  @Autowired
  ApplicationProperties properties;
  @Autowired
  ExecutorService executor;
  @Autowired
  KeyCountingRebalanceListener keyCountingRebalanceListener;
  @Autowired
  KeyCountingRecordHandler keyCountingRecordHandler;

  EndlessConsumer<String, Long> endlessConsumer;
  Map<TopicPartition, Long> oldOffsets;
  Map<TopicPartition, Long> newOffsets;
  Set<ConsumerRecord<String, Long>> receivedRecords;


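  /** Test methods */

  /**
   * Happy path: all sent records can be deserialized. The consumer must keep
   * running, and the committed offsets must track its progress.
   */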
  @Test
  void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
  {
    send100Messages((partition, key, counter) ->
    {
      Bytes value = new Bytes(valueSerializer.serialize(TOPIC, counter));
      return new ProducerRecord<>(TOPIC, partition, key, value);
    });

96 await("100 records received")
97 .atMost(Duration.ofSeconds(30))
98 .pollInterval(Duration.ofSeconds(1))
99 .until(() -> receivedRecords.size() >= 100);
101 await("Offsets committed")
102 .atMost(Duration.ofSeconds(10))
103 .pollInterval(Duration.ofSeconds(1))
106 checkSeenOffsetsForProgress();
107 compareToCommitedOffsets(newOffsets);
    assertThatExceptionOfType(IllegalStateException.class)
        .describedAs("Consumer should still be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
  }

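  /**
   * Failure path: one record is sent with a value that cannot be
   * deserialized. The consumer must stop, and the committed offset must
   * point at the poisoned record, so that it is reprocessed after a restart.
   */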
  @Test
  void commitsOffsetOfErrorForReprocessingOnDeserializationError()
  {
    send100Messages((partition, key, counter) ->
    {
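      // Poison pill: record number 77 carries a String payload, which the
      // consumer's Long deserializer cannot decode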
      Bytes value = counter == 77
          ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
          : new Bytes(valueSerializer.serialize(TOPIC, counter));
      return new ProducerRecord<>(TOPIC, partition, key, value);
    });

126 await("Consumer failed")
127 .atMost(Duration.ofSeconds(30))
128 .pollInterval(Duration.ofSeconds(1))
129 .until(() -> !endlessConsumer.running());
    checkSeenOffsetsForProgress();
    compareToCommittedOffsets(newOffsets);

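    // Restart the consumer: it must fail again at exactly the same record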
    endlessConsumer.start();
    await("Consumer failed again")
        .atMost(Duration.ofSeconds(30))
        .pollInterval(Duration.ofSeconds(1))
        .until(() -> !endlessConsumer.running());

    checkSeenOffsetsForProgress();
    compareToCommittedOffsets(newOffsets);
    assertThat(receivedRecords.size())
        .describedAs("Not all sent events should have been received")
        .isLessThan(100);

    assertThatNoException()
        .describedAs("Consumer should not be running")
        .isThrownBy(() -> endlessConsumer.exitStatus());
    assertThat(endlessConsumer.exitStatus())
        .describedAs("Consumer should have exited abnormally")
        .containsInstanceOf(RecordDeserializationException.class);
  }


  /** Helper methods for the verification of expectations */

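  /**
   * Checks for every partition that the offset stored in MongoDB equals the
   * offset of the last record the consumer has seen plus one, i.e. the
   * offset of the next record to consume.
   */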
  void compareToCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
  {
    doForCurrentOffsets((tp, offset) ->
    {
      Long expected = offsetsToCheck.get(tp) + 1;
      log.debug("Checking if the offset for {} is {}", tp, expected);
      assertThat(offset)
          .describedAs("Committed offset corresponds to the offset of the consumer")
          .isEqualTo(expected);
    });
  }

  void checkSeenOffsetsForProgress()
  {
    // Make sure that at least some messages were consumed
    Set<TopicPartition> withProgress = new HashSet<>();
    partitions().forEach(tp ->
    {
      Long oldOffset = oldOffsets.get(tp) + 1;
      Long newOffset = newOffsets.get(tp) + 1;
      if (!oldOffset.equals(newOffset))
      {
        log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
        withProgress.add(tp);
      }
    });
    assertThat(withProgress)
        .describedAs("Some offsets must have changed compared to the old positions")
        .isNotEmpty();
  }


  /** Helper methods for setting up and running the tests */

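  /**
   * Fast-forwards the offsets stored in MongoDB to the current end of all
   * partitions, so that each test starts from a known position.
   */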
  void seekToEnd()
  {
    offsetConsumer.assign(partitions());
    partitions().forEach(tp ->
    {
      Long offset = offsetConsumer.position(tp);
      log.info("New position for {}: {}", tp, offset);
      Integer partition = tp.partition();
      StatisticsDocument document =
          partitionStatisticsRepository
              .findById(partition.toString())
              .orElse(new StatisticsDocument(partition));
      document.offset = offset;
      partitionStatisticsRepository.save(document);
    });
    offsetConsumer.unsubscribe();
  }

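  /**
   * Reads the offset stored in MongoDB for every partition and hands it to
   * the given callback; partitions without a stored offset are reported as 0.
   */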
  void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
  {
    partitions().forEach(tp ->
    {
      String partition = Integer.toString(tp.partition());
      Optional<Long> offset = partitionStatisticsRepository.findById(partition).map(document -> document.offset);
      consumer.accept(tp, offset.orElse(0L));
    });
  }

  List<TopicPartition> partitions()
  {
    return
        IntStream
            .range(0, PARTITIONS)
            .mapToObj(partition -> new TopicPartition(TOPIC, partition))
            .collect(Collectors.toList());
  }


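  /** Strategy that builds the record to be sent for a given partition, key, and counter value. */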
  public interface RecordGenerator
  {
    ProducerRecord<String, Bytes> generate(int partition, String key, long counter);
  }

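  /**
   * Sends 100 messages: 10 records to each of the 10 partitions, alternating
   * between two keys per partition, with a strictly increasing counter as
   * payload.
   */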
  void send100Messages(RecordGenerator recordGenerator)
  {
    long i = 0;

    for (int partition = 0; partition < PARTITIONS; partition++)
    {
      for (int key = 0; key < 10; key++)
      {
        ProducerRecord<String, Bytes> record =
            recordGenerator.generate(partition, Integer.toString(partition * 10 + key % 2), ++i);

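        // Send asynchronously; the callback just logs success or failure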
        kafkaProducer.send(record, (metadata, e) ->
        {
          if (metadata != null)
          {
            log.debug(
                "{}|{} - {}={}",
                metadata.partition(),
                metadata.offset(),
                record.key(),
                record.value());
          }
          else
          {
            log.warn(
                "Exception for {}={}: {}",
                record.key(),
                record.value(),
                e.toString());
          }
        });
      }
    }
  }


  @BeforeEach
  public void init()
  {
    seekToEnd();

    oldOffsets = new HashMap<>();
    newOffsets = new HashMap<>();
    receivedRecords = new HashSet<>();

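    // The stored offset is the offset of the next record to consume, so the
    // offset of the last seen record is the stored offset minus one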
    doForCurrentOffsets((tp, offset) ->
    {
      oldOffsets.put(tp, offset - 1);
      newOffsets.put(tp, offset - 1);
    });

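    // Wrap the production record handler, so that the test can track every
    // received record together with its offset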
    TestRecordHandler<String, Long> captureOffsetAndExecuteTestHandler =
        new TestRecordHandler<String, Long>(keyCountingRecordHandler)
        {
          @Override
          public void onNewRecord(ConsumerRecord<String, Long> record)
          {
            newOffsets.put(
                new TopicPartition(record.topic(), record.partition()),
                record.offset());
            receivedRecords.add(record);
          }
        };

    endlessConsumer =
        new EndlessConsumer<>(
            executor,
            properties.getClientId(),
            properties.getTopic(),
            kafkaConsumer,
            keyCountingRebalanceListener,
            captureOffsetAndExecuteTestHandler);

    endlessConsumer.start();
  }

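  /**
   * Stops the consumer after each test; a consumer that has already failed
   * throws here, which is only logged.
   */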
  @AfterEach
  public void deinit()
  {
    try
    {
      endlessConsumer.stop();
    }
    catch (Exception e)
    {
      log.info("Exception while stopping the consumer: {}", e.toString());
    }
  }


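  /**
   * Test configuration: imports the production configuration and adds the
   * beans the test needs for sending messages and for reading the partition
   * positions.
   */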
  @TestConfiguration
  @Import(ApplicationConfiguration.class)
  public static class Configuration
  {
    @Bean
    Serializer<Long> serializer()
    {
      return new LongSerializer();
    }

    @Bean
    KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
    {
      Properties props = new Properties();
      props.put("bootstrap.servers", properties.getBootstrapServer());
      props.put("linger.ms", 100);
      props.put("key.serializer", StringSerializer.class.getName());
      props.put("value.serializer", BytesSerializer.class.getName());

      return new KafkaProducer<>(props);
    }

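    /**
     * Separate consumer used by the test to read the current partition
     * positions; it uses assign() instead of subscribe() and never joins
     * the consumer group.
     */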
    @Bean
    KafkaConsumer<Bytes, Bytes> offsetConsumer(ApplicationProperties properties)
    {
      Properties props = new Properties();
      props.put("bootstrap.servers", properties.getBootstrapServer());
      props.put("client.id", "OFFSET-CONSUMER");
      props.put("enable.auto.commit", false);
      props.put("auto.offset.reset", "latest");
      props.put("key.deserializer", BytesDeserializer.class.getName());
      props.put("value.deserializer", BytesDeserializer.class.getName());

      return new KafkaConsumer<>(props);
    }
  }
}