Vereinfachte Version der auf Spring Kafka basierenden Implementierung sumup-adder--springified sumup-adder--springified---lvm-2-tage
authorKai Moritz <kai@juplo.de>
Sun, 11 Sep 2022 17:58:02 +0000 (19:58 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 16 Sep 2022 16:09:21 +0000 (18:09 +0200)
17 files changed:
docker-compose.yml
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java [deleted file]
src/main/java/de/juplo/kafka/ApplicationController.java [new file with mode: 0644]
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/DriverController.java [deleted file]
src/main/java/de/juplo/kafka/EndlessConsumer.java [deleted file]
src/main/java/de/juplo/kafka/ErrorResponse.java [deleted file]
src/main/java/de/juplo/kafka/RecordHandler.java [deleted file]
src/main/resources/application.yml
src/test/java/de/juplo/kafka/ApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java [deleted file]
src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java [deleted file]
src/test/java/de/juplo/kafka/GenericApplicationTests.java [deleted file]
src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java [deleted file]
src/test/java/de/juplo/kafka/TestRecordHandler.java [deleted file]

index 9850ce3..a3da553 100644 (file)
@@ -139,7 +139,7 @@ services:
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
+      logging.level.org.apache.kafka.clients.consumer: INFO
 
   adder-2:
     image: juplo/sumup-adder-springified:1.0-SNAPSHOT
@@ -154,7 +154,7 @@ services:
       sumup.adder.throttle: 3ms
       spring.data.mongodb.uri: mongodb://juplo:training@mongo:27017
       spring.data.mongodb.database: juplo
-      logging.level.org.apache.kafka.clients.consumer: DEBUG
+      logging.level.org.apache.kafka.clients.consumer: INFO
 
   peter:
     image: juplo/toolbox
index a4d9aeb..69a9712 100644 (file)
 package de.juplo.kafka;
 
 import lombok.extern.slf4j.Slf4j;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.ApplicationArguments;
-import org.springframework.boot.ApplicationRunner;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.springframework.boot.SpringApplication;
 import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaOperations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
+import org.springframework.kafka.support.serializer.JsonSerializer;
+import org.springframework.util.backoff.FixedBackOff;
 
-import javax.annotation.PreDestroy;
+import java.util.Map;
+import java.util.Optional;
 
 
 @SpringBootApplication
 @Slf4j
-public class Application implements ApplicationRunner
+@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
+@EnableKafka
+public class Application
 {
-  @Autowired
-  EndlessConsumer endlessConsumer;
+  @Bean
+  public ApplicationRecordHandler applicationRecordHandler(
+      AdderResults adderResults,
+      KafkaProperties kafkaProperties,
+      ApplicationProperties applicationProperties)
+  {
+    return new ApplicationRecordHandler(
+        adderResults,
+        Optional.ofNullable(applicationProperties.getThrottle()),
+        kafkaProperties.getConsumer().getGroupId());
+  }
+
+  @Bean
+  public AdderResults adderResults()
+  {
+    return new AdderResults();
+  }
+
+  @Bean
+  public ApplicationRebalanceListener rebalanceListener(
+      ApplicationRecordHandler recordHandler,
+      AdderResults adderResults,
+      StateRepository stateRepository,
+      KafkaProperties kafkaProperties)
+  {
+    return new ApplicationRebalanceListener(
+        recordHandler,
+        adderResults,
+        stateRepository,
+        kafkaProperties.getConsumer().getGroupId());
+  }
 
+  @Bean
+  ApplicationHealthIndicator applicationHealthIndicator(
+      KafkaListenerEndpointRegistry registry,
+      KafkaProperties properties)
+  {
+    return new ApplicationHealthIndicator(
+        properties.getConsumer().getGroupId(),
+        registry);
+  }
+
+  @Bean
+  public ProducerFactory<String, Object> producerFactory(
+      KafkaProperties properties)
+  {
+    return new DefaultKafkaProducerFactory<>(
+        properties.getProducer().buildProperties(),
+        new StringSerializer(),
+        new DelegatingByTypeSerializer(
+            Map.of(
+                byte[].class, new ByteArraySerializer(),
+                MessageAddNumber.class, new JsonSerializer<>(),
+                MessageCalculateSum.class, new JsonSerializer<>())));
+  }
+
+  @Bean
+  public KafkaTemplate<String, Object> kafkaTemplate(
+      ProducerFactory<String, Object> producerFactory)
+  {
+    return new KafkaTemplate<>(producerFactory);
+  }
 
-  @Override
-  public void run(ApplicationArguments args) throws Exception
+  @Bean
+  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
+      KafkaOperations<?, ?> kafkaTemplate)
   {
-    log.info("Starting EndlessConsumer");
-    endlessConsumer.start();
+    return new DeadLetterPublishingRecoverer(kafkaTemplate);
   }
 
-  @PreDestroy
-  public void shutdown()
+  @Bean
+  public DefaultErrorHandler errorHandler(
+      DeadLetterPublishingRecoverer recoverer)
   {
-    try
-    {
-      log.info("Stopping EndlessConsumer");
-      endlessConsumer.stop();
-    }
-    catch (IllegalStateException e)
-    {
-      log.info("Was already stopped: {}", e.toString());
-    }
-    catch (Exception e)
-    {
-      log.error("Unexpected exception while stopping EndlessConsumer: {}", e);
-    }
+    return new DefaultErrorHandler(
+        recoverer,
+        new FixedBackOff(0l, 0l));
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java
deleted file mode 100644 (file)
index b5f6187..0000000
+++ /dev/null
@@ -1,110 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.common.serialization.ByteArraySerializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.context.properties.EnableConfigurationProperties;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-import java.util.Map;
-import java.util.Optional;
-
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.DefaultKafkaProducerFactory;
-import org.springframework.kafka.core.KafkaOperations;
-import org.springframework.kafka.core.KafkaTemplate;
-import org.springframework.kafka.core.ProducerFactory;
-import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
-import org.springframework.kafka.listener.DefaultErrorHandler;
-import org.springframework.kafka.support.serializer.DelegatingByTypeSerializer;
-import org.springframework.kafka.support.serializer.JsonSerializer;
-import org.springframework.util.backoff.FixedBackOff;
-
-
-@Configuration
-@EnableConfigurationProperties({ KafkaProperties.class, ApplicationProperties.class })
-public class ApplicationConfiguration
-{
-  @Bean
-  public ApplicationRecordHandler applicationRecordHandler(
-      AdderResults adderResults,
-      KafkaProperties kafkaProperties,
-      ApplicationProperties applicationProperties)
-  {
-    return new ApplicationRecordHandler(
-        adderResults,
-        Optional.ofNullable(applicationProperties.getThrottle()),
-        kafkaProperties.getClientId());
-  }
-
-  @Bean
-  public AdderResults adderResults()
-  {
-    return new AdderResults();
-  }
-
-  @Bean
-  public ApplicationRebalanceListener rebalanceListener(
-      ApplicationRecordHandler recordHandler,
-      AdderResults adderResults,
-      StateRepository stateRepository,
-      KafkaProperties kafkaProperties)
-  {
-    return new ApplicationRebalanceListener(
-        recordHandler,
-        adderResults,
-        stateRepository,
-        kafkaProperties.getClientId());
-  }
-
-  @Bean
-  public EndlessConsumer endlessConsumer(
-      RecordHandler recordHandler,
-      KafkaProperties kafkaProperties,
-      KafkaListenerEndpointRegistry endpointRegistry)
-  {
-    return
-        new EndlessConsumer(
-            kafkaProperties.getClientId(),
-            endpointRegistry,
-            recordHandler);
-  }
-
-  @Bean
-  public ProducerFactory<String, Object> producerFactory(
-      KafkaProperties properties)
-  {
-    return new DefaultKafkaProducerFactory<>(
-        properties.getProducer().buildProperties(),
-        new StringSerializer(),
-        new DelegatingByTypeSerializer(
-            Map.of(
-                byte[].class, new ByteArraySerializer(),
-                MessageAddNumber.class, new JsonSerializer<>(),
-                MessageCalculateSum.class, new JsonSerializer<>())));
-  }
-
-  @Bean
-  public KafkaTemplate<String, Object> kafkaTemplate(
-      ProducerFactory<String, Object> producerFactory)
-  {
-    return new KafkaTemplate<>(producerFactory);
-  }
-
-  @Bean
-  public DeadLetterPublishingRecoverer deadLetterPublishingRecoverer(
-      KafkaOperations<?, ?> kafkaTemplate)
-  {
-    return new DeadLetterPublishingRecoverer(kafkaTemplate);
-  }
-
-  @Bean
-  public DefaultErrorHandler errorHandler(
-      DeadLetterPublishingRecoverer recoverer)
-  {
-    return new DefaultErrorHandler(
-        recoverer,
-        new FixedBackOff(0l, 0l));
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ApplicationController.java b/src/main/java/de/juplo/kafka/ApplicationController.java
new file mode 100644 (file)
index 0000000..0a9890c
--- /dev/null
@@ -0,0 +1,39 @@
+package de.juplo.kafka;
+
+import lombok.RequiredArgsConstructor;
+import org.springframework.data.mongodb.core.aggregation.ArithmeticOperators;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+@RestController
+@RequiredArgsConstructor
+public class ApplicationController
+{
+  private final AdderResults results;
+
+
+  @GetMapping("results")
+  public Map<Integer, Map<String, List<AdderResult>>> results()
+  {
+    return results.getState();
+  }
+
+  @GetMapping("results/{user}")
+  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
+  {
+    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
+    {
+      List<AdderResult> results = resultsByUser.get(user);
+      if (results != null)
+        return ResponseEntity.ok(results);
+    }
+
+    return ResponseEntity.notFound().build();
+  }
+}
index e215c69..0466df4 100644 (file)
@@ -3,20 +3,20 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import org.springframework.boot.actuate.health.Health;
 import org.springframework.boot.actuate.health.HealthIndicator;
-import org.springframework.stereotype.Component;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 
 
-@Component
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer consumer;
+  private final String id;
+  private final KafkaListenerEndpointRegistry registry;
 
 
   @Override
   public Health health()
   {
-    return consumer.running()
+    return registry.getListenerContainer(id).isRunning()
         ? Health.up().build()
         : Health.down().build();
   }
index f4d3671..2075781 100644 (file)
@@ -2,6 +2,11 @@ package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.annotation.KafkaHandler;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.messaging.handler.annotation.Payload;
 
 import java.time.Duration;
 import java.util.HashMap;
@@ -11,7 +16,10 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
-public class ApplicationRecordHandler implements RecordHandler
+@KafkaListener(
+    id = "${spring.kafka.consumer.group-id}",
+    topics = "${sumup.adder.topic}")
+public class ApplicationRecordHandler
 {
   private final AdderResults results;
   private final Optional<Duration> throttle;
@@ -20,24 +28,27 @@ public class ApplicationRecordHandler implements RecordHandler
   private final Map<Integer, AdderBusinessLogic> state = new HashMap<>();
 
 
-  @Override
+  @KafkaHandler
   public void addNumber(
-      String topic,
+      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
       Integer partition,
-      Long offset,
+      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
       String user,
+      @Payload
       MessageAddNumber message)
   {
+    log.debug("{} - Received {} for {} on {}", id, message, user, partition);
     state.get(partition).addToSum(user, message.getNext());
     throttle();
   }
 
-  @Override
+  @KafkaHandler
   public void calculateSum(
-      String topic,
+      @Header(KafkaHeaders.RECEIVED_PARTITION_ID)
       Integer partition,
-      Long offset,
+      @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY)
       String user,
+      @Payload
       MessageCalculateSum message)
   {
     AdderResult result = state.get(partition).calculate(user);
@@ -70,15 +81,4 @@ public class ApplicationRecordHandler implements RecordHandler
   {
     return this.state.remove(partition).getState();
   }
-
-
-  public Map<Integer, AdderBusinessLogic> getState()
-  {
-    return state;
-  }
-
-  public AdderBusinessLogic getState(Integer partition)
-  {
-    return state.get(partition);
-  }
 }
diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java
deleted file mode 100644 (file)
index 26a5bc8..0000000
+++ /dev/null
@@ -1,89 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.springframework.http.HttpStatus;
-import org.springframework.http.ResponseEntity;
-import org.springframework.web.bind.annotation.*;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.stream.Collectors;
-
-
-@RestController
-@RequiredArgsConstructor
-public class DriverController
-{
-  private final EndlessConsumer consumer;
-  private final ApplicationRecordHandler recordHandler;
-  private final AdderResults results;
-
-
-  @PostMapping("start")
-  public void start()
-  {
-    consumer.start();
-  }
-
-  @PostMapping("stop")
-  public void stop() throws ExecutionException, InterruptedException
-  {
-    consumer.stop();
-  }
-
-
-  @GetMapping("state")
-  public Map<Integer, Map<String, AdderResult>> state()
-  {
-    return
-        recordHandler
-            .getState()
-            .entrySet()
-            .stream()
-            .collect(Collectors.toMap(
-                entry -> entry.getKey(),
-                entry -> entry.getValue().getState()));
-  }
-
-  @GetMapping("state/{user}")
-  public ResponseEntity<Long> state(@PathVariable String user)
-  {
-    for (AdderBusinessLogic adder : recordHandler.getState().values())
-    {
-      Optional<Long> sum = adder.getSum(user);
-      if (sum.isPresent())
-        return ResponseEntity.ok(sum.get());
-    }
-
-    return ResponseEntity.notFound().build();
-  }
-
-  @GetMapping("results")
-  public Map<Integer, Map<String, List<AdderResult>>> results()
-  {
-    return results.getState();
-  }
-
-  @GetMapping("results/{user}")
-  public ResponseEntity<List<AdderResult>> results(@PathVariable String user)
-  {
-    for (Map<String, List<AdderResult>> resultsByUser : this.results.getState().values())
-    {
-      List<AdderResult> results = resultsByUser.get(user);
-      if (results != null)
-        return ResponseEntity.ok(results);
-    }
-
-    return ResponseEntity.notFound().build();
-  }
-
-
-  @ExceptionHandler
-  @ResponseStatus(HttpStatus.BAD_REQUEST)
-  public ErrorResponse illegalStateException(IllegalStateException e)
-  {
-    return new ErrorResponse(e.getMessage(), HttpStatus.BAD_REQUEST.value());
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
deleted file mode 100644 (file)
index 27c1e44..0000000
+++ /dev/null
@@ -1,102 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.annotation.KafkaHandler;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.handler.annotation.Header;
-import org.springframework.messaging.handler.annotation.Payload;
-
-import java.util.List;
-import java.util.Optional;
-
-
-@RequiredArgsConstructor
-@Slf4j
-@KafkaListener(
-    id = "${spring.kafka.client-id}",
-    idIsGroup = false,
-    topics = "${sumup.adder.topic}",
-    autoStartup = "false")
-public class EndlessConsumer
-{
-  private final String id;
-  private final KafkaListenerEndpointRegistry registry;
-  private final RecordHandler recordHandler;
-
-  private long consumed = 0;
-
-
-  @KafkaHandler
-  public void addNumber(
-    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
-    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
-    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
-    @Header(KafkaHeaders.OFFSET) Long offset,
-    @Payload MessageAddNumber message)
-  {
-          log.info(
-              "{} - {}: {}/{} - {}={}",
-              id,
-              offset,
-              topic,
-              partition,
-              key,
-              message
-          );
-
-          recordHandler.addNumber(topic, partition, offset, key, message);
-
-          consumed++;
-  }
-
-  @KafkaHandler
-  public void calculateSum(
-    @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
-    @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
-    @Header(KafkaHeaders.RECEIVED_PARTITION_ID) Integer partition,
-    @Header(KafkaHeaders.OFFSET) Long offset,
-    @Payload MessageCalculateSum message)
-  {
-          log.info(
-              "{} - {}: {}/{} - {}={}",
-              id,
-              offset,
-              topic,
-              partition,
-              key,
-              message
-          );
-
-          recordHandler.calculateSum(topic, partition, offset, key, message);
-
-          consumed++;
-  }
-
-  public void start()
-  {
-    if (running())
-      throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-    log.info("{} - Starting - consumed {} messages before", id, consumed);
-    registry.getListenerContainer(id).start();
-  }
-
-  public void stop()
-  {
-    if (!running())
-      throw new IllegalStateException("Consumer instance " + id + " is not running!");
-
-    log.info("{} - Stopping", id);
-    registry.getListenerContainer(id).stop();
-    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
-  }
-
-  public boolean running()
-  {
-    return registry.getListenerContainer(id).isRunning();
-  }
-}
diff --git a/src/main/java/de/juplo/kafka/ErrorResponse.java b/src/main/java/de/juplo/kafka/ErrorResponse.java
deleted file mode 100644 (file)
index 5ca206d..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.Value;
-
-
-@Value
-public class ErrorResponse
-{
-  private final String error;
-  private final Integer status;
-}
diff --git a/src/main/java/de/juplo/kafka/RecordHandler.java b/src/main/java/de/juplo/kafka/RecordHandler.java
deleted file mode 100644 (file)
index 47f984e..0000000
+++ /dev/null
@@ -1,22 +0,0 @@
-package de.juplo.kafka;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.function.Consumer;
-
-
-public interface RecordHandler
-{
-  void addNumber(
-      String topic,
-      Integer partition,
-      Long offset,
-      String user,
-      MessageAddNumber message);
-  void calculateSum(
-      String topic,
-      Integer partition,
-      Long offset,
-      String user,
-      MessageCalculateSum message);
-}
index 2f6f859..a95e976 100644 (file)
@@ -14,13 +14,6 @@ management:
       enabled: true
     java:
       enabled: true
-info:
-  kafka:
-    bootstrap-server: ${spring.kafka.consumer.bootstrap-servers}
-    client-id: ${spring.kafka.consumer.client-id}
-    group-id: ${spring.kafka.consumer.group-id}
-    topic: ${consumer.topic}
-    auto-offset-reset: ${spring.kafka.consumer.auto-offset-reset}
 spring:
   data:
     mongodb:
@@ -28,11 +21,8 @@ spring:
       database: juplo
   kafka:
     bootstrap-servers: :9092
-    client-id: DEV
     consumer:
       group-id: my-group
-      auto-offset-reset: earliest
-      auto-commit-interval: 5s
       key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
       value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
       properties:
diff --git a/src/test/java/de/juplo/kafka/ApplicationTests.java b/src/test/java/de/juplo/kafka/ApplicationTests.java
deleted file mode 100644 (file)
index e01fdd1..0000000
+++ /dev/null
@@ -1,195 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Bytes;
-import org.springframework.beans.factory.annotation.Autowired;
-
-import java.util.*;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-
-@Slf4j
-public class ApplicationTests extends GenericApplicationTests<String, Message>
-{
-  @Autowired
-  StateRepository stateRepository;
-
-
-  public ApplicationTests()
-  {
-    super(new ApplicationTestRecrodGenerator());
-    ((ApplicationTestRecrodGenerator)recordGenerator).tests = this;
-  }
-
-
-  static class ApplicationTestRecrodGenerator implements RecordGenerator
-  {
-    ApplicationTests tests;
-
-    final int[] numbers = {1, 77, 33, 2, 66, 666, 11};
-    final String[] dieWilden13 =
-        IntStream
-            .range(1, 14)
-            .mapToObj(i -> "seeräuber-" + i)
-            .toArray(i -> new String[i]);
-    final StringSerializer stringSerializer = new StringSerializer();
-    final Bytes calculateMessage = new Bytes(stringSerializer.serialize(TOPIC, "{}"));
-
-    int counterMessages;
-    int counterPoisonPills;
-    int counterLogicErrors;
-
-    Map<String, List<AdderResult>> state;
-
-    @Override
-    public void generate(
-        boolean poisonPills,
-        boolean logicErrors,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counterMessages = 0;
-      counterPoisonPills = 0;
-      counterLogicErrors = 0;
-
-      state =
-          Arrays
-              .stream(dieWilden13)
-              .collect(Collectors.toMap(
-                  seeräuber -> seeräuber,
-                  seeräuber -> new LinkedList()));
-
-      int number[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int message[] = { 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1 };
-      int next = 0;
-
-      for (int pass = 0; pass < 333; pass++)
-      {
-        for (int i = 0; i<13; i++)
-        {
-          String seeräuber = dieWilden13[i];
-          Bytes key = new Bytes(stringSerializer.serialize(TOPIC, seeräuber));
-
-          if (message[i] > number[i])
-          {
-            send(
-              key,
-              calculateMessage,
-              Message.Type.CALC,
-              poisonPill(poisonPills, pass, counterMessages),
-              logicError(logicErrors, pass, counterMessages),
-              messageSender);
-            state.get(seeräuber).add(new AdderResult(number[i], (number[i] + 1) * number[i] / 2));
-            // Pick next number to calculate
-            number[i] = numbers[next++%numbers.length];
-            message[i] = 1;
-            log.debug("Seeräuber {} will die Summe für {} berechnen", seeräuber, number[i]);
-          }
-
-          send(
-            key,
-            new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":" + message[i]++ + "}")),
-            Message.Type.ADD,
-            poisonPill(poisonPills, pass, counterMessages),
-            logicError(logicErrors, pass, counterMessages),
-            messageSender);
-        }
-      }
-    }
-
-    @Override
-    public int getNumberOfMessages()
-    {
-      return counterMessages;
-    }
-
-    @Override
-    public int getNumberOfPoisonPills()
-    {
-      return counterPoisonPills;
-    }
-
-    @Override
-    public int getNumberOfLogicErrors()
-    {
-      return counterLogicErrors;
-    }
-
-    boolean poisonPill (boolean poisonPills, int pass, int counter)
-    {
-      return poisonPills && pass > 300 && counter%99 == 0;
-    }
-
-    boolean logicError(boolean logicErrors, int pass, int counter)
-    {
-      return logicErrors && pass > 300 && counter%77 == 0;
-    }
-
-    void send(
-        Bytes key,
-        Bytes value,
-        Message.Type type,
-        boolean poisonPill,
-        boolean logicError,
-        Consumer<ProducerRecord<Bytes, Bytes>> messageSender)
-    {
-      counterMessages++;
-
-      if (logicError)
-      {
-        value = new Bytes(stringSerializer.serialize(TOPIC, "{\"next\":-1}"));
-        counterLogicErrors++;
-      }
-      if (poisonPill)
-      {
-        value = new Bytes("BOOM!".getBytes());
-        counterPoisonPills++;
-      }
-
-      ProducerRecord<Bytes, Bytes> record = new ProducerRecord<>(TOPIC, key, value);
-      record.headers().add("__TypeId__", type.toString().getBytes());
-      messageSender.accept(record);
-    }
-
-    @Override
-    public void assertBusinessLogic()
-    {
-      for (int i=0; i<PARTITIONS; i++)
-      {
-        StateDocument stateDocument =
-            tests.stateRepository.findById(Integer.toString(i)).get();
-
-        stateDocument
-            .results
-            .entrySet()
-            .stream()
-            .forEach(entry ->
-            {
-              String user = entry.getKey();
-              List<AdderResult> resultsForUser = entry.getValue();
-
-              for (int j=0; j < resultsForUser.size(); j++)
-              {
-                if (!(j < state.get(user).size()))
-                {
-                  break;
-                }
-
-                assertThat(resultsForUser.get(j))
-                    .as("Unexpected results calculation %d of user %s", j, user)
-                    .isEqualTo(state.get(user).get(j));
-              }
-
-              assertThat(state.get(user))
-                  .as("More results calculated for user %s as expected", user)
-                  .containsAll(resultsForUser);
-            });
-      }
-    }
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java b/src/test/java/de/juplo/kafka/DeadLetterTopicConsumer.java
deleted file mode 100644 (file)
index ac8c65d..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.support.KafkaHeaders;
-import org.springframework.messaging.Message;
-
-import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-
-
-@Slf4j
-public class DeadLetterTopicConsumer
-{
-  List<Message<?>> messages = new LinkedList<>();
-
-
-  @KafkaListener(
-      id = "DLT",
-      topics = "${sumup.adder.topic}.DLT",
-      containerFactory = "dltContainerFactory")
-  public void receive(Message<?> message)
-  {
-    log.info(
-        "Received dead letter on {}-{}|{},{} for {}-{}|{},{}: {}, exception={}",
-        message.getHeaders().get(KafkaHeaders.RECEIVED_TOPIC),
-        message.getHeaders().get(KafkaHeaders.RECEIVED_PARTITION_ID),
-        message.getHeaders().get(KafkaHeaders.OFFSET),
-        message.getHeaders().get(KafkaHeaders.RECEIVED_MESSAGE_KEY),
-        new String(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_TOPIC, byte[].class)),
-        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_PARTITION, byte[].class)).getInt(),
-        ByteBuffer.wrap(message.getHeaders().get(KafkaHeaders.DLT_ORIGINAL_OFFSET, byte[].class)).getLong(),
-        message.getHeaders().get(KafkaHeaders.MESSAGE_KEY),
-        message.getPayload(),
-        new String(message.getHeaders().get(KafkaHeaders.DLT_EXCEPTION_MESSAGE, byte[].class)));
-
-    messages.add(message);
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java b/src/test/java/de/juplo/kafka/ErrorCannotBeGeneratedCondition.java
deleted file mode 100644 (file)
index 606218f..0000000
+++ /dev/null
@@ -1,60 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ConditionEvaluationResult;
-import org.junit.jupiter.api.extension.ExecutionCondition;
-import org.junit.jupiter.api.extension.ExtensionContext;
-import org.junit.platform.commons.util.AnnotationUtils;
-
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-
-public class ErrorCannotBeGeneratedCondition implements ExecutionCondition
-{
-  @Override
-  public ConditionEvaluationResult evaluateExecutionCondition(ExtensionContext context)
-  {
-    final Optional<SkipWhenErrorCannotBeGenerated> optional =
-        AnnotationUtils.findAnnotation(
-            context.getElement(),
-            SkipWhenErrorCannotBeGenerated.class);
-
-    if (context.getTestInstance().isEmpty())
-      return ConditionEvaluationResult.enabled("Test-instance ist not available");
-
-    if (optional.isPresent())
-    {
-      SkipWhenErrorCannotBeGenerated skipWhenErrorCannotBeGenerated = optional.get();
-      GenericApplicationTests instance = (GenericApplicationTests)context.getTestInstance().get();
-      List<String> missingRequiredErrors = new LinkedList<>();
-
-      if (skipWhenErrorCannotBeGenerated.poisonPill() && !instance.recordGenerator.canGeneratePoisonPill())
-        missingRequiredErrors.add("Poison-Pill");
-
-      if (skipWhenErrorCannotBeGenerated.logicError() && !instance.recordGenerator.canGenerateLogicError())
-        missingRequiredErrors.add("Logic-Error");
-
-      StringBuilder builder = new StringBuilder();
-      builder.append(context.getTestClass().get().getSimpleName());
-
-      if (missingRequiredErrors.isEmpty())
-      {
-        builder.append(" can generate all required types of errors");
-        return ConditionEvaluationResult.enabled(builder.toString());
-      }
-
-      builder.append(" cannot generate the required error(s): ");
-      builder.append(
-          missingRequiredErrors
-              .stream()
-              .collect(Collectors.joining(", ")));
-
-      return ConditionEvaluationResult.disabled(builder.toString());
-    }
-
-    return ConditionEvaluationResult.enabled(
-        "Not annotated with " + SkipWhenErrorCannotBeGenerated.class.getSimpleName());
-  }
-}
diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java
deleted file mode 100644 (file)
index ac8a629..0000000
+++ /dev/null
@@ -1,440 +0,0 @@
-package de.juplo.kafka;
-
-import com.mongodb.client.MongoClient;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.*;
-import org.apache.kafka.common.utils.Bytes;
-import org.junit.jupiter.api.*;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
-import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
-import org.springframework.boot.autoconfigure.mongo.MongoProperties;
-import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo;
-import org.springframework.boot.test.context.ConfigDataApplicationContextInitializer;
-import org.springframework.boot.test.context.TestConfiguration;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Import;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-import org.springframework.kafka.core.ConsumerFactory;
-import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
-import org.springframework.kafka.test.context.EmbeddedKafka;
-import org.springframework.test.context.TestPropertySource;
-import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
-
-import java.time.Duration;
-import java.util.*;
-import java.util.function.BiConsumer;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static de.juplo.kafka.GenericApplicationTests.PARTITIONS;
-import static de.juplo.kafka.GenericApplicationTests.TOPIC;
-import static org.assertj.core.api.Assertions.*;
-import static org.awaitility.Awaitility.*;
-
-
-@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class)
-@TestPropertySource(
-               properties = {
-                               "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}",
-                               "spring.kafka.producer.bootstrap-servers=${spring.embedded.kafka.brokers}",
-                               "sumup.adder.topic=" + TOPIC,
-                               "spring.kafka.consumer.auto-commit-interval=500ms",
-                               "spring.mongodb.embedded.version=4.4.13" })
-@EmbeddedKafka(topics = TOPIC, partitions = PARTITIONS)
-@EnableAutoConfiguration
-@AutoConfigureDataMongo
-@Slf4j
-abstract class GenericApplicationTests<K, V>
-{
-       public static final String TOPIC = "FOO";
-       public static final int PARTITIONS = 10;
-
-
-       @Autowired
-       org.apache.kafka.clients.consumer.Consumer<K, V> kafkaConsumer;
-       @Autowired
-       KafkaProperties kafkaProperties;
-       @Autowired
-       ApplicationProperties applicationProperties;
-       @Autowired
-       MongoClient mongoClient;
-       @Autowired
-       MongoProperties mongoProperties;
-       @Autowired
-       KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;
-       @Autowired
-       TestRecordHandler recordHandler;
-       @Autowired
-       DeadLetterTopicConsumer deadLetterTopicConsumer;
-       @Autowired
-       EndlessConsumer endlessConsumer;
-
-       KafkaProducer<Bytes, Bytes> testRecordProducer;
-       KafkaConsumer<Bytes, Bytes> offsetConsumer;
-       Map<TopicPartition, Long> oldOffsets;
-
-
-       final RecordGenerator recordGenerator;
-       final Consumer<ProducerRecord<Bytes, Bytes>> messageSender;
-
-       public GenericApplicationTests(RecordGenerator recordGenerator)
-       {
-               this.recordGenerator = recordGenerator;
-               this.messageSender = (record) -> sendMessage(record);
-       }
-
-
-       /** Tests methods */
-
-       @Test
-       void commitsCurrentOffsetsOnSuccess() throws Exception
-       {
-               recordGenerator.generate(false, false, messageSender);
-
-               int numberOfGeneratedMessages = recordGenerator.getNumberOfMessages();
-
-               await(numberOfGeneratedMessages + " records received")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> recordHandler.receivedMessages >= numberOfGeneratedMessages);
-
-               await("Offsets committed")
-                               .atMost(Duration.ofSeconds(10))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .untilAsserted(() ->
-                               {
-                                       checkSeenOffsetsForProgress();
-                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-                               });
-
-               assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should still be running")
-                               .isTrue();
-
-               endlessConsumer.stop();
-               recordGenerator.assertBusinessLogic();
-       }
-
-       @Test
-       @SkipWhenErrorCannotBeGenerated(poisonPill = true)
-       void commitsOffsetOfErrorForReprocessingOnDeserializationError()
-       {
-               recordGenerator.generate(true, false, messageSender);
-
-               int numberOfValidMessages =
-                               recordGenerator.getNumberOfMessages() -
-                               recordGenerator.getNumberOfPoisonPills();
-
-               await(numberOfValidMessages + " records received")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
-               await(recordGenerator.getNumberOfPoisonPills() + " poison-pills received")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfPoisonPills());
-
-               await("Offsets committed")
-                               .atMost(Duration.ofSeconds(10))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .untilAsserted(() ->
-                               {
-                                       checkSeenOffsetsForProgress();
-                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-                               });
-
-               assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should still be running")
-                               .isTrue();
-
-               endlessConsumer.stop();
-               recordGenerator.assertBusinessLogic();
-       }
-
-       @Test
-       @SkipWhenErrorCannotBeGenerated(logicError = true)
-       void commitsOffsetsOfUnseenRecordsOnLogicError()
-       {
-               recordGenerator.generate(false, true, messageSender);
-
-               int numberOfValidMessages =
-                               recordGenerator.getNumberOfMessages() -
-                               recordGenerator.getNumberOfLogicErrors();
-
-               await(numberOfValidMessages + " records received")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> recordHandler.receivedMessages >= numberOfValidMessages);
-               await(recordGenerator.getNumberOfLogicErrors() + " logic-errors received")
-                               .atMost(Duration.ofSeconds(30))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> deadLetterTopicConsumer.messages.size() == recordGenerator.getNumberOfLogicErrors());
-
-               await("Offsets committed")
-                               .atMost(Duration.ofSeconds(10))
-                               .pollInterval(Duration.ofSeconds(1))
-                               .untilAsserted(() ->
-                               {
-                                       checkSeenOffsetsForProgress();
-                                       assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
-                               });
-
-               assertThat(endlessConsumer.running())
-                               .describedAs("Consumer should still be running")
-                               .isTrue();
-
-               endlessConsumer.stop();
-               recordGenerator.assertBusinessLogic();
-       }
-
-
-       /** Helper methods for the verification of expectations */
-
-       void assertSeenOffsetsEqualCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-       {
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       Long expected = offsetsToCheck.get(tp) + 1;
-                       log.debug("Checking, if the offset {} for {} is exactly {}", offset, tp, expected);
-                       assertThat(offset)
-                                       .describedAs("Committed offset corresponds to the offset of the consumer")
-                                       .isEqualTo(expected);
-               });
-       }
-
-       void assertSeenOffsetsAreBehindCommittedOffsets(Map<TopicPartition, Long> offsetsToCheck)
-       {
-               List<Boolean> isOffsetBehindSeen = new LinkedList<>();
-
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       Long expected = offsetsToCheck.get(tp) + 1;
-                       log.debug("Checking, if the offset {} for {} is at most {}", offset, tp, expected);
-                       assertThat(offset)
-                                       .describedAs("Committed offset must be at most equal to the offset of the consumer")
-                                       .isLessThanOrEqualTo(expected);
-                       isOffsetBehindSeen.add(offset < expected);
-               });
-
-               assertThat(isOffsetBehindSeen.stream().reduce(false, (result, next) -> result | next))
-                               .describedAs("Committed offsets are behind seen offsets")
-                               .isTrue();
-       }
-
-       void checkSeenOffsetsForProgress()
-       {
-               // Be sure, that some messages were consumed...!
-               Set<TopicPartition> withProgress = new HashSet<>();
-               partitions().forEach(tp ->
-               {
-                       Long oldOffset = oldOffsets.get(tp) + 1;
-                       Long newOffset = recordHandler.seenOffsets.get(tp) + 1;
-                       if (!oldOffset.equals(newOffset))
-                       {
-                               log.debug("Progress for {}: {} -> {}", tp, oldOffset, newOffset);
-                               withProgress.add(tp);
-                       }
-               });
-               assertThat(withProgress)
-                               .describedAs("Some offsets must have changed, compared to the old offset-positions")
-                               .isNotEmpty();
-       }
-
-
-       /** Helper methods for setting up and running the tests */
-
-       void seekToEnd()
-       {
-               offsetConsumer.assign(partitions());
-               offsetConsumer.seekToEnd(partitions());
-               partitions().forEach(tp ->
-               {
-                       // seekToEnd() works lazily: it only takes effect on poll()/position()
-                       Long offset = offsetConsumer.position(tp);
-                       log.info("New position for {}: {}", tp, offset);
-               });
-               // The new positions must be commited!
-               offsetConsumer.commitSync();
-               offsetConsumer.unsubscribe();
-       }
-
-       void doForCurrentOffsets(BiConsumer<TopicPartition, Long> consumer)
-       {
-               offsetConsumer.assign(partitions());
-               partitions().forEach(tp -> consumer.accept(tp, offsetConsumer.position(tp)));
-               offsetConsumer.unsubscribe();
-       }
-
-       List<TopicPartition> partitions()
-       {
-               return
-                               IntStream
-                                               .range(0, PARTITIONS)
-                                               .mapToObj(partition -> new TopicPartition(TOPIC, partition))
-                                               .collect(Collectors.toList());
-       }
-
-
-       public interface RecordGenerator
-       {
-               void generate(
-                               boolean poisonPills,
-                               boolean logicErrors,
-                               Consumer<ProducerRecord<Bytes, Bytes>> messageSender);
-
-               int getNumberOfMessages();
-               int getNumberOfPoisonPills();
-               int getNumberOfLogicErrors();
-
-               default boolean canGeneratePoisonPill()
-               {
-                       return true;
-               }
-
-               default boolean canGenerateLogicError()
-               {
-                       return true;
-               }
-
-               default void assertBusinessLogic()
-               {
-                       log.debug("No business-logic to assert");
-               }
-       }
-
-       void sendMessage(ProducerRecord<Bytes, Bytes> record)
-       {
-               testRecordProducer.send(record, (metadata, e) ->
-               {
-                       if (metadata != null)
-                       {
-                               log.debug(
-                                               "{}|{} - {}={}",
-                                               metadata.partition(),
-                                               metadata.offset(),
-                                               record.key(),
-                                               record.value());
-                       }
-                       else
-                       {
-                               log.warn(
-                                               "Exception for {}={}: {}",
-                                               record.key(),
-                                               record.value(),
-                                               e.toString());
-                       }
-               });
-       }
-
-
-       @BeforeEach
-       public void init()
-       {
-               Properties props;
-               props = new Properties();
-               props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-               props.put("linger.ms", 100);
-               props.put("key.serializer", BytesSerializer.class.getName());
-               props.put("value.serializer", BytesSerializer.class.getName());
-               testRecordProducer = new KafkaProducer<>(props);
-
-               props = new Properties();
-               props.put("bootstrap.servers", kafkaProperties.getBootstrapServers());
-               props.put("client.id", "OFFSET-CONSUMER");
-               props.put("group.id", kafkaProperties.getConsumer().getGroupId());
-               props.put("key.deserializer", BytesDeserializer.class.getName());
-               props.put("value.deserializer", BytesDeserializer.class.getName());
-               offsetConsumer = new KafkaConsumer<>(props);
-
-               mongoClient.getDatabase(mongoProperties.getDatabase()).drop();
-               seekToEnd();
-
-               oldOffsets = new HashMap<>();
-               recordHandler.seenOffsets = new HashMap<>();
-               recordHandler.receivedMessages = 0;
-
-               deadLetterTopicConsumer.messages.clear();
-
-               doForCurrentOffsets((tp, offset) ->
-               {
-                       oldOffsets.put(tp, offset - 1);
-                       recordHandler.seenOffsets.put(tp, offset - 1);
-               });
-
-               endlessConsumer.start();
-       }
-
-       @AfterEach
-       public void deinit()
-       {
-               try
-               {
-                       endlessConsumer.stop();
-               }
-               catch (Exception e)
-               {
-                       log.debug("{}", e.toString());
-               }
-
-               try
-               {
-                       testRecordProducer.close();
-                       offsetConsumer.close();
-               }
-               catch (Exception e)
-               {
-                       log.info("Exception while stopping the consumer: {}", e.toString());
-               }
-       }
-
-
-       @TestConfiguration
-       @Import(ApplicationConfiguration.class)
-       public static class Configuration
-       {
-               @Bean
-               public RecordHandler recordHandler(RecordHandler applicationRecordHandler)
-               {
-                       return new TestRecordHandler(applicationRecordHandler);
-               }
-
-               @Bean(destroyMethod = "close")
-               public org.apache.kafka.clients.consumer.Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
-               {
-                       return factory.createConsumer();
-               }
-
-    @Bean
-    public ConcurrentKafkaListenerContainerFactory<String, String> dltContainerFactory(
-      KafkaProperties properties)
-    {
-      Map<String, Object> consumerProperties = new HashMap<>();
-
-      consumerProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, properties.getBootstrapServers());
-      consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-      consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
-      consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
-      DefaultKafkaConsumerFactory dltConsumerFactory =
-        new DefaultKafkaConsumerFactory<>(consumerProperties);
-      ConcurrentKafkaListenerContainerFactory<String, String> factory =
-        new ConcurrentKafkaListenerContainerFactory<>();
-      factory.setConsumerFactory(dltConsumerFactory);
-      return factory;
-    }
-
-               @Bean
-               public DeadLetterTopicConsumer deadLetterTopicConsumer()
-               {
-                       return new DeadLetterTopicConsumer();
-               }
-       }
-}
diff --git a/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java b/src/test/java/de/juplo/kafka/SkipWhenErrorCannotBeGenerated.java
deleted file mode 100644 (file)
index 6d15e9e..0000000
+++ /dev/null
@@ -1,15 +0,0 @@
-package de.juplo.kafka;
-
-import org.junit.jupiter.api.extension.ExtendWith;
-
-import java.lang.annotation.Retention;
-import java.lang.annotation.RetentionPolicy;
-
-
-@Retention(RetentionPolicy.RUNTIME)
-@ExtendWith(ErrorCannotBeGeneratedCondition.class)
-public @interface SkipWhenErrorCannotBeGenerated
-{
-  boolean poisonPill() default false;
-  boolean logicError() default false;
-}
diff --git a/src/test/java/de/juplo/kafka/TestRecordHandler.java b/src/test/java/de/juplo/kafka/TestRecordHandler.java
deleted file mode 100644 (file)
index d9f4e67..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import org.apache.kafka.common.TopicPartition;
-
-import java.util.List;
-import java.util.Map;
-
-
-@RequiredArgsConstructor
-public class TestRecordHandler implements RecordHandler
-{
-  private final RecordHandler handler;
-
-  Map<TopicPartition, Long> seenOffsets;
-  int receivedMessages;
-
-
-  public void onNewRecord(
-      String topic,
-      Integer partition,
-      Long offset,
-      Message messgage)
-  {
-    seenOffsets.put(new TopicPartition(topic, partition), offset);
-    receivedMessages++;
-  }
-
-  @Override
-  public void addNumber(
-      String topic,
-      Integer partition,
-      Long offset,
-      String user,
-      MessageAddNumber message)
-  {
-    this.onNewRecord(topic, partition, offset, message);
-    handler.addNumber(topic, partition, offset, user, message);
-  }
-
-  @Override
-  public void calculateSum(
-      String topic,
-      Integer partition,
-      Long offset,
-      String user,
-      MessageCalculateSum message)
-  {
-    this.onNewRecord(topic, partition, offset, message);
-    handler.calculateSum(topic, partition, offset, user, message);
-  }
-}