Addder beendet sich bei Fehler und Logik für Beenden vereinfacht
authorKai Moritz <kai@juplo.de>
Sun, 18 Sep 2022 04:31:25 +0000 (06:31 +0200)
committerKai Moritz <kai@juplo.de>
Sun, 18 Sep 2022 09:52:41 +0000 (11:52 +0200)
* Der Adder beendet sich bei einer Exception vollständig.
* _Hintergrund:_ Zustand ist in der Übung leichter erkennbar.
* Die Logik, ob beim Beenden ein Commit durchgeführt wurde, wurde so weit
  wie möglich vereinfacht.
* Da der Test aus zeitmangel nicht angepasst werden konnte, wurde er
  vorerst entfernt.

docker-compose.yml
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/test/java/de/juplo/kafka/ApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java [deleted file]
src/test/java/de/juplo/kafka/GenericApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java [deleted file]
src/test/java/de/juplo/kafka/TestRecordHandler.java [deleted file]

index 5d33cd1..3f7188c 100644 (file)
@@ -115,13 +115,14 @@ services:
       sumup.requests.client-id: requests-1
 
   requests-2:
-    image: juplo/sumup-requests-json:1.0-SNAPSHOT
+    image: juplo/sumup-requests-fehlerteufel:1.0-SNAPSHOT
     ports:
       - 8082:8080
     environment:
       server.port: 8080
       sumup.requests.bootstrap-server: kafka:9092
       sumup.requests.client-id: requests-2
+      sumup.requests.error-position: 6
 
   adder-1:
     image: juplo/sumup-adder-json:1.0-SNAPSHOT
@@ -135,7 +136,7 @@ services:
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
+      logging.level.org.apache.kafka.clients.consumer: INFO
 
   adder-2:
     image: juplo/sumup-adder-json:1.0-SNAPSHOT
@@ -149,7 +150,7 @@ services:
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
+      logging.level.org.apache.kafka.clients.consumer: INFO
 
   peter:
     image: juplo/toolbox
index 6137411..93db3b5 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ConfigurableApplicationContext;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.kafka.support.serializer.JsonDeserializer;
@@ -53,6 +54,7 @@ public class ApplicationConfiguration
   public EndlessConsumer<String, Message> endlessConsumer(
       KafkaConsumer<String, Message> kafkaConsumer,
       ExecutorService executor,
+      ConfigurableApplicationContext applicationContext,
       ApplicationRebalanceListener rebalanceListener,
       ApplicationRecordHandler recordHandler,
       ApplicationProperties properties)
