--- /dev/null
+package de.juplo.kafka;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.DisplayName;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.TestConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.test.context.EmbeddedKafka;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static de.juplo.kafka.AbstractExampleConsumerTest.NUM_PARTITIONS;
+import static de.juplo.kafka.AbstractExampleConsumerTest.TOPIC;
+import static org.assertj.core.api.Assertions.assertThat;
+
+
+@SpringBootTest(
+  classes = {
+    KafkaAutoConfiguration.class,
+    ApplicationProperties.class,
+    AbstractExampleConsumerTest.ConsumerRunnableTestConfig.class,
+  },
+  properties = {
+    "spring.kafka.consumer.auto-offset-reset=earliest",
+    "spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer",
+    "logging.level.de.juplo.kafka=TRACE",
+  })
+@EmbeddedKafka(topics = TOPIC, partitions = NUM_PARTITIONS)
+public abstract class AbstractExampleConsumerTest<V>
+{
+  @DisplayName("All messages are consumed as expected")
+  @Test
+  void testOnlyValidMessages()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All messages are consumed")
+      .atMost(Duration.ofSeconds(5))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(20));
+  }
+
+  @DisplayName("A deserialization exception is skipped and all valid messages are consumed")
+  @Test
+  void testDeserializationException()
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendNonDeserializableMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("All valid messages are consumed")
+      .atMost(Duration.ofSeconds(15))
+      .untilAsserted(() -> assertThat(mockRecordHandler.getNumMessagesHandled()).isEqualTo(19));
+  }
+
+  @DisplayName("A message, that triggers an unexpected exception in the domain-logic, exits the application")
+  @Test
+  void testUnexpectedDomainError() throws Exception
+  {
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendValidMessage(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+    sendValidMessage(0);
+    sendValidMessage(1);
+    sendValidMessage(2);
+    sendMessageThatTriggersRuntimeExceptionInDomain(3);
+    sendValidMessage(4);
+    sendValidMessage(5);
+    sendValidMessage(6);
+    sendValidMessage(7);
+    sendValidMessage(8);
+    sendValidMessage(9);
+
+    Awaitility
+      .await("The ConsumerRunnable is exited by an unexpected exception")
+      .atMost(Duration.ofSeconds(5))
+      .pollInterval(Duration.ofMillis(250))
+      .untilAsserted(() -> assertThat(isTerminatedExceptionally.get()).isTrue());
+  }
+
+
+  static final String ID = "TEST";
+  static final String TOPIC = "ExampleConsumerTest_TEST";
+  static final int NUM_PARTITIONS = 10;
+
+  @Autowired
+  KafkaTemplate<String, byte[]> kafkaTemplate;
+
+  final Serializer<V> serializer = createSerializer();
+  final long[] currentOffsets = new long[NUM_PARTITIONS];
+
+  long nextMessage = 1;
+
+  final AbstractMockRecordHandler mockRecordHandler = createMockRecordHandler();
+  final AtomicBoolean isTerminatedExceptionally = new AtomicBoolean();
+
+  ExampleConsumer exampleConsumer;
+
+
+  abstract Serializer<V> createSerializer();
+  abstract AbstractMockRecordHandler<V> createMockRecordHandler();
+  abstract Consumer<?, ?> createConsumer(KafkaProperties properties);
+  abstract V createValidMessage();
+  abstract V createMessageThatTriggersRuntimeException();
+
+
+  @BeforeEach
+  void createExampleConsumer(@Autowired KafkaProperties properties)
+  {
+    exampleConsumer = new ExampleConsumer(
+      ID,
+      TOPIC,
+      createConsumer(properties),
+      mockRecordHandler,
+      () -> isTerminatedExceptionally.set(true));
+  }
+
+  @AfterEach
+  void resetSetup(@Autowired AdminClient adminClient) throws InterruptedException, ExecutionException
+  {
+    exampleConsumer.shutdown();
+    adminClient
+      .deleteRecords(recordsToDelete())
+      .all()
+      .get();
+    mockRecordHandler.clear();
+    nextMessage = 1;
+    isTerminatedExceptionally.set(false);
+  }
+
+  private Map<TopicPartition, RecordsToDelete> recordsToDelete()
+  {
+    return IntStream
+      .range(0, NUM_PARTITIONS)
+      .filter(i -> currentOffsets[i] > 0)
+      .mapToObj(i -> Integer.valueOf(i))
+      .collect(Collectors.toMap(
+        i -> new TopicPartition(TOPIC, i),
+        i -> recordsToDelete(i)));
+  }
+
+  private RecordsToDelete recordsToDelete(int partition)
+  {
+    return RecordsToDelete.beforeOffset(currentOffsets[partition] + 1);
+  }
+
+  private void sendValidMessage(int partition)
+  {
+    send(partition, createValidMessage());
+  }
+
+  private void sendNonDeserializableMessage(int partition)
+  {
+    send(partition, "BOOM!".getBytes());
+  }
+
+  private void sendMessageThatTriggersRuntimeExceptionInDomain(int partition)
+  {
+    send(partition, createMessageThatTriggersRuntimeException());
+  }
+
+  private void send(int partition, V message)
+  {
+    send(partition, serializer.serialize(TOPIC, message));
+  }
+
+  private void send(int partition, byte[] bytes)
+  {
+    nextMessage++;
+    kafkaTemplate
+      .send(TOPIC, partition, "EGAL", bytes)
+      .thenAccept(result ->
+      {
+        RecordMetadata metadata = result.getRecordMetadata();
+        currentOffsets[metadata.partition()] = metadata.offset();
+      });
+  }
+
+
+
+  @TestConfiguration
+  static class ConsumerRunnableTestConfig
+  {
+    @Bean
+    AdminClient adminClient(@Value("${spring.embedded.kafka.brokers}") String kafkaBroker)
+    {
+      Map<String, Object> properties = new HashMap<>();
+      properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBroker);
+      return AdminClient.create(properties);
+    }
+  }
+}