WIP:kafkahandler
authorKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 08:35:17 +0000 (10:35 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 10 Sep 2022 08:35:17 +0000 (10:35 +0200)
src/main/java/de/juplo/kafka/Application.java
src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java
src/main/java/de/juplo/kafka/ApplicationRecordHandler.java
src/main/java/de/juplo/kafka/DriverController.java
src/main/java/de/juplo/kafka/EndlessConsumer.java [deleted file]

index a4d9aeb..90e7108 100644 (file)
@@ -15,14 +15,14 @@ import javax.annotation.PreDestroy;
 public class Application implements ApplicationRunner
 {
   @Autowired
-  EndlessConsumer endlessConsumer;
+  ApplicationRecordHandler recordHandler;
 
 
   @Override
   public void run(ApplicationArguments args) throws Exception
   {
     log.info("Starting EndlessConsumer");
-    endlessConsumer.start();
+    recordHandler.start();
   }
 
   @PreDestroy
@@ -31,7 +31,7 @@ public class Application implements ApplicationRunner
     try
     {
       log.info("Stopping EndlessConsumer");
-      endlessConsumer.stop();
+      recordHandler.stop();
     }
     catch (IllegalStateException e)
     {
index f8bf857..df092b0 100644 (file)
@@ -17,11 +17,15 @@ public class ApplicationConfiguration
   @Bean
   public ApplicationRecordHandler applicationRecordHandler(
       AdderResults adderResults,
+      KafkaListenerEndpointRegistry endpointRegistry,
+      ApplicationErrorHandler errorHandler,
       KafkaProperties kafkaProperties,
       ApplicationProperties applicationProperties)
   {
     return new ApplicationRecordHandler(
         adderResults,
+        endpointRegistry,
+        errorHandler,
         Optional.ofNullable(applicationProperties.getThrottle()),
         kafkaProperties.getClientId());
   }
@@ -51,19 +55,4 @@ public class ApplicationConfiguration
   {
     return new ApplicationErrorHandler();
   }
-
-  @Bean
-  public EndlessConsumer<String, Message> endlessConsumer(
-      RecordHandler recordHandler,
-      ApplicationErrorHandler errorHandler,
-      KafkaProperties kafkaProperties,
-      KafkaListenerEndpointRegistry endpointRegistry)
-  {
-    return
-        new EndlessConsumer<>(
-            kafkaProperties.getClientId(),
-            endpointRegistry,
-            errorHandler,
-            recordHandler);
-  }
 }
index 03a14c8..cf8ef7e 100644 (file)
@@ -10,7 +10,7 @@ import org.springframework.stereotype.Component;
 @RequiredArgsConstructor
 public class ApplicationHealthIndicator implements HealthIndicator
 {
-  private final EndlessConsumer<String, Message> consumer;
+  private final ApplicationRecordHandler consumer;
 
 
   @Override
index 829ab0e..7ed38b3 100644 (file)
@@ -3,6 +3,7 @@ package de.juplo.kafka;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 
 import java.time.Duration;
 import java.util.HashMap;
@@ -15,6 +16,8 @@ import java.util.Optional;
 public class ApplicationRecordHandler implements RecordHandler<String, Message>
 {
   private final AdderResults results;
+  private final KafkaListenerEndpointRegistry registry;
+  private final ApplicationErrorHandler errorHandler;
   private final Optional<Duration> throttle;
   private final String id;
 
@@ -75,4 +78,39 @@ public class ApplicationRecordHandler implements RecordHandler<String, Message>
   {
     return state.get(partition);
   }
+
+
+
+  public void start()
+  {
+    if (running())
+      throw new IllegalStateException("Consumer instance " + id + " is already running!");
+
+    log.info("{} - Starting", id);
+    errorHandler.clearState();
+    registry.getListenerContainer(id).start();
+  }
+
+  public void stop()
+  {
+    if (running())
+      throw new IllegalStateException("Consumer instance " + id + " is not running!");
+
+    log.info("{} - Stopping", id);
+    registry.getListenerContainer(id).stop();
+    log.info("{} - Stopped", id);
+  }
+
+  public boolean running()
+  {
+    return registry.getListenerContainer(id).isRunning();
+  }
+
+  public Optional<Exception> exitStatus()
+  {
+    if (running())
+      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
+
+    return errorHandler.getException();
+  }
 }
index 26a5bc8..56791be 100644 (file)
@@ -16,7 +16,6 @@ import java.util.stream.Collectors;
 @RequiredArgsConstructor
 public class DriverController
 {
-  private final EndlessConsumer consumer;
   private final ApplicationRecordHandler recordHandler;
   private final AdderResults results;
 
@@ -24,13 +23,13 @@ public class DriverController
   @PostMapping("start")
   public void start()
   {
-    consumer.start();
+    recordHandler.start();
   }
 
   @PostMapping("stop")
   public void stop() throws ExecutionException, InterruptedException
   {
-    consumer.stop();
+    recordHandler.stop();
   }
 
 
diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java
deleted file mode 100644 (file)
index dfb8349..0000000
+++ /dev/null
@@ -1,85 +0,0 @@
-package de.juplo.kafka;
-
-import lombok.RequiredArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.springframework.kafka.annotation.KafkaListener;
-import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
-
-import java.util.List;
-import java.util.Optional;
-
-
-@RequiredArgsConstructor
-@Slf4j
-public class EndlessConsumer<K, V>
-{
-  private final String id;
-  private final KafkaListenerEndpointRegistry registry;
-  private final ApplicationErrorHandler errorHandler;
-  private final RecordHandler<K, V> recordHandler;
-
-  private long consumed = 0;
-
-
-  @KafkaListener(
-      id = "${spring.kafka.client-id}",
-      idIsGroup = false,
-      topics = "${sumup.adder.topic}",
-      batch = "true",
-      autoStartup = "false")
-  public void accept(List<ConsumerRecord<K, V>> records)
-  {
-        // Do something with the data...
-        log.info("{} - Received {} messages", id, records.size());
-        for (ConsumerRecord<K, V> record : records)
-        {
-          log.info(
-              "{} - {}: {}/{} - {}={}",
-              id,
-              record.offset(),
-              record.topic(),
-              record.partition(),
-              record.key(),
-              record.value()
-          );
-
-          recordHandler.accept(record);
-
-          consumed++;
-        }
-  }
-
-  public void start()
-  {
-    if (running())
-      throw new IllegalStateException("Consumer instance " + id + " is already running!");
-
-    log.info("{} - Starting - consumed {} messages before", id, consumed);
-    errorHandler.clearState();
-    registry.getListenerContainer(id).start();
-  }
-
-  public void stop()
-  {
-    if (running())
-      throw new IllegalStateException("Consumer instance " + id + " is not running!");
-
-    log.info("{} - Stopping", id);
-    registry.getListenerContainer(id).stop();
-    log.info("{} - Stopped - consumed {} messages so far", id, consumed);
-  }
-
-  public boolean running()
-  {
-    return registry.getListenerContainer(id).isRunning();
-  }
-
-  public Optional<Exception> exitStatus()
-  {
-    if (running())
-      throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!");
-
-    return errorHandler.getException();
-  }
-}