--- /dev/null
+package de.juplo.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.BytesSerializer;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.common.utils.Bytes;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Import;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+import org.springframework.test.context.TestPropertySource;
+import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
+
+import java.util.*;
+import java.util.concurrent.ExecutorService;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.ApplicationTests.PARTITIONS;
+import static de.juplo.kafka.ApplicationTests.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
+@TestPropertySource(
+ properties = {
+ "consumer.bootstrap-server=${spring.embedded.kafka.brokers}",
+ "consumer.topic=" + TOPIC })
+@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
+@Slf4j
+class ApplicationTests
+{
+ public static final String TOPIC = "FOO";
+ public static final int PARTITIONS = 10;
+
+
+ StringSerializer stringSerializer = new StringSerializer();
+ LongSerializer longSerializer = new LongSerializer();
+
+ @Autowired
+ KafkaProducer<String, Bytes> kafkaProducer;
+ @Autowired
+ KafkaConsumer<String, Long> kafkaConsumer;
+ @Autowired
+ ApplicationProperties properties;
+ @Autowired
+ ExecutorService executor;
+
+
+ @Test
+ void commitsCurrentOffsetsOnSuccess()
+ {
+ send100Messages(i -> new Bytes(longSerializer.serialize(TOPIC, i)));
+
+ Set<ConsumerRecord<String, Long>> received = new HashSet<>();
+ Map<Integer, Long> offsets = runEndlessConsumer(record ->
+ {
+ received.add(record);
+ if (received.size() == 100)
+ throw new WakeupException();
+ });
+
+ check(offsets);
+ }
+
+ @Test
+ void commitsNoOffsetsOnError()
+ {
+ send100Messages(counter ->
+ counter == 77
+ ? new Bytes(stringSerializer.serialize(TOPIC, "BOOM!"))
+ : new Bytes(longSerializer.serialize(TOPIC, counter)));
+
+ Map<Integer, Long> oldOffsets = new HashMap<>();
+ doForCurrentOffsets((tp, offset) -> oldOffsets.put(tp.partition(), offset -1));
+ Map<Integer, Long> newOffsets = runEndlessConsumer((record) -> {});
+
+ check(oldOffsets);
+ }
+
+
+ void send100Messages(Function<Long, Bytes> messageGenerator)
+ {
+ long i = 0;
+
+ for (int partition = 0; partition < 10; partition++)
+ {
+ for (int key = 0; key < 10; key++)
+ {
+ Bytes value = messageGenerator.apply(++i);
+
+ ProducerRecord<String, Bytes> record =
+ new ProducerRecord<>(
+ TOPIC,
+ partition,
+ Integer.toString(key%2),
+ value);
+
+ kafkaProducer.send(record, (metadata, e) ->
+ {
+ if (metadata != null)
+ {
+ log.debug(
+ "{}|{} - {}={}",
+ metadata.partition(),
+ metadata.offset(),
+ record.key(),
+ record.value());
+ }
+ else
+ {
+ log.warn(
+ "Exception for {}={}: {}",
+ record.key(),
+ record.value(),
+ e.toString());
+ }
+ });
+ }
+ }
+ }
+
+ Map<Integer, Long> runEndlessConsumer(Consumer<ConsumerRecord<String, Long>> consumer)
+ {
+ Map<Integer, Long> offsets = new HashMap<>();
+ doForCurrentOffsets((tp, offset) -> offsets.put(tp.partition(), offset -1));
+ Consumer<ConsumerRecord<String, Long>> captureOffset = record -> offsets.put(record.partition(), record.offset());
+ EndlessConsumer<String, Long> endlessConsumer =
+ new EndlessConsumer<>(
+ executor,
+ properties.getClientId(),
+ properties.getTopic(),
+ kafkaConsumer,
+ captureOffset.andThen(consumer));
+
+ endlessConsumer.run();
+
+ return offsets;
+ }
+
+ List<TopicPartition> partitions()
+ {
+ return
+ IntStream
+ .range(0, PARTITIONS)
+ .mapToObj(partition -> new TopicPartition(TOPIC, partition))
+ .collect(Collectors.toList());
+ }
+
+ void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
+ {
+ kafkaConsumer.assign(partitions());
+ partitions().forEach(tp -> consumer.accept(tp, kafkaConsumer.position(tp)));
+ kafkaConsumer.unsubscribe();
+ }
+
+ void check(Map<Integer, Long> offsets)
+ {
+ doForCurrentOffsets((tp, offset) ->
+ {
+ Long expected = offsets.get(tp.partition()) + 1;
+ log.debug("Checking, if the offset for {} is {}", tp, expected);
+ assertThat(offset).isEqualTo(expected);
+ });
+ }
+
+
+ @TestConfiguration
+ @Import(ApplicationConfiguration.class)
+ public static class Configuration
+ {
+ @Bean
+ KafkaProducer<String, Bytes> kafkaProducer(ApplicationProperties properties)
+ {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", properties.getBootstrapServer());
+ props.put("linger.ms", 100);
+ props.put("key.serializer", StringSerializer.class.getName());
+ props.put("value.serializer", BytesSerializer.class.getName());
+
+ return new KafkaProducer<>(props);
+ }
+ }
+}