From 08bd8983a84a801b3dffe386245679c03546b973 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 10:35:17 +0200 Subject: [PATCH] WIP:kafkahandler --- src/main/java/de/juplo/kafka/Application.java | 6 +- .../juplo/kafka/ApplicationConfiguration.java | 19 +---- .../kafka/ApplicationHealthIndicator.java | 2 +- .../juplo/kafka/ApplicationRecordHandler.java | 38 +++++++++ .../java/de/juplo/kafka/DriverController.java | 5 +- .../java/de/juplo/kafka/EndlessConsumer.java | 85 ------------------- 6 files changed, 48 insertions(+), 107 deletions(-) delete mode 100644 src/main/java/de/juplo/kafka/EndlessConsumer.java diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index a4d9aeb..90e7108 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -15,14 +15,14 @@ import javax.annotation.PreDestroy; public class Application implements ApplicationRunner { @Autowired - EndlessConsumer endlessConsumer; + ApplicationRecordHandler recordHandler; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting EndlessConsumer"); - endlessConsumer.start(); + recordHandler.start(); } @PreDestroy @@ -31,7 +31,7 @@ public class Application implements ApplicationRunner try { log.info("Stopping EndlessConsumer"); - endlessConsumer.stop(); + recordHandler.stop(); } catch (IllegalStateException e) { diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index f8bf857..df092b0 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -17,11 +17,15 @@ public class ApplicationConfiguration @Bean public ApplicationRecordHandler applicationRecordHandler( AdderResults adderResults, + KafkaListenerEndpointRegistry endpointRegistry, + ApplicationErrorHandler errorHandler, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { return new ApplicationRecordHandler( adderResults, + endpointRegistry, + errorHandler, Optional.ofNullable(applicationProperties.getThrottle()), kafkaProperties.getClientId()); } @@ -51,19 +55,4 @@ public class ApplicationConfiguration { return new ApplicationErrorHandler(); } - - @Bean - public EndlessConsumer endlessConsumer( - RecordHandler recordHandler, - ApplicationErrorHandler errorHandler, - KafkaProperties kafkaProperties, - KafkaListenerEndpointRegistry endpointRegistry) - { - return - new EndlessConsumer<>( - kafkaProperties.getClientId(), - endpointRegistry, - errorHandler, - recordHandler); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index 03a14c8..cf8ef7e 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final EndlessConsumer consumer; + private final ApplicationRecordHandler consumer; @Override diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 829ab0e..7ed38b3 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -3,6 +3,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import java.time.Duration; import java.util.HashMap; @@ -15,6 +16,8 @@ import java.util.Optional; public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; + private final KafkaListenerEndpointRegistry registry; + private final ApplicationErrorHandler errorHandler; private final Optional throttle; private final String id; @@ -75,4 +78,39 @@ public class ApplicationRecordHandler implements RecordHandler { return state.get(partition); } + + + + public void start() + { + if (running()) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); + + log.info("{} - Starting", id); + errorHandler.clearState(); + registry.getListenerContainer(id).start(); + } + + public void stop() + { + if (running()) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + registry.getListenerContainer(id).stop(); + log.info("{} - Stopped", id); + } + + public boolean running() + { + return registry.getListenerContainer(id).isRunning(); + } + + public Optional exitStatus() + { + if (running()) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return errorHandler.getException(); + } } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 26a5bc8..56791be 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -16,7 +16,6 @@ import java.util.stream.Collectors; @RequiredArgsConstructor public class DriverController { - private final EndlessConsumer consumer; private final ApplicationRecordHandler recordHandler; private final AdderResults results; @@ -24,13 +23,13 @@ public class DriverController @PostMapping("start") public void start() { - consumer.start(); + recordHandler.start(); } @PostMapping("stop") public void stop() throws ExecutionException, InterruptedException { - consumer.stop(); + recordHandler.stop(); } diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java deleted file mode 100644 index dfb8349..0000000 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ /dev/null @@ -1,85 +0,0 @@ -package de.juplo.kafka; - -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.annotation.KafkaListener; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; - -import java.util.List; -import java.util.Optional; - - -@RequiredArgsConstructor -@Slf4j -public class EndlessConsumer -{ - private final String id; - private final KafkaListenerEndpointRegistry registry; - private final ApplicationErrorHandler errorHandler; - private final RecordHandler recordHandler; - - private long consumed = 0; - - - @KafkaListener( - id = "${spring.kafka.client-id}", - idIsGroup = false, - topics = "${sumup.adder.topic}", - batch = "true", - autoStartup = "false") - public void accept(List> records) - { - // Do something with the data... - log.info("{} - Received {} messages", id, records.size()); - for (ConsumerRecord record : records) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); - - recordHandler.accept(record); - - consumed++; - } - } - - public void start() - { - if (running()) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - errorHandler.clearState(); - registry.getListenerContainer(id).start(); - } - - public void stop() - { - if (running()) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - registry.getListenerContainer(id).stop(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - - public boolean running() - { - return registry.getListenerContainer(id).isRunning(); - } - - public Optional exitStatus() - { - if (running()) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return errorHandler.getException(); - } -} -- 2.20.1