From eba11d4859d1e2b936bcd9e8075986b5179b32ea Mon Sep 17 00:00:00 2001
From: Kai Moritz <kai@juplo.de>
Date: Sun, 18 Sep 2022 06:31:25 +0200
Subject: [PATCH] =?utf8?q?Addder=20beendet=20sich=20bei=20Fehler=20und=20L?=
 =?utf8?q?ogik=20f=C3=BCr=20Beenden=20vereinfacht?=
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Der Adder beendet sich bei einer Exception vollständig.
* _Hintergrund:_ Zustand ist in der Übung leichter erkennbar.
* Die Logik, ob beim Beenden ein Commit durchgeführt wurde, wurde so weit
  wie möglich vereinfacht.
* Da der Test aus zeitmangel nicht angepasst werden konnte, wurde er
  vorerst entfernt.
---
 docker-compose.yml                            |   7 +-
 .../juplo/kafka/ApplicationConfiguration.java |   3 +
 .../java/de/juplo/kafka/DriverController.java |  15 -
 .../java/de/juplo/kafka/EndlessConsumer.java  | 131 +-----
 .../java/de/juplo/kafka/ApplicationTests.java | 172 --------
 .../ErrorCannotBeGeneratedCondition.java      |  60 ---
 .../juplo/kafka/GenericApplicationTests.java  | 405 ------------------
 .../kafka/SkipWhenErrorCannotBeGenerated.java |  15 -
 .../de/juplo/kafka/TestRecordHandler.java     |  22 -
 9 files changed, 30 insertions(+), 800 deletions(-)
 delete mode 100644 src/test/java/de/juplo/kafka/ApplicationTests.java
 delete mode 100644 src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
 delete mode 100644 src/test/java/de/juplo/kafka/GenericApplicationTests.java
 delete mode 100644 src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
 delete mode 100644 src/test/java/de/juplo/kafka/TestRecordHandler.java

diff --git a/docker-compose.yml b/docker-compose.yml
index 5d33cd1a..3f7188c4 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -115,13 +115,14 @@ services:
       sumup.requests.client-id: requests-1
 
   requests-2:
-    image: juplo/sumup-requests-json:1.0-SNAPSHOT
+    image: juplo/sumup-requests-fehlerteufel:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
       server.port: 8080
       sumup.requests.bootstrap-server: kafka:9092
       sumup.requests.client-id: requests-2
+      sumup.requests.error-position: 6
 
   adder-1:
     image: juplo/sumup-adder-json:1.0-SNAPSHOT
@@ -135,7 +136,7 @@ services:
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
+      logging.level.org.apache.kafka.clients.consumer: INFO
 
   adder-2:
     image: juplo/sumup-adder-json:1.0-SNAPSHOT
