1 package de.juplo.kafka;
3 import lombok.extern.slf4j.Slf4j;
4 import org.apache.kafka.clients.consumer.KafkaConsumer;
5 import org.apache.kafka.clients.producer.KafkaProducer;
6 import org.apache.kafka.clients.producer.ProducerRecord;
7 import org.apache.kafka.common.TopicPartition;
8 import org.apache.kafka.common.serialization.*;
9 import org.apache.kafka.common.utils.Bytes;
10 import org.junit.jupiter.api.*;
11 import org.springframework.beans.factory.annotation.Autowired;
12 import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
13 import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
14 import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
15 import org.springframework.boot.test.context.TestConfiguration;
16 import org.springframework.context.annotation.Bean;
17 import org.springframework.context.annotation.Import;
18 import org.springframework.context.annotation.Primary;
19 import org.springframework.kafka.listener.adapter.ConsumerRecordMetadata;
20 import org.springframework.kafka.support.serializer.JsonSerializer;
21 import org.springframework.kafka.test.context.EmbeddedKafka;
22 import org.springframework.test.context.TestPropertySource;
23 import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
25 import java.time.Duration;
27 import java.util.concurrent.ExecutionException;
28 import java.util.function.BiConsumer;
29 import java.util.function.BiFunction;
30 import java.util.stream.Collectors;
31 import java.util.stream.IntStream;
33 import static de.juplo.kafka.ApplicationTests.PARTITIONS;
34 import static de.juplo.kafka.ApplicationTests.TOPIC;
35 import static org.assertj.core.api.Assertions.*;
36 import static org.awaitility.Awaitility.*;
// NOTE(review): this listing is corrupted — each line carries an embedded
// original-file line number (e.g. "40 ") and several lines are missing
// (the @SpringJUnitConfig / @TestPropertySource annotation wrappers, @Slf4j,
// braces, and presumably @Autowired markers). Comments document only what the
// visible fragments establish.
// Test-context wiring: boots EndlessConsumer with Spring's Kafka
// auto-configuration plus the test-local Configuration class below.
40 initializers = ConfigDataApplicationContextInitializer.class,
42 EndlessConsumer.class,
43 KafkaAutoConfiguration.class,
44 ApplicationTests.Configuration.class })
// Point consumer and producer at the broker started by @EmbeddedKafka, and
// pin the topic name used by the application under test.
47 "spring.kafka.consumer.bootstrap-servers=${spring.embedded.kafka.brokers}",
48 "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
49 "consumer.topic=" + TOPIC })
50 @EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
52 class ApplicationTests
// Topic layout referenced by the annotations above via static import.
54 public static final String TOPIC = "FOO";
55 public static final int PARTITIONS = 10;
// Used to craft a deliberately undeserializable ("BOOM!") record payload.
58 StringSerializer stringSerializer = new StringSerializer();
// Collaborators — presumably @Autowired in the original file; the annotation
// lines are missing from this listing. TODO confirm against the full source.
61 Serializer valueSerializer;
63 KafkaProducer<String, Bytes> kafkaProducer;
// Separate consumer used only to read the group's current offsets for
// verification (see doForCurrentOffsets below).
65 KafkaConsumer<Bytes, Bytes> offsetConsumer;
67 ApplicationProperties applicationProperties;
69 KafkaProperties kafkaProperties;
71 EndlessConsumer endlessConsumer;
73 ClientMessageHandler clientMessageHandler;
// Per-test book-keeping: offsets before/after the run, and records seen.
75 Map<TopicPartition, Long> oldOffsets;
76 Map<TopicPartition, Long> newOffsets;
77 Set<ClientMessage> received;
// Happy path: all 100 records deserialize and are handled, so the consumer
// stays running and its committed offsets match the offsets it has seen.
// (The @Test annotation, braces and the untilAsserted(...) line between the
// second await and checkSeenOffsetsForProgress are missing from this listing.)
83 void commitsCurrentOffsetsOnSuccess() throws ExecutionException, InterruptedException
85 send100Messages((key, counter) -> serialize(key, counter));
// Wait until the handler has captured all 100 records.
87 await("100 records received")
88 .atMost(Duration.ofSeconds(30))
89 .until(() -> received.size() == 100);
// Then wait for the commit and verify: progress happened on some partitions,
// and the committed offsets equal the last-seen offsets (+1).
91 await("Offsets committed")
92 .atMost(Duration.ofSeconds(10))
95 checkSeenOffsetsForProgress();
96 compareToCommitedOffsets(newOffsets);
// A successful run must not stop the consumer.
99 assertThat(endlessConsumer.isRunning())
100 .describedAs("Consumer should still be running")
// Poison-pill test: one of the 100 records is replaced with a plain string
// that cannot be deserialized as a ClientMessage. (The @Test annotation, the
// condition selecting which counter is poisoned, braces and the
// untilAsserted(...) line are missing from this listing.)
105 void commitsCurrentOffsetsOnDeserializationError()
107 send100Messages((key, counter) ->
109 ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
110 : serialize(key, counter));
// Only the 99 valid records ever reach the handler.
112 await("99 records received")
113 .atMost(Duration.ofSeconds(30))
114 .until(() -> received.size() == 99);
116 await("Offsets committed")
117 .atMost(Duration.ofSeconds(10))
121 // Works only because valid messages still arrive after the message
122 // that triggers the deserialization error
125 // The message handler sees the offset of the faulty
127 checkSeenOffsetsForProgress();
128 compareToCommitedOffsets(newOffsets);
// A deserialization error must not stop the consumer.
131 assertThat(endlessConsumer.isRunning())
132 .describedAs("Consumer should still be running")
// Business-logic failure test: the injected handler throws for every record
// whose message is divisible by 10. (The @Test annotation, braces and the
// untilAsserted(...) line are missing from this listing.)
137 void commitsOffsetOnProgramLogicErrorFoo()
139 clientMessageHandler.testHandler = (clientMessage, metadata) ->
141 if (Integer.parseInt(clientMessage.message)%10 ==0)
142 throw new RuntimeException("BOOM: " + clientMessage.message + "%10 == 0");
145 send100Messages((key, counter) -> serialize(key, counter));
// NOTE(review): the await label says "80 records received" but the condition
// waits for 100 — and since the handler above throws for every 10th message,
// 100 may never be reached. One of label/condition looks wrong; confirm
// intended count against the full source before changing either.
147 await("80 records received")
148 .atMost(Duration.ofSeconds(30))
149 .until(() -> received.size() == 100);
// pollDelay gives the consumer time to commit before the first assertion.
151 await("Offsets committed")
152 .atMost(Duration.ofSeconds(10))
153 .pollDelay(Duration.ofSeconds(1))
156 checkSeenOffsetsForProgress();
157 compareToCommitedOffsets(newOffsets);
// A logic error in the handler must not stop the consumer.
160 assertThat(endlessConsumer.isRunning())
161 .describedAs("Consumer should still be running")
166 /** Helper methods for the verification of expectations */
// Asserts that, for every partition, the broker-side committed offset equals
// the last offset recorded in offsetsToCheck plus one (committed offset is
// the NEXT record to consume, hence the +1).
// NOTE(review): "Commited" is a typo for "Committed"; left as-is because
// callers elsewhere in this fragmentary listing use the current name.
168 void compareToCommitedOffsets(Map<TopicPartition, Long> offsetsToCheck)
170 doForCurrentOffsets((tp, offset) ->
172 Long expected = offsetsToCheck.get(tp) + 1;
173 log.debug("TEST: Comparing the expected offset of {} for {} to {}", expected, tp, offset);
// (The assertThat(offset) line preceding this chain is missing from this listing.)
175 .describedAs("Committed offset corresponds to the offset of the consumer")
176 .isEqualTo(expected);
// Asserts that at least one partition advanced during the test, i.e. the
// consumer actually processed messages (newOffsets moved past oldOffsets).
180 void checkSeenOffsetsForProgress()
182 // Be sure, that some messages were consumed...!
183 Set<TopicPartition> withProgress = new HashSet<>();
184 partitions().forEach(tp ->
186 Long oldOffset = oldOffsets.get(tp);
187 Long newOffset = newOffsets.get(tp);
// Compare boxed Longs with equals(), not == (identity would be wrong here).
188 if (!oldOffset.equals(newOffset))
190 log.debug("TEST: Progress for {}: {} -> {}", tp, oldOffset, newOffset);
191 withProgress.add(tp);
194 log.debug("TEST: Offsets with progress: {}", withProgress);
// (The .isNotEmpty() terminal assertion appears to be missing from this listing.)
195 assertThat(withProgress)
196 .describedAs("Some offsets must have changed, compared to the old offset-positions")
201 /** Helper methods for setting up and running the tests */
// Feeds the current position of every partition of TOPIC to the given
// callback, using the dedicated offsetConsumer. Since that consumer shares
// the application's group.id and never polls, position() presumably resolves
// to the group's committed offset — TODO confirm against KafkaConsumer docs.
203 void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
205 offsetConsumer.assign(partitions());
206 partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
207 offsetConsumer.unsubscribe();
// All TopicPartition handles of TOPIC (partitions 0..PARTITIONS-1).
// (The IntStream source line preceding .range is missing from this listing.)
210 List<TopicPartition> partitions()
214 .range(0, PARTITIONS)
215 .mapToObj(partition -> new TopicPartition(TOPIC, partition))
216 .collect(Collectors.toList());
// Sends 10 records to each of the 10 partitions (100 total). The value for
// each record is produced by messageGenerator from (key, runningCounter).
// (The counter declaration — presumably "long i = 0;" — plus braces, the
// partition argument of the ProducerRecord and the tail of the send callback
// are missing from this corrupted listing.)
220 void send100Messages(BiFunction<Integer, Long, Bytes> messageGenerator)
224 for (int partition = 0; partition < 10; partition++)
226 for (int key = 0; key < 10; key++)
228 Bytes value = messageGenerator.apply(key, ++i);
230 ProducerRecord<String, Bytes> record =
231 new ProducerRecord<>(
// Record key is key%2, so only two distinct keys are used per partition.
234 Integer.toString(key%2),
// Type hint consumed by Spring Kafka's JSON deserialization mapping.
237 record.headers().add("__TypeId__", "message".getBytes());
// Async send; the callback only logs success/failure, it does not fail the test.
238 kafkaProducer.send(record, (metadata, e) ->
240 if (metadata != null)
243 "TEST: Sending partition={}, offset={} - {}={}",
244 metadata.partition(),
252 "TEST: Exception for {}={}: {}",
// Builds a ClientMessage (client = key, message = value) and serializes it
// with the injected valueSerializer (a JsonSerializer per the Configuration
// class) into the Bytes payload expected by kafkaProducer.
262 Bytes serialize(Integer key, Long value)
264 ClientMessage message = new ClientMessage();
265 message.setClient(key.toString());
266 message.setMessage(value.toString());
267 return new Bytes(valueSerializer.serialize(TOPIC, message));
// Per-test setup — the method header is missing from this listing; presumably
// a @BeforeEach method (TODO confirm name/annotation against the full file).
// Resets the injected handler to a no-op so each test installs its own.
274 clientMessageHandler.testHandler = (clientMessage, metadata) -> {};
276 oldOffsets = new HashMap<>();
277 newOffsets = new HashMap<>();
278 received = new HashSet<>();
// Seed both maps with (current position - 1), i.e. the last consumed offset.
280 doForCurrentOffsets((tp, offset) ->
282 oldOffsets.put(tp, offset - 1);
283 newOffsets.put(tp, offset - 1);
// Capture hook: records every handled message and tracks the newest offset
// per partition. (The newOffsets.put(...) opening line is missing here.)
286 clientMessageHandler.captureOffsets =
287 (clientMessage, metadata) ->
289 received.add(clientMessage);
290 log.debug("TEST: Processing record #{}: {}", received.size(), clientMessage);
292 new TopicPartition(metadata.topic(), metadata.partition()), metadata.offset());
295 endlessConsumer.start();
// Per-test teardown — header missing from this listing; presumably an
// @AfterEach method wrapping stop() in try/catch so cleanup never fails a test.
303 endlessConsumer.stop();
307 log.info("TEST: Exception while stopping the consumer: {}", e.toString());
// Composite record handler injected into the application: always runs the
// offset-capturing hook first, then the per-test handler. Tests swap
// testHandler to inject failures (see commitsOffsetOnProgramLogicErrorFoo).
// (Braces and the @Override line are missing from this listing.)
311 public static class ClientMessageHandler implements BiConsumer<ClientMessage, ConsumerRecordMetadata>
// Both delegates are mutable on purpose — reassigned per test in setup.
313 BiConsumer<ClientMessage, ConsumerRecordMetadata> captureOffsets;
314 BiConsumer<ClientMessage, ConsumerRecordMetadata> testHandler;
318 public void accept(ClientMessage clientMessage, ConsumerRecordMetadata metadata)
// captureOffsets.andThen(testHandler): capture first, then test behavior —
// so offsets are recorded even when testHandler throws afterwards.
321 .andThen(testHandler)
322 .accept(clientMessage, metadata);
// Test configuration overriding/augmenting the application context.
// (@TestConfiguration, @Bean/@Primary annotations and braces are missing from
// this corrupted listing — TODO confirm which beans carry @Primary.)
327 @Import(ApplicationConfiguration.class)
328 public static class Configuration
// Handler bean consumed by the application; also @Autowired back into the
// test as ClientMessageHandler.
332 public BiConsumer<ClientMessage, ConsumerRecordMetadata> testHandler()
334 return new ClientMessageHandler();
// JSON serializer used by the test's serialize() helper to build payloads.
338 Serializer<ClientMessage> serializer()
340 return new JsonSerializer<>();
// Raw-bytes producer for the test: lets send100Messages inject arbitrary
// payloads, including the undeserializable "BOOM!" record.
344 KafkaProducer<String, Bytes> kafkaProducer(KafkaProperties properties)
346 Properties props = new Properties();
// NOTE(review): bootstrap servers are taken from the CONSUMER properties for
// a producer. Harmless here because both point at the same embedded broker
// (see the @TestPropertySource lines), but worth confirming intent.
347 props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
348 props.put("linger.ms", 100);
349 props.put("key.serializer", StringSerializer.class.getName());
350 props.put("value.serializer", BytesSerializer.class.getName());
352 return new KafkaProducer<>(props);
// Offset-inspection consumer: deliberately shares the application's group.id
// so position() reflects that group's offsets; Bytes deserializers because
// the payload content is irrelevant for offset checks.
356 KafkaConsumer<Bytes, Bytes> offsetConsumer(KafkaProperties properties)
358 Properties props = new Properties();
359 props.put("bootstrap.servers", properties.getConsumer().getBootstrapServers());
360 props.put("client.id", "OFFSET-CONSUMER");
361 props.put("group.id", properties.getConsumer().getGroupId());
362 props.put("key.deserializer", BytesDeserializer.class.getName());
363 props.put("value.deserializer", BytesDeserializer.class.getName());
365 return new KafkaConsumer<>(props);