@@ -60,6 +62,7 @@ public class ApplicationConfiguration
     return
         new EndlessConsumer<>(
             executor,
+            applicationContext,
             properties.getClientId(),
             properties.getTopic(),
             kafkaConsumer,
index 26a5bc8..cc3abed 100644 (file)
@@ -8,7 +8,6 @@ import org.springframework.web.bind.annotation.*;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 
@@ -16,24 +15,10 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public class DriverController
 {
-  private final EndlessConsumer consumer;
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults results;
 
 
-  @PostMapping("start")
-  public void start()
-  {
-    consumer.start();
-  }
-
-  @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
-  {
-    consumer.stop();
-  }
-
-
   @GetMapping("state")
   public Map<Integer, Map<String, AdderResult>> state()
   {
index 00678c4..9ea944b 100644 (file)
@@ -6,15 +6,11 @@ import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.context.ConfigurableApplicationContext;
 
-import javax.annotation.PreDestroy;
 import java.time.Duration;
 import java.util.*;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 
 
 @Slf4j
@@ -22,16 +18,15 @@ import java.util.concurrent.locks.ReentrantLock;
 public class EndlessConsumer<K, V> implements Runnable
 {
   private final ExecutorService executor;
+  private final ConfigurableApplicationContext applicationContext;
   private final String id;
   private final String topic;
   private final Consumer<K, V> consumer;
   private final ConsumerRebalanceListener rebalanceListener;
   private final RecordHandler<K, V> recordHandler;
 
-  private final Lock lock = new ReentrantLock();
-  private final Condition condition = lock.newCondition();
   private boolean running = false;
-  private Exception exception;
+  private Exception exception = null;
   private long consumed = 0;
 
 
@@ -72,8 +67,6 @@ public class EndlessConsumer<K, V> implements Runnable
     catch(WakeupException e)
     {
       log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id);
-      consumer.commitSync();
-      shutdown();
     }
     catch(RecordDeserializationException e)
     {
@@ -85,128 +78,50 @@ public class EndlessConsumer<K, V> implements Runnable
           tp,
           offset,
           e.getCause().toString());
-
-      consumer.commitSync();
-      shutdown(e);
+      this.exception = e;
     }
     catch(Exception e)
     {
       log.error("{} - Unexpected error: {}", id, e.toString(), e);
-      shutdown(e);
+      this.exception = e;
+      log.info("{} - Unsubscribing...", id);
+      consumer.unsubscribe();
     }
     finally
     {
+      running = false;
+      log.info("{} - Closing the consumer...", id);
+      consumer.close();
+      log.info("{} - Shutting down the app...", id);
+      applicationContext.close();
       log.info("{} - Consumer-Thread exiting", id);
     }
   }
 
-  private void shutdown()
-  {
-    shutdown(null);
-  }
-
-  private void shutdown(Exception e)
-  {
-    lock.lock();
-    try
-    {
-      try
-      {
-        log.info("{} - Unsubscribing from topic {}", id, topic);
-        consumer.unsubscribe();
-      }
-      catch (Exception ue)
-      {
-        log.error(
-            "{} - Error while unsubscribing from topic {}: {}",
-            id,
-            topic,
-            ue.toString());
-      }
-      finally
-      {
-        running = false;
-        exception = e;
-        condition.signal();
-      }
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
   public void start()
   {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-      log.info("{} - Starting - consumed {} messages before", id, consumed);
-      running = true;
-      exception = null;
-      executor.submit(this);
-    }
-    finally
-    {
-      lock.unlock();
-    }
-  }
-
-  public synchronized void stop() throws InterruptedException
-  {
-    lock.lock();
-    try
-    {
-      if (!running)
-        throw new IllegalStateException("Consumer instance " + id + " is not running!");
+    if (running)
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
 
-      log.info("{} - Stopping", id);
-      consumer.wakeup();
-      condition.await();
-      log.info("{} - Stopped - consumed {} messages so far", id, consumed);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    log.info("{} - Starting - consumed {} messages before", id, consumed);
+    running = true;
+    executor.submit(this);
   }
 
-  @PreDestroy
-  public void destroy() throws ExecutionException, InterruptedException
+  public void stop()
   {
-    log.info("{} - Destroy!", id);
-    log.info("{}: Consumed {} messages in total, exiting!", id, consumed);
+    consumer.wakeup();
   }
-
   public boolean running()
   {
-    lock.lock();
-    try
-    {
-      return running;
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    return running;
   }
 
   public Optional<Exception> exitStatus()
   {
-    lock.lock();
-    try
-    {
-      if (running)
-        throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+    if (running)
+      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
 
-      return Optional.ofNullable(exception);
-    }
-    finally
-    {
-      lock.unlock();
-    }
+    return Optional.ofNullable(exception);
   }
 }
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
deleted file mode 100644 (file)
index bd9f449..0000000
+++ /dev/null
@@ -1,172 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, Message>
-{
-  @Autowired
-  StateRepository stateRepository;
-
-
-  public ApplicationTests()
-  {
-    super(new ApplicationTestRecrodGenerator());
-    ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
-  }
-
-
-  static class ApplicationTestRecrodGenerator implements RecordGenerator
-  {
-    ApplicationTests tests;
-
-    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
-    final String[] dieWilden13 =
-        IntStream
-            .range(1, 14)
-            .mapToObj(i -> "seeräuber-" + i)
-            .toArray(i -> new String[i]);
-    final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
-
-    int counter = 0;
-
-    Map<String, List<AdderResult>> state;
-
-    @Override
-    public int generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counter = 0;
-      state =
-          Arrays
-              .stream(dieWilden13)
-              .collect(Collectors.toMap(
-                  seeräuber -> seeräuber,
-                  seeräuber -> new LinkedList()));
-
-      int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int next = 0;
-
-      for (int pass = 0; pass < 333; pass++)
-      {
-        for (int i = 0; i<13; i++)
-        {
-          String seeräuber = dieWilden13[i];
-          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-          if (message[i] > number[i])
-          {
-            send(
-              key,
-              calculateMessage,
-              Message.Type.CALC,
-              poisonPill(poisonPills, pass, counter),
-              logicError(logicErrors, pass, counter),
-              messageSender);
-            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
-            // Pick next number to calculate
-            number[i] = numbers[next++%numbers.length];
-            message[i] = 1;
-            log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
-          }
-
-          send(
-            key,
-            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
-            Message.Type.ADD,
-            poisonPill(poisonPills, pass, counter),
-            logicError(logicErrors, pass, counter),
-            messageSender);
-        }
-      }
-
-      return counter;
-    }
-
-    boolean poisonPill (boolean poisonPills, int pass, int counter)
-    {
-      return poisonPills && pass > 300 && counter%99 == 0;
-    }
-
-    boolean logicError(boolean logicErrors, int pass, int counter)
-    {
-      return logicErrors && pass > 300 && counter%77 == 0;
-    }
-
-    void send(
-        Bytes key,
-        Bytes value,
-        Message.Type type,
-        boolean poisonPill,
-        boolean logicError,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counter++;
-
-      if (logicError)
-      {
-        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
-      }
-      if (poisonPill)
-      {
-        value = new Bytes("BOOM!".getBytes());
-      }
-
-      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
-      record.headers().add("__TypeId__", type.toString().getBytes());
-      messageSender.accept(record);
-    }
-
-    @Override
-    public void assertBusinessLogic()
-    {
-      for (int i=0; i<PARTITIONS; i++)
-      {
-        StateDocument stateDocument =
-            tests.stateRepository.findById(Integer.toString(i)).get();
-
-        stateDocument
-            .results
-            .entrySet()
-            .stream()
-            .forEach(entry ->
-            {
-              String user = entry.getKey();
-              List<AdderResult> resultsForUser = entry.getValue();
-
-              for (int j=0; j < resultsForUser.size(); j++)
-              {
-                if (!(j < state.get(user).size()))
-                {
-                  break;
-                }
-
-                assertThat(resultsForUser.get(j))
-                    .as("Unexpected results calculation %d of user %s", j, user)
-                    .isEqualTo(state.get(user).get(j));
-              }
-
-              assertThat(state.get(user))
-                  .as("More results calculated for user %s as expected", user)
-                  .containsAll(resultsForUser);
-            });
-      }
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
deleted file mode 100644 (file)
index 606218f..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ConditionEvaluationResult;
-import org.junit.jupiter.api.extension.ExecutionCondition;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.platform.commons.util.AnnotationUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
-{
-  @Override
-  public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context)
-  {
-    final Optional<SkipWhenErrorCannotBeGenerated> optional =
-        AnnotationUtils.findAnnotation(
-            context.getElement(),
-            SkipWhenErrorCannotBeGenerated.class);
-
-    if (context.getTestInstance().isEmpty())
-      return ConditionEvaluationResult.enabled("Test-instance ist not available");
-
-    if (optional.isPresent())
-    {
-      SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
-      GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
-      List<String> missingRequiredErrors = new LinkedList<>();
-
-      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill())
-        missingRequiredErrors.add("Poison-Pill");
-
-      if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
-        missingRequiredErrors.add("Logic-Error");
-
-      StringBuilder builder = new StringBuilder();
-      builder.append(context.getTestClass().get().getSimpleName());
-
-      if (missingRequiredErrors.isEmpty())
-      {
-        builder.append(" can generate all required types of errors");
-        return ConditionEvaluationResult.enabled(builder.toString());
-      }
-
-      builder.append(" cannot generate the required error(s): ");
-      builder.append(
-          missingRequiredErrors
-              .stream()
-              .collect(Collectors.joining(", ")));
-
-      return ConditionEvaluationResult.disabled(builder.toString());
-    }
-
-    return ConditionEvaluationResult.enabled(
-        "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName());
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
deleted file mode 100644 (file)
index 8849317..0000000
+++ /dev/null
@@ -1,405 +0,0 @@
-package de.juplo.kafka;
-
-import com.mongodb.client.MongoClient;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
-import org.apache.kafka.common.serialization.*;
-import org.apache.kafka.common.utils.Bytes;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.mongo.MongoProperties;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-
-import java.time.Duration;
-import java.util.*;
-import java.util.concurrent.ExecutorService;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
-import static de.juplo.kafka.GenericApplicationTests.TOPIC;
-import static org.assertj.core.api.Assertions.*;
-import static org.awaitility.Awaitility.*;
-
-
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
-@TestPropertySource(
-               properties = {
-                               "sumup.adder.bootstrap-server=${spring.embedded.kafka.brokers}",
-                               "sumup.adder.topic=" + TOPIC,
-                               "sumup.adder.commit-interval=500ms",
-                               "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@Slf4j
-abstract class GenericApplicationTests<K, V>
-{
-       public static final String TOPIC = "FOO";
-       public static final int PARTITIONS = 10;
-
-
-       @Autowired
-       KafkaConsumer<K, V> kafkaConsumer;
-       @Autowired
-       Consumer<ConsumerRecord<K, V>> consumer;
-       @Autowired
-       ApplicationProperties properties;
-       @Autowired
-       ExecutorService executor;
-       @Autowired
-       MongoClient mongoClient;
-       @Autowired
-       MongoProperties mongoProperties;
-       @Autowired
-       ConsumerRebalanceListener rebalanceListener;
-       @Autowired
-       RecordHandler<K, V> recordHandler;
-
-       KafkaProducer<Bytes, Bytes> testRecordProducer;
-       KafkaConsumer<Bytes, Bytes> offsetConsumer;
-       EndlessConsumer<K, V> endlessConsumer;
-       Map<TopicPartition, Long> oldOffsets;
-       Map<TopicPartition, Long> seenOffsets;
-       Set<ConsumerRecord<K, V>> receivedRecords;
-
-
-       final RecordGenerator recordGenerator;
-       final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
-
-       public GenericApplicationTests(RecordGenerator recordGenerator)
-       {
-               this.recordGenerator = recordGenerator;
-               this.messageSender = (record) -> sendMessage(record);
-       }
-
-
-       /** Tests methods */
-
-       @Test
-       void commitsCurrentOffsetsOnSuccess() throws Exception
-       {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(false, false, messageSender);
-
-               await(numberOfGeneratedMessages + " records received")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> receivedRecords.size() >= numberOfGeneratedMessages);
-
-               await("Offsets committed")
-                               .atMost(Duration.ofSeconds(10))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .untilAsserted(() ->
-                               {
-                                       checkSeenOffsetsForProgress();
-                                       assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-                               });
-
-               assertThatExceptionOfType(IllegalStateException.class)
-                               .isThrownBy(() -> endlessConsumer.exitStatus())
-                               .describedAs("Consumer should still be running");
-
-               endlessConsumer.stop();
-               recordGenerator.assertBusinessLogic();
-       }
-
-       @Test
-       @SkipWhenErrorCannotBeGenerated(poisonPill = true)
-       void commitsOffsetOfErrorForReprocessingOnDeserializationError()
-       {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(true, false, messageSender);
-
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(seenOffsets);
-               assertThat(receivedRecords.size())
-                               .describedAs("Received not all sent events")
-                               .isLessThan(numberOfGeneratedMessages);
-
-               assertThatNoException()
-                               .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
-                               .describedAs("Consumer should have exited abnormally")
-                               .containsInstanceOf(RecordDeserializationException.class);
-
-               recordGenerator.assertBusinessLogic();
-       }
-
-       @Test
-       @SkipWhenErrorCannotBeGenerated(logicError = true)
-       void doesNotCommitOffsetsOnLogicError()
-       {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(false, true, messageSender);
-
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
-
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               assertSeenOffsetsAreBehindCommittedOffsets(seenOffsets);
-
-               assertThatNoException()
-                               .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
-                               .describedAs("Consumer should have exited abnormally")
-                               .containsInstanceOf(RuntimeException.class);
-
-               recordGenerator.assertBusinessLogic();
-       }
-
-
-       /** Helper methods for the verification of expectations */
-
-       void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-       {
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       Long expected = offsetsToCheck.get(tp) + 1;
-                       log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
-                       assertThat(offset)
-                                       .describedAs("Committed offset corresponds to the offset of the consumer")
-                                       .isEqualTo(expected);
-               });
-       }
-
-       void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-       {
-               List<Boolean> isOffsetBehindSeen = new LinkedList<>();
-
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       Long expected = offsetsToCheck.get(tp) + 1;
-                       log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
-                       assertThat(offset)
-                                       .describedAs("Committed offset must be at most equal to the offset of the consumer")
-                                       .isLessThanOrEqualTo(expected);
-                       isOffsetBehindSeen.add(offset < expected);
-               });
-
-               assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
-                               .describedAs("Committed offsets are behind seen offsets")
-                               .isTrue();
-       }
-
-       void checkSeenOffsetsForProgress()
-       {
-               // Be sure, that some messages were consumed...!
-               Set<TopicPartition> withProgress = new HashSet<>();
-               partitions().forEach(tp ->
-               {
-                       Long oldOffset = oldOffsets.get(tp) + 1;
-                       Long newOffset = seenOffsets.get(tp) + 1;
-                       if (!oldOffset.equals(newOffset))
-                       {
-                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-                               withProgress.add(tp);
-                       }
-               });
-               assertThat(withProgress)
-                               .describedAs("Some offsets must have changed, compared to the old offset-positions")
-                               .isNotEmpty();
-       }
-
-
-       /** Helper methods for setting up and running the tests */
-
-       void seekToEnd()
-       {
-               offsetConsumer.assign(partitions());
-               offsetConsumer.seekToEnd(partitions());
-               partitions().forEach(tp ->
-               {
-                       // seekToEnd() works lazily: it only takes effect on poll()/position()
-                       Long offset = offsetConsumer.position(tp);
-                       log.info("New position for {}: {}", tp, offset);
-               });
-               // The new positions must be commited!
-               offsetConsumer.commitSync();
-               offsetConsumer.unsubscribe();
-       }
-
-       void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
-       {
-               offsetConsumer.assign(partitions());
-               partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-               offsetConsumer.unsubscribe();
-       }
-
-       List<TopicPartition> partitions()
-       {
-               return
-                               IntStream
-                                               .range(0, PARTITIONS)
-                                               .mapToObj(partition -> new TopicPartition(TOPIC, partition))
-                                               .collect(Collectors.toList());
-       }
-
-
-       public interface RecordGenerator
-       {
-               int generate(
-                               boolean poisonPills,
-                               boolean logicErrors,
-                               Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
-
-               default boolean canGeneratePoisonPill()
-               {
-                       return true;
-               }
-
-               default boolean canGenerateLogicError()
-               {
-                       return true;
-               }
-
-               default void assertBusinessLogic()
-               {
-                       log.debug("No business-logic to assert");
-               }
-       }
-
-       void sendMessage(ProducerRecord<Bytes, Bytes> record)
-       {
-               testRecordProducer.send(record, (metadata, e) ->
-               {
-                       if (metadata != null)
-                       {
-                               log.debug(
-                                               "{}|{} - {}={}",
-                                               metadata.partition(),
-                                               metadata.offset(),
-                                               record.key(),
-                                               record.value());
-                       }
-                       else
-                       {
-                               log.warn(
-                                               "Exception for {}={}: {}",
-                                               record.key(),
-                                               record.value(),
-                                               e.toString());
-                       }
-               });
-       }
-
-
-       @BeforeEach
-       public void init()
-       {
-               Properties props;
-               props = new Properties();
-               props.put("bootstrap.servers", properties.getBootstrapServer());
-               props.put("linger.ms", 100);
-               props.put("key.serializer", BytesSerializer.class.getName());
-               props.put("value.serializer", BytesSerializer.class.getName());
-               testRecordProducer = new KafkaProducer<>(props);
-
-               props = new Properties();
-               props.put("bootstrap.servers", properties.getBootstrapServer());
-               props.put("client.id", "OFFSET-CONSUMER");
-               props.put("group.id", properties.getGroupId());
-               props.put("key.deserializer", BytesDeserializer.class.getName());
-               props.put("value.deserializer", BytesDeserializer.class.getName());
-               offsetConsumer = new KafkaConsumer<>(props);
-
-               mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
-               seekToEnd();
-
-               oldOffsets = new HashMap<>();
-               seenOffsets = new HashMap<>();
-               receivedRecords = new HashSet<>();
-
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       oldOffsets.put(tp, offset - 1);
-                       seenOffsets.put(tp, offset - 1);
-               });
-
-               TestRecordHandler<K, V> captureOffsetAndExecuteTestHandler =
-                               new TestRecordHandler<K, V>(recordHandler)
-                               {
-                                       @Override
-                                       public void onNewRecord(ConsumerRecord<K, V> record)
-                                       {
-                                               seenOffsets.put(
-                                                               new TopicPartition(record.topic(), record.partition()),
-                                                               record.offset());
-                                               receivedRecords.add(record);
-                                       }
-                               };
-
-               endlessConsumer =
-                               new EndlessConsumer<>(
-                                               executor,
-                                               properties.getClientId(),
-                                               properties.getTopic(),
-                                               kafkaConsumer,
-                                               rebalanceListener,
-                                               captureOffsetAndExecuteTestHandler);
-
-               endlessConsumer.start();
-       }
-
-       @AfterEach
-       public void deinit()
-       {
-               try
-               {
-                       testRecordProducer.close();
-                       offsetConsumer.close();
-               }
-               catch (Exception e)
-               {
-                       log.info("Exception while stopping the consumer: {}", e.toString());
-               }
-       }
-
-
-       @TestConfiguration
-       @Import(ApplicationConfiguration.class)
-       public static class Configuration
-       {
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
deleted file mode 100644 (file)
index 6d15e9e..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@ExtendWith(ErrorCannotBeGeneratedCondition.class)
-public @interface SkipWhenErrorCannotBeGenerated
-{
-  boolean poisonPill() default false;
-  boolean logicError() default false;
-}
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
deleted file mode 100644 (file)
index b4efdd6..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-
-@RequiredArgsConstructor
-public abstract class TestRecordHandler<K, V> implements RecordHandler<K, V>
-{
-  private final RecordHandler<K, V> handler;
-
-
-  public abstract void onNewRecord(ConsumerRecord<K, V> record);
-
-
-  @Override
-  public void accept(ConsumerRecord<K, V> record)
-  {
-    this.onNewRecord(record);
-    handler.accept(record);
-  }
-}