@@ -149,7 +150,7 @@ services:
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
+      logging.level.org.apache.kafka.clients.consumer: INFO
 
   peter:
     image: juplo/toolbox
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
index 61374113..93db3b54 100644
--- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
+++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
@@ -3,6 +3,7 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
@@ -53,6 +54,7 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, Message> endlessConsumer(
       KafkaConsumer<String, Message> kafkaConsumer,
       ExecutorService executor,
+      ConfigurableApplicationContext applicationContext,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
@@ -60,6 +62,7 @@ public class ApplicationConfiguration
     return
         new EndlessConsumer<>(
             executor,
+            applicationContext,
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
index 26a5bc86..cc3abeda 100644
--- a/src/main/java/de/juplo/kafka/DriverController.java
+++ b/src/main/java/de/juplo/kafka/DriverController.java
@@ -8,7 +8,6 @@ import org.springframework.web.bind.annotation.*;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 
@@ -16,24 +15,10 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public class DriverController
 {
-  private final EndlessConsumer consumer;
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults results;
 
 
-  @PostMapping("start")
-  public void start()
-  {
-    consumer.start();
-  }
-
-  @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
-  {
-    consumer.stop();
-  }
-
-
   @GetMapping("state")
   public Map<Integer, Map<String, AdderResult>> state()
   {
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
index 00678c45..9ea944b9 100644
--- a/src/main/java/de/juplo/kafka/EndlessConsumer.java
+++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java
@@ -6,15 +6,11 @@ import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.ConfigurableApplicationContext;
 
-import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 
 @Slf4j
@@ -22,16 +18,15 @@ import java.util.concurrent.locks.ReentrantLock;
 public class EndlessConsumer<K, V> implements Runnable
 {
   private final ExecutorService executor;
+  private final ConfigurableApplicationContext applicationContext;
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
   private final ConsumerRebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
-  private final Lock lock = new ReentrantLock();
-  private final Condition condition = lock.newCondition();
   private boolean running = false;
-  private Exception exception;
+  private Exception exception = null;
   private long consumed = 0;
 
 
@@ -72,8 +67,6 @@ public class EndlessConsumer<K, V> implements Runnable
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
-      shutdown();
     }
     catch(RecordDeserializationException e)
     {
@@ -85,128 +78,50 @@ public class EndlessConsumer<K, V> implements Runnable
           tp,
           offset,
           e.getCause().toString());
-
-      consumer.commitSync();
-      shutdown(e);
+      this.exception = e;
     }
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}", id, e.toString(), e);
-      shutdown(e);
+      this.exception = e;
+      log.info("{} - Unsubscribing...", id);
+      consumer.unsubscribe();
     }
     finally
     {
+      running = false;
+      log.info("{} - Closing the consumer...", id);
+      consumer.close();
+      log.info("{} - Shutting down the app...", id);
+      applicationContext.close();
       log.info("{} - Consumer-Thread exiting", id);
     }
   }
 
