WIP:kafkahandler
authorKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 08:49:31 +0000 (10:49 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 08:49:31 +0000 (10:49 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/DriverController.java
src/test/java/de/juplo/kafka/GenericApplicationTests.java

index 90e7108..fd8eeb8 100644 (file)
@@ -15,14 +15,14 @@ import javax.annotation.PreDestroy;
 public class Application implements ApplicationRunner
 {
   @Autowired
-  ApplicationRecordHandler recordHandler;
+  DriverController driverController;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
     log.info("Starting EndlessConsumer");
-    recordHandler.start();
+    driverController.start();
   }
 
   @PreDestroy
@@ -31,7 +31,7 @@ public class Application implements ApplicationRunner
     try
     {
       log.info("Stopping EndlessConsumer");
-      recordHandler.stop();
+      driverController.stop();
     }
     catch (IllegalStateException e)
     {
index df092b0..3ef7398 100644 (file)
@@ -17,15 +17,11 @@ public class ApplicationConfiguration
   @Bean
   public ApplicationRecordHandler applicationRecordHandler(
       AdderResults adderResults,
-      KafkaListenerEndpointRegistry endpointRegistry,
-      ApplicationErrorHandler errorHandler,
       KafkaProperties kafkaProperties,
       ApplicationProperties applicationProperties)
   {
     return new ApplicationRecordHandler(
         adderResults,
-        endpointRegistry,
-        errorHandler,
         Optional.ofNullable(applicationProperties.getThrottle()),
         kafkaProperties.getClientId());
   }
@@ -55,4 +51,20 @@ public class ApplicationConfiguration
   {
     return new ApplicationErrorHandler();
   }
+
+  @Bean
+  public DriverController driverController(
+    KafkaProperties kafkaProperties,
+    ApplicationRecordHandler applicationRecordHandler,
+    AdderResults adderResults,
+    KafkaListenerEndpointRegistry endpointRegistry,
+    ApplicationErrorHandler errorHandler)
+  {
+    return new DriverController(
+        kafkaProperties.getClientId(),
+        applicationRecordHandler,
+        adderResults,
+        endpointRegistry,
+        errorHandler);
+  }
 }
index cf8ef7e..b26b6c6 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final ApplicationRecordHandler consumer;
+  private final DriverController driverController;
 
 
   @Override
@@ -18,7 +18,7 @@ public class ApplicationHealthIndicator implements HealthIndicator
   {
     try
     {
-      return consumer
+      return driverController
           .exitStatus()
           .map(Health::down)
           .orElse(Health.outOfService())
index 7ed38b3..521414f 100644 (file)
@@ -3,7 +3,7 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
+import org.springframework.kafka.annotation.KafkaListener;
 
 import java.time.Duration;
 import java.util.HashMap;
@@ -13,11 +13,15 @@ import java.util.Optional;
 
 @RequiredArgsConstructor
 @Slf4j
+@KafkaListener(
+  id = "${spring.kafka.client-id}",
+  idIsGroup = false,
+  topics = "${sumup.adder.topic}",
+  batch = "true",
+  autoStartup = "false")
 public class ApplicationRecordHandler implements RecordHandler<String, Message>
 {
   private final AdderResults results;
-  private final KafkaListenerEndpointRegistry registry;
-  private final ApplicationErrorHandler errorHandler;
   private final Optional<Duration> throttle;
   private final String id;
 
@@ -78,39 +82,4 @@ public class ApplicationRecordHandler implements RecordHandler<String, Message>
   {
     return state.get(partition);
   }
-
-
-
-  public void start()
-  {
-    if (running())
-      throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-    log.info("{} - Starting", id);
-    errorHandler.clearState();
-    registry.getListenerContainer(id).start();
-  }
-
-  public void stop()
-  {
-    if (running())
-      throw new IllegalStateException("Consumer instance " + id + " is not running!");
-
-    log.info("{} - Stopping", id);
-    registry.getListenerContainer(id).stop();
-    log.info("{} - Stopped", id);
-  }
-
-  public boolean running()
-  {
-    return registry.getListenerContainer(id).isRunning();
-  }
-
-  public Optional<Exception> exitStatus()
-  {
-    if (running())
-      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
-    return errorHandler.getException();
-  }
 }
index 56791be..e814f9c 100644 (file)
@@ -1,8 +1,10 @@
 package de.juplo.kafka;
 
 import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.web.bind.annotation.*;
 
 import java.util.List;
@@ -12,24 +14,51 @@ import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 
-@RestController
+@ResponseBody
 @RequiredArgsConstructor
+@Slf4j
 public class DriverController
 {
+  private final String id;
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults results;
+  private final KafkaListenerEndpointRegistry registry;
+  private final ApplicationErrorHandler errorHandler;
 
 
   @PostMapping("start")
   public void start()
   {
-    recordHandler.start();
+    if (running())
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
+    log.info("{} - Starting", id);
+    errorHandler.clearState();
+    registry.getListenerContainer(id).start();
   }
 
   @PostMapping("stop")
   public void stop() throws ExecutionException, InterruptedException
   {
-    recordHandler.stop();
+    if (running())
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
+    log.info("{} - Stopping", id);
+    registry.getListenerContainer(id).stop();
+    log.info("{} - Stopped", id);
+  }
+
+  public boolean running()
+  {
+    return registry.getListenerContainer(id).isRunning();
+  }
+
+  public Optional<Exception> exitStatus()
+  {
+    if (running())
+      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+
+    return errorHandler.getException();
   }
 
 
index 753debe..1e28b06 100644 (file)
@@ -2,7 +2,6 @@ package de.juplo.kafka;
 
 import com.mongodb.client.MongoClient;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
@@ -71,7 +70,7 @@ abstract class GenericApplicationTests<K, V>
        @Autowired
        TestRecordHandler<K, V> recordHandler;
        @Autowired
-       EndlessConsumer<K, V> endlessConsumer;
+       DriverController driverController;
 
        KafkaProducer<Bytes, Bytes> testRecordProducer;
        KafkaConsumer<Bytes, Bytes> offsetConsumer;
@@ -111,10 +110,10 @@ abstract class GenericApplicationTests<K, V>
                                });
 
                assertThatExceptionOfType(IllegalStateException.class)
-                               .isThrownBy(() -> endlessConsumer.exitStatus())
+                               .isThrownBy(() -> driverController.exitStatus())
                                .describedAs("Consumer should still be running");
 
-               endlessConsumer.stop();
+               driverController.stop();
                recordGenerator.assertBusinessLogic();
        }
 
@@ -128,16 +127,16 @@ abstract class GenericApplicationTests<K, V>
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
+                               .until(() -> !driverController.running());
 
                checkSeenOffsetsForProgress();
                assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
 
-               endlessConsumer.start();
+               driverController.start();
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
+                               .until(() -> !driverController.running());
 
                checkSeenOffsetsForProgress();
                assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets);
@@ -147,8 +146,8 @@ abstract class GenericApplicationTests<K, V>
 
                assertThatNoException()
                                .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
+                               .isThrownBy(() -> driverController.exitStatus());
+               assertThat(driverController.exitStatus())
                                .describedAs("Consumer should have exited abnormally")
                                .containsInstanceOf(RecordDeserializationException.class);
 
@@ -165,23 +164,23 @@ abstract class GenericApplicationTests<K, V>
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
+                               .until(() -> !driverController.running());
 
                checkSeenOffsetsForProgress();
                assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
 
-               endlessConsumer.start();
+               driverController.start();
                await("Consumer failed")
                                .atMost(Duration.ofSeconds(30))
                                .pollInterval(Duration.ofSeconds(1))
-                               .until(() -> !endlessConsumer.running());
+                               .until(() -> !driverController.running());
 
                assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets);
 
                assertThatNoException()
                                .describedAs("Consumer should not be running")
-                               .isThrownBy(() -> endlessConsumer.exitStatus());
-               assertThat(endlessConsumer.exitStatus())
+                               .isThrownBy(() -> driverController.exitStatus());
+               assertThat(driverController.exitStatus())
                                .describedAs("Consumer should have exited abnormally")
                                .containsInstanceOf(RuntimeException.class);
 
@@ -356,7 +355,7 @@ abstract class GenericApplicationTests<K, V>
                        recordHandler.seenOffsets.put(tp, offset - 1);
                });
 
-               endlessConsumer.start();
+               driverController.start();
        }
 
        @AfterEach
@@ -364,7 +363,7 @@ abstract class GenericApplicationTests<K, V>
        {
                try
                {
-                       endlessConsumer.stop();
+                       driverController.stop();
                }
                catch (Exception e)
                {
@@ -394,7 +393,7 @@ abstract class GenericApplicationTests<K, V>
                }
 
     @Bean(destroyMethod = "close")
-    public org.apache.kafka.clients.consumer.Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message> factory)
+    public org.apache.kafka.clients.consumer.Consumer<String, Message> kafkaConsumer(ConsumerFactory<String, Message>   factory)
     {
       return factory.createConsumer();
     }