WIP
authorKai Moritz <kai@juplo.de>
Tue, 1 Nov 2022 18:44:39 +0000 (19:44 +0100)
committerKai Moritz <kai@juplo.de>
Tue, 1 Nov 2022 18:44:39 +0000 (19:44 +0100)
22 files changed:
src/main/java/de/juplo/kafka/AdderBusinessLogic.java [deleted file]
src/main/java/de/juplo/kafka/AdderResult.java [deleted file]
src/main/java/de/juplo/kafka/AdderResults.java [deleted file]
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationProperties.java
src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java [deleted file]
src/main/java/de/juplo/kafka/DriverController.java [deleted file]
src/main/java/de/juplo/kafka/EndlessConsumer.java
src/main/java/de/juplo/kafka/ErrorResponse.java [deleted file]
src/main/java/de/juplo/kafka/RecordHandler.java [deleted file]
src/main/java/de/juplo/kafka/StateDocument.java [deleted file]
src/main/java/de/juplo/kafka/StateRepository.java [deleted file]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java [deleted file]
src/test/java/de/juplo/kafka/ApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java [deleted file]
src/test/java/de/juplo/kafka/GenericApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java [deleted file]
src/test/java/de/juplo/kafka/TestRecordHandler.java [deleted file]

diff --git a/src/main/java/de/juplo/kafka/AdderBusinessLogic.java b/src/main/java/de/juplo/kafka/AdderBusinessLogic.java
deleted file mode 100644 (file)
index d525182..0000000
+++ /dev/null
@@ -1,55 +0,0 @@
-package de.juplo.kafka;
-
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-public class AdderBusinessLogic
-{
-  private final Map<String, AdderResult> state;
-
-
-  public AdderBusinessLogic()
-  {
-    this(new HashMap<>());
-  }
-
-  public AdderBusinessLogic(Map<String, AdderResult> state)
-  {
-    this.state = state;
-  }
-
-
-  public synchronized Optional<Long> getSum(String user)
-  {
-    return Optional.ofNullable(state.get(user)).map(result -> result.sum);
-  }
-
-  public synchronized void addToSum(String user, Integer value)
-  {
-    if (value == null || value < 1)
-      throw new IllegalArgumentException("Not a positive number: " + value);
-
-    long sum =
-        Optional
-            .ofNullable(state.get(user))
-            .map(result -> result.sum)
-            .orElse(0l);
-    state.put(user, new AdderResult(value, sum + value));
-  }
-
-  public synchronized AdderResult calculate(String user)
-  {
-    if (!state.containsKey(user))
-      throw new IllegalStateException("No sumation for " + user + " in progress");
-
-    return state.remove(user);
-  }
-
-  protected Map<String, AdderResult> getState()
-  {
-    return state;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/AdderResult.java b/src/main/java/de/juplo/kafka/AdderResult.java
deleted file mode 100644 (file)
index 44b7da8..0000000
+++ /dev/null
@@ -1,21 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.RequiredArgsConstructor;
-
-
-@RequiredArgsConstructor
-@Getter
-@EqualsAndHashCode
-public class AdderResult
-{
-  final int number;
-  final long sum;
-
-  @Override
-  public String toString()
-  {
-    return "sum(" + number + ") = " + sum;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/AdderResults.java b/src/main/java/de/juplo/kafka/AdderResults.java
deleted file mode 100644 (file)
index e7f5602..0000000
+++ /dev/null
@@ -1,47 +0,0 @@
-package de.juplo.kafka;
-
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-
-public class AdderResults
-{
-  private final Map<Integer, Map<String, List<AdderResult>>> results = new HashMap<>();
-
-
-  public void addResults(Integer partition, String user, AdderResult result)
-  {
-    Map<String, List<AdderResult>> resultsByUser = this.results.get(partition);
-
-    List<AdderResult> results = resultsByUser.get(user);
-    if (results == null)
-    {
-      results = new LinkedList<>();
-      resultsByUser.put(user, results);
-    }
-
-    results.add(result);
-  }
-
-  protected void addPartition(Integer partition, Map<String, List<AdderResult>> results)
-  {
-    this.results.put(partition, results);
-  }
-
-  protected Map<String, List<AdderResult>> removePartition(Integer partition)
-  {
-    return this.results.remove(partition);
-  }
-
-  public Map<Integer, Map<String, List<AdderResult>>> getState()
-  {
-    return results;
-  }
-
-  public Map<String, List<AdderResult>> getState(Integer partition)
-  {
-    return results.get(partition);
-  }
-}
index 76c2520..b4a960d 100644 (file)
@@ -1,11 +1,14 @@
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.ApplicationArguments;
 import org.springframework.boot.ApplicationRunner;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.core.ConsumerFactory;
 
 import javax.annotation.PreDestroy;
 import java.util.concurrent.ExecutorService;
@@ -17,57 +20,29 @@ import java.util.concurrent.TimeUnit;
 public class Application implements ApplicationRunner
 {
   @Autowired
-  EndlessConsumer endlessConsumer;
+  Consumer<String, Message> consumer;
   @Autowired
-  ExecutorService executor;
+  SimpleConsumer simpleConsumer;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
     log.info("Starting EndlessConsumer");
-    endlessConsumer.start();
+    simpleConsumer.start();
   }
 
   @PreDestroy
   public void shutdown()
   {
-    try
-    {
-      log.info("Stopping EndlessConsumer");
-      endlessConsumer.stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("Was already stopped: {}", e.toString());
-    }
-    catch (Exception e)
-    {
-      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
-    }
+    log.info("Signaling the consumer to quit its work");
+    consumer.wakeup();
+  }
 
-    try
-    {
-      log.info("Shutting down the ExecutorService.");
-      executor.shutdown();
-      log.info("Waiting 5 seconds for the ExecutorService to terminate...");
-      executor.awaitTermination(5, TimeUnit.SECONDS);
-    }
-    catch (InterruptedException e)
-    {
-      log.error("Exception while waiting for the termination of the ExecutorService: {}", e);
-    }
-    finally
-    {
-      if (!executor.isTerminated())
-      {
-        log.warn("Forcing shutdown of ExecutorService!");
-        executor
-            .shutdownNow()
-            .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName()));
-      }
-      log.info("Shutdow of ExecutorService finished");
-    }
+  @Bean(destroyMethod = "close")
+  public Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
+  {
+    return factory.createConsumer();
   }
 
 
index 08c827c..23e9bec 100644 (file)
@@ -6,7 +6,6 @@ import org.springframework.boot.context.properties.EnableConfigurationProperties
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 
-import java.util.Optional;
 import org.springframework.kafka.core.ConsumerFactory;
 
 import java.util.concurrent.ExecutorService;
@@ -18,54 +17,18 @@ import java.util.concurrent.Executors;
 public class ApplicationConfiguration
 {
   @Bean
-  public ApplicationRecordHandler applicationRecordHandler(
-      AdderResults adderResults,
-      KafkaProperties kafkaProperties,
-      ApplicationProperties applicationProperties)
-  {
-    return new ApplicationRecordHandler(
-        adderResults,
-        Optional.ofNullable(applicationProperties.getThrottle()),
-        kafkaProperties.getClientId());
-  }
-
-  @Bean
-  public AdderResults adderResults()
-  {
-    return new AdderResults();
-  }
-
-  @Bean
-  public ApplicationRebalanceListener rebalanceListener(
-      ApplicationRecordHandler recordHandler,
-      AdderResults adderResults,
-      StateRepository stateRepository,
-      KafkaProperties kafkaProperties)
-  {
-    return new ApplicationRebalanceListener(
-        recordHandler,
-        adderResults,
-        stateRepository,
-        kafkaProperties.getClientId());
-  }
-
-  @Bean
-  public EndlessConsumer<String, Message> endlessConsumer(
+  public SimpleConsumer endlessConsumer(
       Consumer<String, Message> kafkaConsumer,
       ExecutorService executor,
-      ApplicationRebalanceListener rebalanceListener,
-      RecordHandler recordHandler,
       KafkaProperties kafkaProperties,
       ApplicationProperties applicationProperties)
   {
     return
-        new EndlessConsumer<>(
+        new SimpleConsumer(
             executor,
             kafkaProperties.getClientId(),
             applicationProperties.getTopic(),
-            kafkaConsumer,
-            rebalanceListener,
-            recordHandler);
+            kafkaConsumer);
   }
 
   @Bean
diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
deleted file mode 100644 (file)
index 03a14c8..0000000
+++ /dev/null
@@ -1,32 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.boot.actuate.health.Health;
-import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.stereotype.Component;
-
-
-@Component
-@RequiredArgsConstructor
-public class ApplicationHealthIndicator implements HealthIndicator
-{
-  private final EndlessConsumer<String, Message> consumer;
-
-
-  @Override
-  public Health health()
-  {
-    try
-    {
-      return consumer
-          .exitStatus()
-          .map(Health::down)
-          .orElse(Health.outOfService())
-          .build();
-    }
-    catch (IllegalStateException e)
-    {
-      return Health.up().build();
-    }
-  }
-}
index 005460c..d46a8b3 100644 (file)
@@ -10,7 +10,7 @@ import javax.validation.constraints.NotNull;
 import java.time.Duration;
 
 
-@ConfigurationProperties(prefix = "sumup.adder")
+@ConfigurationProperties(prefix = "simple.consumer")
 @Validated
 @Getter
 @Setter
diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java
deleted file mode 100644 (file)
index 0bfee67..0000000
+++ /dev/null
@@ -1,70 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.*;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ApplicationRebalanceListener implements ConsumerRebalanceListener
-{
-  private final ApplicationRecordHandler recordHandler;
-  private final AdderResults adderResults;
-  private final StateRepository stateRepository;
-  private final String id;
-
-  private final Set<Integer> partitions = new HashSet<>();
-
-  @Override
-  public void onPartitionsAssigned(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - adding partition: {}", id, partition);
-      this.partitions.add(partition);
-      StateDocument document =
-          stateRepository
-              .findById(Integer.toString(partition))
-              .orElse(new StateDocument(partition));
-      recordHandler.addPartition(partition, document.state);
-      for (String user : document.state.keySet())
-      {
-        log.info(
-            "{} - Restored state for partition={}|user={}: {}",
-            id,
-            partition,
-            user,
-            document.state.get(user));
-      }
-      adderResults.addPartition(partition, document.results);
-    });
-  }
-
-  @Override
-  public void onPartitionsRevoked(Collection<TopicPartition> partitions)
-  {
-    partitions.forEach(tp ->
-    {
-      Integer partition = tp.partition();
-      log.info("{} - removing partition: {}", id, partition);
-      this.partitions.remove(partition);
-      Map<String, AdderResult> state = recordHandler.removePartition(partition);
-      for (String user : state.keySet())
-      {
-        log.info(
-            "{} - Saved state for partition={}|user={}: {}",
-            id,
-            partition,
-            user,
-            state.get(user));
-      }
-      Map<String, List<AdderResult>> results = adderResults.removePartition(partition);
-      stateRepository.save(new StateDocument(partition, state, results));
-    });
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
deleted file mode 100644 (file)
index 2829157..0000000
+++ /dev/null
@@ -1,93 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.time.Duration;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Optional;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class ApplicationRecordHandler implements RecordHandler<String, Message>
-{
-  private final AdderResults results;
-  private final Optional<Duration> throttle;
-  private final String id;
-
-  private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
-
-
-  public void addNumber(
-      Integer partition,
-      String user,
-      MessageAddNumber message)
-  {
-    state.get(partition).addToSum(user, message.getNext());
-  }
-
-  public void calculateSum(
-      Integer partition,
-      String user,
-      MessageCalculateSum message)
-  {
-    AdderResult result = state.get(partition).calculate(user);
-    log.info("{} - New result for {}: {}", id, user, result);
-    results.addResults(partition, user, result);
-  }
-
-  @Override
-  public void accept(ConsumerRecord<String, Message> record)
-  {
-    Integer partition = record.partition();
-    String user = record.key();
-    Message message = record.value();
-
-    switch(message.getType())
-    {
-      case ADD:
-        addNumber(partition, user, (MessageAddNumber) message);
-        break;
-
-      case CALC:
-        calculateSum(partition, user, (MessageCalculateSum) message);
-        break;
-    }
-
-    if (throttle.isPresent())
-    {
-      try
-      {
-        Thread.sleep(throttle.get().toMillis());
-      }
-      catch (InterruptedException e)
-      {
-        log.warn("{} - Intrerrupted while throttling: {}", id, e);
-      }
-    }
-  }
-
-  protected void addPartition(Integer partition, Map<String, AdderResult> state)
-  {
-    this.state.put(partition, new AdderBusinessLogic(state));
-  }
-
-  protected Map<String, AdderResult> removePartition(Integer partition)
-  {
-    return this.state.remove(partition).getState();
-  }
-
-
-  public Map<Integer, AdderBusinessLogic> getState()
-  {
-    return state;
-  }
-
-  public AdderBusinessLogic getState(Integer partition)
-  {
-    return state.get(partition);
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
deleted file mode 100644 (file)
index 26a5bc8..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-
-@RestController
-@RequiredArgsConstructor
-public class DriverController
-{
-  private final EndlessConsumer consumer;
-  private final ApplicationRecordHandler recordHandler;
-  private final AdderResults results;
-
-
-  @PostMapping("start")
-  public void start()
-  {
-    consumer.start();
-  }
-
-  @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
-  {
-    consumer.stop();
-  }
-
-
-  @GetMapping("state")
-  public Map<Integer, Map<String, AdderResult>> state()
-  {
-    return
-        recordHandler
-            .getState()
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                entry -> entry.getKey(),
-                entry -> entry.getValue().getState()));
-  }
-
-  @GetMapping("state/{user}")
-  public ResponseEntity<Long> state(@PathVariable String user)
-  {
-    for (AdderBusinessLogic adder : recordHandler.getState().values())
-    {
-      Optional<Long> sum = adder.getSum(user);
-      if (sum.isPresent())
-        return ResponseEntity.ok(sum.get());
-    }
-
-    return ResponseEntity.notFound().build();
-  }
-
-  @GetMapping("results")
-  public Map<Integer, Map<String, List<AdderResult>>> results()
-  {
-    return results.getState();
-  }
-
-  @GetMapping("results/{user}")
-  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
-  {
-    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
-    {
-      List<AdderResult> results = resultsByUser.get(user);
-      if (results != null)
-        return ResponseEntity.ok(results);
-    }
-
-    return ResponseEntity.notFound().build();
-  }
-
-
-  @ExceptionHandler
-  @ResponseStatus(HttpStatus.BAD_REQUEST)
-  public ErrorResponse illegalStateException(IllegalStateException e)
-  {
-    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
-  }
-}
index 00678c4..ba8eb27 100644 (file)
@@ -6,6 +6,7 @@ import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.RecordDeserializationException;
 import org.apache.kafka.common.errors.WakeupException;
+import org.springframework.stereotype.Component;
 
 import javax.annotation.PreDestroy;
 import java.time.Duration;
@@ -17,9 +18,10 @@ import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
 
 
+@Component
 @Slf4j
 @RequiredArgsConstructor
-public class EndlessConsumer<K, V> implements Runnable
+public class EndlessConsumer implements Runnable
 {
   private final ExecutorService executor;
   private final String id;
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
deleted file mode 100644 (file)
index 5ca206d..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.Value;
-
-
-@Value
-public class ErrorResponse
-{
-  private final String error;
-  private final Integer status;
-}
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
deleted file mode 100644 (file)
index 327ac9f..0000000
+++ /dev/null
@@ -1,10 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.function.Consumer;
-
-
-public interface RecordHandler<K, V> extends Consumer<ConsumerRecord<K,V>>
-{
-}
diff --git a/src/main/java/de/juplo/kafka/StateDocument.java b/src/main/java/de/juplo/kafka/StateDocument.java
deleted file mode 100644 (file)
index ae8eb51..0000000
+++ /dev/null
@@ -1,41 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.ToString;
-import org.springframework.data.annotation.Id;
-import org.springframework.data.mongodb.core.mapping.Document;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-
-@Document(collection = "state")
-@ToString
-public class StateDocument
-{
-  @Id
-  public String id;
-  public Map<String, AdderResult> state;
-  public Map<String, List<AdderResult>> results;
-
-  public StateDocument()
-  {
-  }
-
-  public StateDocument(Integer partition)
-  {
-    this.id = Integer.toString(partition);
-    this.state = new HashMap<>();
-    this.results = new HashMap<>();
-  }
-
-  public StateDocument(
-      Integer partition,
-      Map<String, AdderResult> state,
-      Map<String, List<AdderResult>> results)
-  {
-    this.id = Integer.toString(partition);
-    this.state = state;
-    this.results = results;
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/StateRepository.java b/src/main/java/de/juplo/kafka/StateRepository.java
deleted file mode 100644 (file)
index 3129535..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import org.springframework.data.mongodb.repository.MongoRepository;
-
-import java.util.Optional;
-
-
-public interface StateRepository extends MongoRepository<StateDocument, String>
-{
-  public Optional<StateDocument> findById(String partition);
-}
index 92f3a6b..c2cb792 100644 (file)
@@ -1,6 +1,6 @@
-sumup:
-  adder:
-    topic: out
+simple:
+  consumer:
+    topic: test
 management:
   endpoint:
     shutdown:
@@ -22,10 +22,6 @@ info:
     topic: ${consumer.topic}
     auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
 spring:
-  data:
-    mongodb:
-      uri: mongodb://juplo:training@localhost:27017
-      database: juplo
   kafka:
     bootstrap-servers: :9092
     client-id: DEV
diff --git a/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java b/src/test/java/de/juplo/kafka/AdderBusinessLogicTest.java
deleted file mode 100644 (file)
index 8e49263..0000000
+++ /dev/null
@@ -1,117 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.DisplayName;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.Arguments;
-import org.junit.jupiter.params.provider.MethodSource;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.util.Arrays;
-import java.util.stream.IntStream;
-import java.util.stream.Stream;
-
-import static org.assertj.core.api.Assertions.*;
-
-
-public class AdderBusinessLogicTest
-{
-  @Test
-  @DisplayName("An empty Optional should be returned, for a non-existing sum")
-  public void testGetSumReturnsEmptyOptionalForNonExistingSum()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThat(adder.getSum("foo")).isEmpty();
-  }
-
-  @Test
-  @DisplayName("A non-empty Optional should be returned, for an existing sum")
-  public void testGetSumReturnsNonEmptyOptionalForExistingSum()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.addToSum("foo", 6);
-    assertThat(adder.getSum("foo")).isNotEmpty();
-  }
-
-  @Test
-  @DisplayName("A sum can be calculated, if it does exist")
-  public void testCalculatePossibleIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.addToSum("foo", 6);
-    assertThatNoException().isThrownBy(() -> adder.calculate("foo"));
-  }
-
-  @Test
-  @DisplayName("An existing sum is removed, if ended")
-  public void testCalculateRemovesSumIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.addToSum("foo", 6);
-    adder.calculate("foo");
-    assertThat(adder.getSum("foo")).isEmpty();
-  }
-
-  @Test
-  @DisplayName("An existing sum returns a non-null value, if calculated")
-  public void testCalculateReturnsNonNullValueIfSumExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    adder.addToSum("foo", 6);
-    assertThat(adder.calculate("foo")).isNotNull();
-  }
-
-  @Test
-  @DisplayName("Ending a non-existing sum, causes an IllegalStateException")
-  public void testCalculateCausesExceptionIfNotExists()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalStateException().isThrownBy(() -> adder.calculate("foo"));
-  }
-
-  @Test
-  @DisplayName("Adding a null-value to a sum causes an IllegalArgumentException")
-  public void testAddToSumWithNullValueCausesException()
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", null));
-  }
-
-  @ParameterizedTest(name = "{index}: Adding {0}")
-  @DisplayName("Adding a non-positive value to a sum causes an IllegalArgumentException")
-  @ValueSource(ints = { 0, -1, -6, -66, Integer.MIN_VALUE })
-  public void testAddToSumWithNonPositiveValueCausesException(int value)
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatIllegalArgumentException().isThrownBy(() -> adder.addToSum("foo", value));
-  }
-
-  @ParameterizedTest(name = "{index}: Adding {0}")
-  @DisplayName("Can add a positive value to a sum")
-  @ValueSource(ints = { 1, 3, 6, 66, 7, 9 })
-  public void testAddToSumWithPositiveValuePossible(int value)
-  {
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    assertThatNoException().isThrownBy(() -> adder.addToSum("foo", value));
-  }
-
-  @ParameterizedTest(name = "{index}: Summing up {0}")
-  @DisplayName("Adds up numbers correctly")
-  @MethodSource("numbersProvider")
-  public void testAddToSumAddsUpNumbersCorrectlyIfSumExists(int... numbers)
-  {
-    long expectedResult = Arrays.stream(numbers).sum();
-    AdderBusinessLogic adder = new AdderBusinessLogic();
-    Arrays.stream(numbers).forEach(number -> adder.addToSum("foo", number));
-    AdderResult result = adder.calculate("foo");
-    assertThat(result.number).isEqualTo(numbers[numbers.length-1]);
-    assertThat(result.sum).isEqualTo(expectedResult);
-  }
-
-  static Stream<Arguments> numbersProvider() {
-    return Stream.of(
-        Arguments.of((Object) IntStream.rangeClosed(1,9).toArray()),
-        Arguments.of((Object) IntStream.rangeClosed(1,19).toArray()),
-        Arguments.of((Object) IntStream.rangeClosed(1,66).toArray()));
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
deleted file mode 100644 (file)
index bd9f449..0000000
+++ /dev/null
@@ -1,172 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, Message>
-{
-  @Autowired
-  StateRepository stateRepository;
-
-
-  public ApplicationTests()
-  {
-    super(new ApplicationTestRecrodGenerator());
-    ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
-  }
-
-
-  static class ApplicationTestRecrodGenerator implements RecordGenerator
-  {
-    ApplicationTests tests;
-
-    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
-    final String[] dieWilden13 =
-        IntStream
-            .range(1, 14)
-            .mapToObj(i -> "seeräuber-" + i)
-            .toArray(i -> new String[i]);
-    final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
-
-    int counter = 0;
-
-    Map<String, List<AdderResult>> state;
-
-    @Override
-    public int generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counter = 0;
-      state =
-          Arrays
-              .stream(dieWilden13)
-              .collect(Collectors.toMap(
-                  seeräuber -> seeräuber,
-                  seeräuber -> new LinkedList()));
-
-      int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int next = 0;
-
-      for (int pass = 0; pass < 333; pass++)
-      {
-        for (int i = 0; i<13; i++)
-        {
-          String seeräuber = dieWilden13[i];
-          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-          if (message[i] > number[i])
-          {
-            send(
-              key,
-              calculateMessage,
-              Message.Type.CALC,
-              poisonPill(poisonPills, pass, counter),
-              logicError(logicErrors, pass, counter),
-              messageSender);
-            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
-            // Pick next number to calculate
-            number[i] = numbers[next++%numbers.length];
-            message[i] = 1;
-            log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
-          }
-
-          send(
-            key,
-            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
-            Message.Type.ADD,
-            poisonPill(poisonPills, pass, counter),
-            logicError(logicErrors, pass, counter),
-            messageSender);
-        }
-      }
-
-      return counter;
-    }
-
-    boolean poisonPill (boolean poisonPills, int pass, int counter)
-    {
-      return poisonPills && pass > 300 && counter%99 == 0;
-    }
-
-    boolean logicError(boolean logicErrors, int pass, int counter)
-    {
-      return logicErrors && pass > 300 && counter%77 == 0;
-    }
-
-    void send(
-        Bytes key,
-        Bytes value,
-        Message.Type type,
-        boolean poisonPill,
-        boolean logicError,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counter++;
-
-      if (logicError)
-      {
-        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
-      }
-      if (poisonPill)
-      {
-        value = new Bytes("BOOM!".getBytes());
-      }
-
-      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
-      record.headers().add("__TypeId__", type.toString().getBytes());
-      messageSender.accept(record);
-    }
-
-    @Override
-    public void assertBusinessLogic()
-    {
-      for (int i=0; i<PARTITIONS; i++)
-      {
-        StateDocument stateDocument =
-            tests.stateRepository.findById(Integer.toString(i)).get();
-
-        stateDocument
-            .results
-            .entrySet()
-            .stream()
-            .forEach(entry ->
-            {
-              String user = entry.getKey();
-              List<AdderResult> resultsForUser = entry.getValue();
-
-              for (int j=0; j < resultsForUser.size(); j++)
-              {
-                if (!(j < state.get(user).size()))
-                {
-                  break;
-                }
-
-                assertThat(resultsForUser.get(j))
-                    .as("Unexpected results calculation %d of user %s", j, user)
-                    .isEqualTo(state.get(user).get(j));
-              }
-
-              assertThat(state.get(user))
-                  .as("More results calculated for user %s as expected", user)
-                  .containsAll(resultsForUser);
-            });
-      }
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
deleted file mode 100644 (file)
index 606218f..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ConditionEvaluationResult;
-import org.junit.jupiter.api.extension.ExecutionCondition;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.platform.commons.util.AnnotationUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
-{
-  @Override
-  public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context)
-  {
-    final Optional<SkipWhenErrorCannotBeGenerated> optional =
-        AnnotationUtils.findAnnotation(
-            context.getElement(),
-            SkipWhenErrorCannotBeGenerated.class);
-
-    if (context.getTestInstance().isEmpty())
-      return ConditionEvaluationResult.enabled("Test-instance ist not available");
-
-    if (optional.isPresent())
-    {
-      SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
-      GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
-      List<String> missingRequiredErrors = new LinkedList<>();
-
-      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill())
-        missingRequiredErrors.add("Poison-Pill");
-
-      if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
-        missingRequiredErrors.add("Logic-Error");
-
-      StringBuilder builder = new StringBuilder();
-      builder.append(context.getTestClass().get().getSimpleName());
-
-      if (missingRequiredErrors.isEmpty())
-      {
-        builder.append(" can generate all required types of errors");
-        return ConditionEvaluationResult.enabled(builder.toString());
-      }
-
-      builder.append(" cannot generate the required error(s): ");
-      builder.append(
-          missingRequiredErrors
-              .stream()
-              .collect(Collectors.joining(", ")));
-
-      return ConditionEvaluationResult.disabled(builder.toString());
-    }
-
-    return ConditionEvaluationResult.enabled(
-        "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName());
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
deleted file mode 100644 (file)
index 937b40f..0000000
+++ /dev/null
@@ -1,396 +0,0 @@
-package de.juplo.kafka;
-
-import com.mongodb.client.MongoClient;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.RecordDeserializationException;
-import org.apache.kafka.common.serialization.*;
-import org.apache.kafka.common.utils.Bytes;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.autoconfigure.mongo.MongoProperties;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-
-import java.time.Duration;
-import java.util.*;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
-import static de.juplo.kafka.GenericApplicationTests.TOPIC;
-import static org.assertj.core.api.Assertions.*;
-import static org.awaitility.Awaitility.*;
-
-
-@SpringJUnitConfig(
-  initializers = ConfigDataApplicationContextInitializer.class,
-  classes = {
-      KafkaAutoConfiguration.class,
-      ApplicationTests.Configuration.class })
-@TestPropertySource(
-               properties = {
-                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-                               "sumup.adder.topic=" + TOPIC,
-                               "spring.kafka.consumer.auto-commit-interval=500ms",
-                               "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@Slf4j
-abstract class GenericApplicationTests<K, V>
-{
-       public static final String TOPIC = "FOO";
-       public static final int PARTITIONS = 10;
-
-
-       @Autowired
-       org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
-       @Autowired
-       KafkaProperties kafkaProperties;
-       @Autowired
-       ApplicationProperties applicationProperties;
-       @Autowired
-       MongoClient mongoClient;
-       @Autowired
-       MongoProperties mongoProperties;
-       @Autowired
-       TestRecordHandler<K, V> recordHandler;
-       @Autowired
-       EndlessConsumer<K, V> endlessConsumer;
-
-       KafkaProducer<Bytes, Bytes> testRecordProducer;
-       KafkaConsumer<Bytes, Bytes> offsetConsumer;
-       Map<TopicPartition, Long> oldOffsets;
-
-
-       final RecordGenerator recordGenerator;
-       final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
-
-       public GenericApplicationTests(RecordGenerator recordGenerator)
-       {
-               this.recordGenerator = recordGenerator;
-               this.messageSender = (record) -> sendMessage(record);
-       }
-
-
-       /** Tests methods */
-
-       @Test
-       void commitsCurrentOffsetsOnSuccess() throws Exception
-       {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(false, false, messageSender);
-
-               await(numberOfGeneratedMessages + " records received")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> recordHandler.receivedRecords.size() >= numberOfGeneratedMessages);
-
-               await("Offsets committed")
-                               .atMost(Duration.ofSeconds(10))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .untilAsserted(() ->
-                               {
-                                       checkSeenOffsetsForProgress();
-                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-                               });
-
-               assertThatExceptionOfType(IllegalStateException.class)
-                               .isThrownBy(() -> endlessConsumer.exitStatus())
-                               .describedAs("Consumer should still be running");
-
-               endlessConsumer.stop();
-               recordGenerator.assertBusinessLogic();
-       }
-
-       @Test
-       @SkipWhenErrorCannotBeGenerated(poisonPill = true)
-       void commitsOffsetOfErrorForReprocessingOnDeserializationError()
-       {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(true, false, messageSender);
-
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-               assertThat(recordHandler.receivedRecords.size())
-                               .describedAs("Received not all sent events")
-                               .isLessThan(numberOfGeneratedMessages);
-
-               assertThatNoException()
-                               .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
-                               .describedAs("Consumer should have exited abnormally")
-                               .containsInstanceOf(RecordDeserializationException.class);
-
-               recordGenerator.assertBusinessLogic();
-       }
-
-       @Test
-       @SkipWhenErrorCannotBeGenerated(logicError = true)
-       void doesNotCommitOffsetsOnLogicError()
-       {
-               int numberOfGeneratedMessages =
-                               recordGenerator.generate(false, true, messageSender);
-
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               checkSeenOffsetsForProgress();
-               assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
-
-               endlessConsumer.start();
-               await("Consumer failed")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
-
-               assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
-
-               assertThatNoException()
-                               .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
-                               .describedAs("Consumer should have exited abnormally")
-                               .containsInstanceOf(RuntimeException.class);
-
-               recordGenerator.assertBusinessLogic();
-       }
-
-
-       /** Helper methods for the verification of expectations */
-
-       void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-       {
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       Long expected = offsetsToCheck.get(tp) + 1;
-                       log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
-                       assertThat(offset)
-                                       .describedAs("Committed offset corresponds to the offset of the consumer")
-                                       .isEqualTo(expected);
-               });
-       }
-
-       void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-       {
-               List<Boolean> isOffsetBehindSeen = new LinkedList<>();
-
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       Long expected = offsetsToCheck.get(tp) + 1;
-                       log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
-                       assertThat(offset)
-                                       .describedAs("Committed offset must be at most equal to the offset of the consumer")
-                                       .isLessThanOrEqualTo(expected);
-                       isOffsetBehindSeen.add(offset < expected);
-               });
-
-               assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
-                               .describedAs("Committed offsets are behind seen offsets")
-                               .isTrue();
-       }
-
-       void checkSeenOffsetsForProgress()
-       {
-               // Be sure, that some messages were consumed...!
-               Set<TopicPartition> withProgress = new HashSet<>();
-               partitions().forEach(tp ->
-               {
-                       Long oldOffset = oldOffsets.get(tp) + 1;
-                       Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
-                       if (!oldOffset.equals(newOffset))
-                       {
-                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-                               withProgress.add(tp);
-                       }
-               });
-               assertThat(withProgress)
-                               .describedAs("Some offsets must have changed, compared to the old offset-positions")
-                               .isNotEmpty();
-       }
-
-
-       /** Helper methods for setting up and running the tests */
-
-       void seekToEnd()
-       {
-               offsetConsumer.assign(partitions());
-               offsetConsumer.seekToEnd(partitions());
-               partitions().forEach(tp ->
-               {
-                       // seekToEnd() works lazily: it only takes effect on poll()/position()
-                       Long offset = offsetConsumer.position(tp);
-                       log.info("New position for {}: {}", tp, offset);
-               });
-               // The new positions must be commited!
-               offsetConsumer.commitSync();
-               offsetConsumer.unsubscribe();
-       }
-
-       void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
-       {
-               offsetConsumer.assign(partitions());
-               partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-               offsetConsumer.unsubscribe();
-       }
-
-       List<TopicPartition> partitions()
-       {
-               return
-                               IntStream
-                                               .range(0, PARTITIONS)
-                                               .mapToObj(partition -> new TopicPartition(TOPIC, partition))
-                                               .collect(Collectors.toList());
-       }
-
-
-       public interface RecordGenerator
-       {
-               int generate(
-                               boolean poisonPills,
-                               boolean logicErrors,
-                               Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
-
-               default boolean canGeneratePoisonPill()
-               {
-                       return true;
-               }
-
-               default boolean canGenerateLogicError()
-               {
-                       return true;
-               }
-
-               default void assertBusinessLogic()
-               {
-                       log.debug("No business-logic to assert");
-               }
-       }
-
-       void sendMessage(ProducerRecord<Bytes, Bytes> record)
-       {
-               testRecordProducer.send(record, (metadata, e) ->
-               {
-                       if (metadata != null)
-                       {
-                               log.debug(
-                                               "{}|{} - {}={}",
-                                               metadata.partition(),
-                                               metadata.offset(),
-                                               record.key(),
-                                               record.value());
-                       }
-                       else
-                       {
-                               log.warn(
-                                               "Exception for {}={}: {}",
-                                               record.key(),
-                                               record.value(),
-                                               e.toString());
-                       }
-               });
-       }
-
-
-       @BeforeEach
-       public void init()
-       {
-               Properties props;
-               props = new Properties();
-               props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-               props.put("linger.ms", 100);
-               props.put("key.serializer", BytesSerializer.class.getName());
-               props.put("value.serializer", BytesSerializer.class.getName());
-               testRecordProducer = new KafkaProducer<>(props);
-
-               props = new Properties();
-               props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-               props.put("client.id", "OFFSET-CONSUMER");
-               props.put("group.id", kafkaProperties.getConsumer().getGroupId());
-               props.put("key.deserializer", BytesDeserializer.class.getName());
-               props.put("value.deserializer", BytesDeserializer.class.getName());
-               offsetConsumer = new KafkaConsumer<>(props);
-
-               mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
-               seekToEnd();
-
-               oldOffsets = new HashMap<>();
-               recordHandler.seenOffsets = new HashMap<>();
-               recordHandler.receivedRecords = new HashSet<>();
-
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       oldOffsets.put(tp, offset - 1);
-                       recordHandler.seenOffsets.put(tp, offset - 1);
-               });
-
-               endlessConsumer.start();
-       }
-
-       @AfterEach
-       public void deinit()
-       {
-               try
-               {
-                       endlessConsumer.stop();
-               }
-               catch (Exception e)
-               {
-                       log.debug("{}", e.toString());
-               }
-
-               try
-               {
-                       testRecordProducer.close();
-                       offsetConsumer.close();
-               }
-               catch (Exception e)
-               {
-                       log.info("Exception while stopping the consumer: {}", e.toString());
-               }
-       }
-
-
-       @TestConfiguration
-       @Import(ApplicationConfiguration.class)
-       public static class Configuration
-       {
-               @Bean
-               public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
-               {
-                       return new TestRecordHandler(applicationRecordHandler);
-               }
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
deleted file mode 100644 (file)
index 6d15e9e..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@ExtendWith(ErrorCannotBeGeneratedCondition.class)
-public @interface SkipWhenErrorCannotBeGenerated
-{
-  boolean poisonPill() default false;
-  boolean logicError() default false;
-}
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
deleted file mode 100644 (file)
index 37d3f65..0000000
+++ /dev/null
@@ -1,34 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.Map;
-import java.util.Set;
-
-
-@RequiredArgsConstructor
-public class TestRecordHandler<K, V> implements RecordHandler<K, V>
-{
-  private final RecordHandler<K, V> handler;
-
-  Map<TopicPartition, Long> seenOffsets;
-  Set<ConsumerRecord<K, V>> receivedRecords;
-
-
-  public void onNewRecord(ConsumerRecord<K, V> record)
-  {
-    seenOffsets.put(
-      new TopicPartition(record.topic(), record.partition()),
-      record.offset());
-    receivedRecords.add(record);
-  }
-
-  @Override
-  public void accept(ConsumerRecord<K, V> record)
-  {
-    this.onNewRecord(record);
-    handler.accept(record);
-  }
-}