--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.AbstractExampleConsumerTest.NUM_PARTITIONS;
+import static de.juplo.kafka.AbstractExampleConsumerTest.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
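+/**
+ * Abstract integration test for the {@code ExampleConsumer}.
+ * <p>
+ * Runs against an embedded Kafka broker and verifies the consumer's behaviour
+ * for valid records, for records that cannot be deserialized, and for records
+ * that trigger an unexpected error in the domain logic. Concrete subclasses
+ * plug in the value type {@code V} through the abstract factory methods below.
+ */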
+@SpringBootTest(
+ classes = {
+ KafkaAutoConfiguration.class,
+ ApplicationProperties.class,
+ AbstractExampleConsumerTest.ConsumerRunnableTestConfig.class,
+ },
+ properties = {
+ "spring.kafka.consumer.auto-offset-reset=earliest",
+ "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+ "logging.level.de.juplo.kafka=TRACE",
+ })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
+public abstract class AbstractExampleConsumerTest<V>
+{
+ @DisplayName("All messages are consumed as expected")
+ @Test
+ void testOnlyValidMessages()
+ {
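+    // Send two valid messages to each of the ten partitions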
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All messages are consumed")
+ .atMost(Duration.ofSeconds(5))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+ }
+
+ @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
+ @Test
+ void testDeserializationException()
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
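+    // Poison pill: this record cannot be deserialized and has to be skipped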
+ sendNonDeserializableMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("All valid messages are consumed")
+ .atMost(Duration.ofSeconds(15))
+ .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+ }
+
+ @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
+ @Test
+ void testUnexpectedDomainError() throws Exception
+ {
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
+ sendValidMessage(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+ sendValidMessage(0);
+ sendValidMessage(1);
+ sendValidMessage(2);
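+    // This record triggers a RuntimeException in the domain logic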
+ sendMessageThatTriggersRuntimeExceptionInDomain(3);
+ sendValidMessage(4);
+ sendValidMessage(5);
+ sendValidMessage(6);
+ sendValidMessage(7);
+ sendValidMessage(8);
+ sendValidMessage(9);
+
+ Awaitility
+ .await("The ConsumerRunnable is exited by an unexpected exception")
+ .atMost(Duration.ofSeconds(5))
+ .pollInterval(Duration.ofMillis(250))
+ .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue());
+ }
+
+
+ static final String ID = "TEST";
+ static final String TOPIC = "ExampleConsumerTest_TEST";
+ static final int NUM_PARTITIONS = 10;
+
+ @Autowired
+ KafkaTemplate<String, byte[]> kafkaTemplate;
+
+ final Serializer<V> serializer = createSerializer();
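+  // Highest acknowledged offset per partition, used to truncate the topic between tests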
+ final long[] currentOffsets = new long[NUM_PARTITIONS];
+
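+  // Number of the next message to be sent; reset after each test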
+ long nextMessage = 1;
+
+  final AbstractMockRecordHandler<V> mockRecordHandler = createMockRecordHandler();
+ final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
+
+ ExampleConsumer exampleConsumer;
+
+
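+  // Hooks implemented by concrete subclasses for their specific value type V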
+ abstract Serializer<V> createSerializer();
+ abstract AbstractMockRecordHandler<V> createMockRecordHandler();
+ abstract Consumer<?, ?> createConsumer(KafkaProperties properties);
+ abstract V createValidMessage();
+ abstract V createMessageThatTriggersRuntimeException();
+
+
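+  /**
+   * Creates a fresh {@code ExampleConsumer} for each test, wiring in the mock
+   * record handler and a callback that records an exceptional termination.
+   */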
+ @BeforeEach
+ void createExampleConsumer(@Autowired KafkaProperties properties)
+ {
+ exampleConsumer = new ExampleConsumer(
+ ID,
+ TOPIC,
+ createConsumer(properties),
+ mockRecordHandler,
+ () -> isTerminatedExceptionally.set(true));
+ }
+
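+  /**
+   * Shuts the consumer down and deletes all records that were written, so
+   * that every test starts with an effectively empty topic.
+   */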
+ @AfterEach
+ void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+ {
+ exampleConsumer.shutdown();
+ adminClient
+ .deleteRecords(recordsToDelete())
+ .all()
+ .get();
+ mockRecordHandler.clear();
+ nextMessage = 1;
+ isTerminatedExceptionally.set(false);
+ }
+
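+  // For every partition that received records, delete everything up to and
+  // including the last acknowledged offset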
+ private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+ {
+    return IntStream
+      .range(0, NUM_PARTITIONS)
+      .filter(i -> currentOffsets[i] > 0)
+      .boxed()
+      .collect(Collectors.toMap(
+        i -> new TopicPartition(TOPIC, i),
+        this::recordsToDelete));
+ }
+
+ private RecordsToDelete recordsToDelete(int partition)
+ {
+ return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+ }
+
+ private void sendValidMessage(int partition)
+ {
+ send(partition, createValidMessage());
+ }
+
+ private void sendNonDeserializableMessage(int partition)
+ {
+ send(partition, "BOOM!".getBytes());
+ }
+
+ private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
+ {
+ send(partition, createMessageThatTriggersRuntimeException());
+ }
+
+ private void send(int partition, V message)
+ {
+ send(partition, serializer.serialize(TOPIC, message));
+ }
+
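+  // Sends the raw bytes and records the offset of the acknowledged record.
+  // The key ("EGAL", German for "does not matter") is irrelevant here,
+  // because the partition is specified explicitly.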
+ private void send(int partition, byte[] bytes)
+ {
+ nextMessage++;
+ kafkaTemplate
+ .send(TOPIC, partition, "EGAL", bytes)
+ .thenAccept(result ->
+ {
+ RecordMetadata metadata = result.getRecordMetadata();
+ currentOffsets[metadata.partition()] = metadata.offset();
+ });
+ }
+
+
+
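+  /**
+   * Provides an {@link AdminClient} that is connected to the embedded broker,
+   * so that tests can delete the written records between runs.
+   */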
+ @TestConfiguration
+ static class ConsumerRunnableTestConfig
+ {
+ @Bean
+ AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+ {
+ Map<String, Object> properties = new HashMap<>();
+ properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+ return AdminClient.create(properties);
+ }
+ }
+}