-  private void shutdown()
-  {
-    shutdown(null);
-  }
-
-  private void shutdown(Exception e)
-  {
-    lock.lock();
-    try
-    {
-      try
-      {
-        log.info("{} - Unsubscribing from topic {}", id, topic);
-        consumer.unsubscribe();
-      }
-      catch (Exception ue)
-      {
-        log.error(
-            "{} - Error while unsubscribing from topic {}: {}",
-            id,
-            topic,
-            ue.toString());
-      }
-      finally
-      {
-        running = false;
-        exception = e;
-        condition.signal();
-      }
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
   public void start()
   {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-      log.info("{} - Starting - consumed {} messages before", id, consumed);
-      running = true;
-      exception = null;
-      executor.submit(this);
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
-  public synchronized void stop() throws InterruptedException
-  {
-    lock.lock();
-    try
-    {
-      if (!running)
-        throw new IllegalStateException("Consumer instance " + id + " is not running!");
+    if (running)
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
-      log.info("{} - Stopping", id);
-      consumer.wakeup();
-      condition.await();
-      log.info("{} - Stopped - consumed {} messages so far", id, consumed);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    log.info("{} - Starting - consumed {} messages before", id, consumed);
+    running = true;
+    executor.submit(this);
   }
 
-  @PreDestroy
-  public void destroy() throws ExecutionException, InterruptedException
+  public void stop()
   {
-    log.info("{} - Destroy!", id);
-    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+    consumer.wakeup();
   }
-
   public boolean running()
   {
-    lock.lock();
-    try
-    {
-      return running;
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    return running;
   }
 
   public Optional<Exception> exitStatus()
   {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+    if (running)
+      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
 
-      return Optional.ofNullable(exception);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    return Optional.ofNullable(exception);
   }
 }
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
deleted file mode 100644
index bd9f449e..00000000
--- a/src/test/java/de/juplo/kafka/ApplicationTests.java
+++ /dev/null
@@ -1,172 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, Message>
-{
-  @Autowired
-  StateRepository stateRepository;
-
-
-  public ApplicationTests()
-  {
-    super(new ApplicationTestRecrodGenerator());
-    ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
-  }
-
-
-  static class ApplicationTestRecrodGenerator implements RecordGenerator
-  {
-    ApplicationTests tests;
-
-    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
-    final String[] dieWilden13 =
-        IntStream
-            .range(1, 14)
-            .mapToObj(i -> "seeräuber-" + i)
-            .toArray(i -> new String[i]);
-    final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
-
-    int counter = 0;
-
-    Map<String, List<AdderResult>> state;
-
-    @Override
-    public int generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counter = 0;
-      state =
-          Arrays
-              .stream(dieWilden13)
-              .collect(Collectors.toMap(
-                  seeräuber -> seeräuber,
-                  seeräuber -> new LinkedList()));
-
-      int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int next = 0;
-
-      for (int pass = 0; pass < 333; pass++)
-      {
-        for (int i = 0; i<13; i++)
-        {
-          String seeräuber = dieWilden13[i];
-          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-          if (message[i] > number[i])
-          {
-            send(
-              key,
-              calculateMessage,
-              Message.Type.CALC,
-              poisonPill(poisonPills, pass, counter),
-              logicError(logicErrors, pass, counter),
-              messageSender);
-            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
-            // Pick next number to calculate
-            number[i] = numbers[next++%numbers.length];
-            message[i] = 1;
-            log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
-          }
-
-          send(
-            key,
-            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
-            Message.Type.ADD,
-            poisonPill(poisonPills, pass, counter),
-            logicError(logicErrors, pass, counter),
-            messageSender);
-        }
-      }
-
-      return counter;
-    }
-
-    boolean poisonPill (boolean poisonPills, int pass, int counter)
-    {
-      return poisonPills && pass > 300 && counter%99 == 0;
-    }
-
-    boolean logicError(boolean logicErrors, int pass, int counter)
-    {
-      return logicErrors && pass > 300 && counter%77 == 0;
-    }
-
-    void send(
-        Bytes key,
-        Bytes value,
-        Message.Type type,
-        boolean poisonPill,
-        boolean logicError,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counter++;
-
-      if (logicError)
-      {
-        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
-      }
-      if (poisonPill)
-      {
-        value = new Bytes("BOOM!".getBytes());
-      }
-
-      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
-      record.headers().add("__TypeId__", type.toString().getBytes());
-      messageSender.accept(record);
-    }
-
-    @Override
-    public void assertBusinessLogic()
-    {
-      for (int i=0; i<PARTITIONS; i++)
-      {
-        StateDocument stateDocument =
-            tests.stateRepository.findById(Integer.toString(i)).get();
-
-        stateDocument
-            .results
-            .entrySet()
-            .stream()
-            .forEach(entry ->
-            {
-              String user = entry.getKey();
-              List<AdderResult> resultsForUser = entry.getValue();
-
-              for (int j=0; j < resultsForUser.size(); j++)
-              {
-                if (!(j < state.get(user).size()))
-                {
-                  break;
-                }
-
-                assertThat(resultsForUser.get(j))
-                    .as("Unexpected results calculation %d of user %s", j, user)
-                    .isEqualTo(state.get(user).get(j));
-              }
-
-              assertThat(state.get(user))
-                  .as("More results calculated for user %s as expected", user)
-                  .containsAll(resultsForUser);
-            });
-      }
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
deleted file mode 100644
index 606218fa..00000000
--- a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
+++ /dev/null
@@ -1,60 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ConditionEvaluationResult;
-import org.junit.jupiter.api.extension.ExecutionCondition;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.platform.commons.util.AnnotationUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
-{
-  @Override
-  public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context)
-  {
-    final Optional<SkipWhenErrorCannotBeGenerated> optional =
-        AnnotationUtils.findAnnotation(
-            context.getElement(),
-            SkipWhenErrorCannotBeGenerated.class);
-
-    if (context.getTestInstance().isEmpty())
-      return ConditionEvaluationResult.enabled("Test-instance ist not available");
-
-    if (optional.isPresent())
-    {
-      SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
-      GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
-      List<String> missingRequiredErrors = new LinkedList<>();
-
-      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill())
-        missingRequiredErrors.add("Poison-Pill");
-
-      if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
-        missingRequiredErrors.add("Logic-Error");
-
-      StringBuilder builder = new StringBuilder();
-      builder.append(context.getTestClass().get().getSimpleName());
-
-      if (missingRequiredErrors.isEmpty())
-      {
-        builder.append(" can generate all required types of errors");
-        return ConditionEvaluationResult.enabled(builder.toString());
-      }
-
-      builder.append(" cannot generate the required error(s): ");
-      builder.append(
-          missingRequiredErrors
-              .stream()
-              .collect(Collectors.joining(", ")));
-
-      return ConditionEvaluationResult.disabled(builder.toString());
-    }
-
-    return ConditionEvaluationResult.enabled(
-        "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName());
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
deleted file mode 100644
index 8849317a..00000000
--- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java
+++ /dev/null
@@ -1,405 +0,0 @@
-package de.juplo.kafka;
-
-import com.mongodb.client.MongoClient;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
-import org.apache.kafka.common.serialization.*;
-import org.apache.kafka.common.utils.Bytes;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.mongo.MongoProperties;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
-import static de.juplo.kafka.GenericApplicationTests.TOPIC;
-import static org.assertj.core.api.Assertions.*;
-import static org.awaitility.Awaitility.*;
-
-
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
-@TestPropertySource(
-		properties = {
-				"sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
-				"sumup.adder.topic=" + TOPIC,
-				"sumup.adder.commit-interval=500ms",
-				"spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@Slf4j
-abstract class GenericApplicationTests<K, V>
-{
-	public static final String TOPIC = "FOO";
-	public static final int PARTITIONS = 10;
-
-
-	@Autowired
-	KafkaConsumer<K, V> kafkaConsumer;
-	@Autowired
-	Consumer<ConsumerRecord<K, V>> consumer;
-	@Autowired
-	ApplicationProperties properties;
-	@Autowired
-	ExecutorService executor;
-	@Autowired
-	MongoClient mongoClient;
-	@Autowired
-	MongoProperties mongoProperties;
-	@Autowired
-	ConsumerRebalanceListener rebalanceListener;
-	@Autowired
-	RecordHandler<K, V> recordHandler;
-
-	KafkaProducer<Bytes, Bytes> testRecordProducer;
-	KafkaConsumer<Bytes, Bytes> offsetConsumer;
-	EndlessConsumer<K, V> endlessConsumer;
-	Map<TopicPartition, Long> oldOffsets;
-	Map<TopicPartition, Long> seenOffsets;
-	Set<ConsumerRecord<K, V>> receivedRecords;
-
-
-	final RecordGenerator recordGenerator;
-	final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
-
-	public GenericApplicationTests(RecordGenerator recordGenerator)
-	{
-		this.recordGenerator = recordGenerator;
-		this.messageSender = (record) -> sendMessage(record);
-	}
-
-
-	/** Tests methods */
-
-	@Test
-	void commitsCurrentOffsetsOnSuccess() throws Exception
-	{
-		int numberOfGeneratedMessages =
-				recordGenerator.generate(false, false, messageSender);
-
-		await(numberOfGeneratedMessages + " records received")
-				.atMost(Duration.ofSeconds(30))
-				.pollInterval(Duration.ofSeconds(1))
-				.until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
-
-		await("Offsets committed")
-				.atMost(Duration.ofSeconds(10))
-				.pollInterval(Duration.ofSeconds(1))
-				.untilAsserted(() ->
-				{
-					checkSeenOffsetsForProgress();
-					assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-				});
-
-		assertThatExceptionOfType(IllegalStateException.class)
-				.isThrownBy(() -> endlessConsumer.exitStatus())
-				.describedAs("Consumer should still be running");
-
-		endlessConsumer.stop();
-		recordGenerator.assertBusinessLogic();
-	}
-
-	@Test
-	@SkipWhenErrorCannotBeGenerated(poisonPill = true)
-	void commitsOffsetOfErrorForReprocessingOnDeserializationError()
-	{
-		int numberOfGeneratedMessages =
-				recordGenerator.generate(true, false, messageSender);
-
-		await("Consumer failed")
-				.atMost(Duration.ofSeconds(30))
-				.pollInterval(Duration.ofSeconds(1))
-				.until(() -> !endlessConsumer.running());
-
-		checkSeenOffsetsForProgress();
-		assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-
-		endlessConsumer.start();
-		await("Consumer failed")
-				.atMost(Duration.ofSeconds(30))
-				.pollInterval(Duration.ofSeconds(1))
-				.until(() -> !endlessConsumer.running());
-
-		checkSeenOffsetsForProgress();
-		assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-		assertThat(receivedRecords.size())
-				.describedAs("Received not all sent events")
-				.isLessThan(numberOfGeneratedMessages);
-
-		assertThatNoException()
-				.describedAs("Consumer should not be running")
-				.isThrownBy(() -> endlessConsumer.exitStatus());
-		assertThat(endlessConsumer.exitStatus())
-				.describedAs("Consumer should have exited abnormally")
-				.containsInstanceOf(RecordDeserializationException.class);
-
-		recordGenerator.assertBusinessLogic();
-	}
-
-	@Test
-	@SkipWhenErrorCannotBeGenerated(logicError = true)
-	void doesNotCommitOffsetsOnLogicError()
-	{
-		int numberOfGeneratedMessages =
-				recordGenerator.generate(false, true, messageSender);
-
-		await("Consumer failed")
-				.atMost(Duration.ofSeconds(30))
-				.pollInterval(Duration.ofSeconds(1))
-				.until(() -> !endlessConsumer.running());
-
-		checkSeenOffsetsForProgress();
-		assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
-
-		endlessConsumer.start();
-		await("Consumer failed")
-				.atMost(Duration.ofSeconds(30))
-				.pollInterval(Duration.ofSeconds(1))
-				.until(() -> !endlessConsumer.running());
-
-		assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
-
-		assertThatNoException()
-				.describedAs("Consumer should not be running")
-				.isThrownBy(() -> endlessConsumer.exitStatus());
-		assertThat(endlessConsumer.exitStatus())
-				.describedAs("Consumer should have exited abnormally")
-				.containsInstanceOf(RuntimeException.class);
-
-		recordGenerator.assertBusinessLogic();
-	}
-
-
-	/** Helper methods for the verification of expectations */
-
-	void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-	{
-		doForCurrentOffsets((tp, offset) ->
-		{
-			Long expected = offsetsToCheck.get(tp) + 1;
-			log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
-			assertThat(offset)
-					.describedAs("Committed offset corresponds to the offset of the consumer")
-					.isEqualTo(expected);
-		});
-	}
-
-	void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-	{
-		List<Boolean> isOffsetBehindSeen = new LinkedList<>();
-
-		doForCurrentOffsets((tp, offset) ->
-		{
-			Long expected = offsetsToCheck.get(tp) + 1;
-			log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
-			assertThat(offset)
-					.describedAs("Committed offset must be at most equal to the offset of the consumer")
-					.isLessThanOrEqualTo(expected);
-			isOffsetBehindSeen.add(offset < expected);
-		});
-
-		assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
-				.describedAs("Committed offsets are behind seen offsets")
-				.isTrue();
-	}
-
-	void checkSeenOffsetsForProgress()
-	{
-		// Be sure, that some messages were consumed...!
-		Set<TopicPartition> withProgress = new HashSet<>();
-		partitions().forEach(tp ->
-		{
-			Long oldOffset = oldOffsets.get(tp) + 1;
-			Long newOffset = seenOffsets.get(tp) + 1;
-			if (!oldOffset.equals(newOffset))
-			{
-				log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-				withProgress.add(tp);
-			}
-		});
-		assertThat(withProgress)
-				.describedAs("Some offsets must have changed, compared to the old offset-positions")
-				.isNotEmpty();
-	}
-
-
-	/** Helper methods for setting up and running the tests */
-
-	void seekToEnd()
-	{
-		offsetConsumer.assign(partitions());
-		offsetConsumer.seekToEnd(partitions());
-		partitions().forEach(tp ->
-		{
-			// seekToEnd() works lazily: it only takes effect on poll()/position()
-			Long offset = offsetConsumer.position(tp);
-			log.info("New position for {}: {}", tp, offset);
-		});
-		// The new positions must be commited!
-		offsetConsumer.commitSync();
-		offsetConsumer.unsubscribe();
-	}
-
-	void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
-	{
-		offsetConsumer.assign(partitions());
-		partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-		offsetConsumer.unsubscribe();
-	}
-
-	List<TopicPartition> partitions()
-	{
-		return
-				IntStream
-						.range(0, PARTITIONS)
-						.mapToObj(partition -> new TopicPartition(TOPIC, partition))
-						.collect(Collectors.toList());
-	}
-
-
-	public interface RecordGenerator
-	{
-		int generate(
-				boolean poisonPills,
-				boolean logicErrors,
-				Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
-
-		default boolean canGeneratePoisonPill()
-		{
-			return true;
-		}
-
-		default boolean canGenerateLogicError()
-		{
-			return true;
-		}
-
-		default void assertBusinessLogic()
-		{
-			log.debug("No business-logic to assert");
-		}
-	}
-
-	void sendMessage(ProducerRecord<Bytes, Bytes> record)
-	{
-		testRecordProducer.send(record, (metadata, e) ->
-		{
-			if (metadata != null)
-			{
-				log.debug(
-						"{}|{} - {}={}",
-						metadata.partition(),
-						metadata.offset(),
-						record.key(),
-						record.value());
-			}
-			else
-			{
-				log.warn(
-						"Exception for {}={}: {}",
-						record.key(),
-						record.value(),
-						e.toString());
-			}
-		});
-	}
-
-
-	@BeforeEach
-	public void init()
-	{
-		Properties props;
-		props = new Properties();
-		props.put("bootstrap.servers", properties.getBootstrapServer());
-		props.put("linger.ms", 100);
-		props.put("key.serializer", BytesSerializer.class.getName());
-		props.put("value.serializer", BytesSerializer.class.getName());
-		testRecordProducer = new KafkaProducer<>(props);
-
-		props = new Properties();
-		props.put("bootstrap.servers", properties.getBootstrapServer());
-		props.put("client.id", "OFFSET-CONSUMER");
-		props.put("group.id", properties.getGroupId());
-		props.put("key.deserializer", BytesDeserializer.class.getName());
-		props.put("value.deserializer", BytesDeserializer.class.getName());
-		offsetConsumer = new KafkaConsumer<>(props);
-
-		mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
-		seekToEnd();
-
-		oldOffsets = new HashMap<>();
-		seenOffsets = new HashMap<>();
-		receivedRecords = new HashSet<>();
-
-		doForCurrentOffsets((tp, offset) ->
-		{
-			oldOffsets.put(tp, offset - 1);
-			seenOffsets.put(tp, offset - 1);
-		});
-
-		TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
-				new TestRecordHandler<K, V>(recordHandler)
-				{
-					@Override
-					public void onNewRecord(ConsumerRecord<K, V> record)
-					{
-						seenOffsets.put(
-								new TopicPartition(record.topic(), record.partition()),
-								record.offset());
-						receivedRecords.add(record);
-					}
-				};
-
-		endlessConsumer =
-				new EndlessConsumer<>(
-						executor,
-						properties.getClientId(),
-						properties.getTopic(),
-						kafkaConsumer,
-						rebalanceListener,
-						captureOffsetAndExecuteTestHandler);
-
-		endlessConsumer.start();
-	}
-
-	@AfterEach
-	public void deinit()
-	{
-		try
-		{
-			testRecordProducer.close();
-			offsetConsumer.close();
-		}
-		catch (Exception e)
-		{
-			log.info("Exception while stopping the consumer: {}", e.toString());
-		}
-	}
-
-
-	@TestConfiguration
-	@Import(ApplicationConfiguration.class)
-	public static class Configuration
-	{
-	}
-}
diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
deleted file mode 100644
index 6d15e9e7..00000000
--- a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@ExtendWith(ErrorCannotBeGeneratedCondition.class)
-public @interface SkipWhenErrorCannotBeGenerated
-{
-  boolean poisonPill() default false;
-  boolean logicError() default false;
-}
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
deleted file mode 100644
index b4efdd61..00000000
--- a/src/test/java/de/juplo/kafka/TestRecordHandler.java
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-
-@RequiredArgsConstructor
-public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
-{
-  private final RecordHandler<K, V> handler;
-
-
-  public abstract void onNewRecord(ConsumerRecord<K, V> record);
-
-
-  @Override
-  public void accept(ConsumerRecord<K, V> record)
-  {
-    this.onNewRecord(record);
-    handler.accept(record);
-  }
-}
-- 
2.20.1