From 5c4020772b4c955394e2e36f6518b647dc1bad5d Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 10 Sep 2022 10:49:31 +0200 Subject: [PATCH] WIP:kafkahandler --- src/main/java/de/juplo/kafka/Application.java | 6 +-- .../juplo/kafka/ApplicationConfiguration.java | 20 +++++++-- .../kafka/ApplicationHealthIndicator.java | 4 +- .../juplo/kafka/ApplicationRecordHandler.java | 45 +++---------------- .../java/de/juplo/kafka/DriverController.java | 35 +++++++++++++-- .../juplo/kafka/GenericApplicationTests.java | 33 +++++++------- 6 files changed, 76 insertions(+), 67 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 90e7108..fd8eeb8 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -15,14 +15,14 @@ import javax.annotation.PreDestroy; public class Application implements ApplicationRunner { @Autowired - ApplicationRecordHandler recordHandler; + DriverController driverController; @Override public void run(ApplicationArguments args) throws Exception { log.info("Starting EndlessConsumer"); - recordHandler.start(); + driverController.start(); } @PreDestroy @@ -31,7 +31,7 @@ public class Application implements ApplicationRunner try { log.info("Stopping EndlessConsumer"); - recordHandler.stop(); + driverController.stop(); } catch (IllegalStateException e) { diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index df092b0..3ef7398 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -17,15 +17,11 @@ public class ApplicationConfiguration @Bean public ApplicationRecordHandler applicationRecordHandler( AdderResults adderResults, - KafkaListenerEndpointRegistry endpointRegistry, - ApplicationErrorHandler errorHandler, KafkaProperties kafkaProperties, ApplicationProperties applicationProperties) { return new ApplicationRecordHandler( adderResults, - endpointRegistry, - errorHandler, Optional.ofNullable(applicationProperties.getThrottle()), kafkaProperties.getClientId()); } @@ -55,4 +51,20 @@ public class ApplicationConfiguration { return new ApplicationErrorHandler(); } + + @Bean + public DriverController driverController( + KafkaProperties kafkaProperties, + ApplicationRecordHandler applicationRecordHandler, + AdderResults adderResults, + KafkaListenerEndpointRegistry endpointRegistry, + ApplicationErrorHandler errorHandler) + { + return new DriverController( + kafkaProperties.getClientId(), + applicationRecordHandler, + adderResults, + endpointRegistry, + errorHandler); + } } diff --git a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java index cf8ef7e..b26b6c6 100644 --- a/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java +++ b/src/main/java/de/juplo/kafka/ApplicationHealthIndicator.java @@ -10,7 +10,7 @@ import org.springframework.stereotype.Component; @RequiredArgsConstructor public class ApplicationHealthIndicator implements HealthIndicator { - private final ApplicationRecordHandler consumer; + private final DriverController driverController; @Override @@ -18,7 +18,7 @@ public class ApplicationHealthIndicator implements HealthIndicator { try { - return consumer + return driverController .exitStatus() .map(Health::down) .orElse(Health.outOfService()) diff --git a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java index 7ed38b3..521414f 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java +++ b/src/main/java/de/juplo/kafka/ApplicationRecordHandler.java @@ -3,7 +3,7 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.annotation.KafkaListener; import java.time.Duration; import java.util.HashMap; @@ -13,11 +13,15 @@ import java.util.Optional; @RequiredArgsConstructor @Slf4j +@KafkaListener( + id = "${spring.kafka.client-id}", + idIsGroup = false, + topics = "${sumup.adder.topic}", + batch = "true", + autoStartup = "false") public class ApplicationRecordHandler implements RecordHandler { private final AdderResults results; - private final KafkaListenerEndpointRegistry registry; - private final ApplicationErrorHandler errorHandler; private final Optional throttle; private final String id; @@ -78,39 +82,4 @@ public class ApplicationRecordHandler implements RecordHandler { return state.get(partition); } - - - - public void start() - { - if (running()) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting", id); - errorHandler.clearState(); - registry.getListenerContainer(id).start(); - } - - public void stop() - { - if (running()) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - registry.getListenerContainer(id).stop(); - log.info("{} - Stopped", id); - } - - public boolean running() - { - return registry.getListenerContainer(id).isRunning(); - } - - public Optional exitStatus() - { - if (running()) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return errorHandler.getException(); - } } diff --git a/src/main/java/de/juplo/kafka/DriverController.java b/src/main/java/de/juplo/kafka/DriverController.java index 56791be..e814f9c 100644 --- a/src/main/java/de/juplo/kafka/DriverController.java +++ b/src/main/java/de/juplo/kafka/DriverController.java @@ -1,8 +1,10 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.web.bind.annotation.*; import java.util.List; @@ -12,24 +14,51 @@ import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; -@RestController +@ResponseBody @RequiredArgsConstructor +@Slf4j public class DriverController { + private final String id; private final ApplicationRecordHandler recordHandler; private final AdderResults results; + private final KafkaListenerEndpointRegistry registry; + private final ApplicationErrorHandler errorHandler; @PostMapping("start") public void start() { - recordHandler.start(); + if (running()) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); + + log.info("{} - Starting", id); + errorHandler.clearState(); + registry.getListenerContainer(id).start(); } @PostMapping("stop") public void stop() throws ExecutionException, InterruptedException { - recordHandler.stop(); + if (running()) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + registry.getListenerContainer(id).stop(); + log.info("{} - Stopped", id); + } + + public boolean running() + { + return registry.getListenerContainer(id).isRunning(); + } + + public Optional exitStatus() + { + if (running()) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return errorHandler.getException(); } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 753debe..1e28b06 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,7 +2,6 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -71,7 +70,7 @@ abstract class GenericApplicationTests @Autowired TestRecordHandler recordHandler; @Autowired - EndlessConsumer endlessConsumer; + DriverController driverController; KafkaProducer testRecordProducer; KafkaConsumer offsetConsumer; @@ -111,10 +110,10 @@ abstract class GenericApplicationTests }); assertThatExceptionOfType(IllegalStateException.class) - .isThrownBy(() -> endlessConsumer.exitStatus()) + .isThrownBy(() -> driverController.exitStatus()) .describedAs("Consumer should still be running"); - endlessConsumer.stop(); + driverController.stop(); recordGenerator.assertBusinessLogic(); } @@ -128,16 +127,16 @@ abstract class GenericApplicationTests await("Consumer failed") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + .until(() -> !driverController.running()); checkSeenOffsetsForProgress(); assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); - endlessConsumer.start(); + driverController.start(); await("Consumer failed") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + .until(() -> !driverController.running()); checkSeenOffsetsForProgress(); assertSeenOffsetsEqualCommittedOffsets(recordHandler.seenOffsets); @@ -147,8 +146,8 @@ abstract class GenericApplicationTests assertThatNoException() .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) + .isThrownBy(() -> driverController.exitStatus()); + assertThat(driverController.exitStatus()) .describedAs("Consumer should have exited abnormally") .containsInstanceOf(RecordDeserializationException.class); @@ -165,23 +164,23 @@ abstract class GenericApplicationTests await("Consumer failed") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + .until(() -> !driverController.running()); checkSeenOffsetsForProgress(); assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); - endlessConsumer.start(); + driverController.start(); await("Consumer failed") .atMost(Duration.ofSeconds(30)) .pollInterval(Duration.ofSeconds(1)) - .until(() -> !endlessConsumer.running()); + .until(() -> !driverController.running()); assertSeenOffsetsAreBehindCommittedOffsets(recordHandler.seenOffsets); assertThatNoException() .describedAs("Consumer should not be running") - .isThrownBy(() -> endlessConsumer.exitStatus()); - assertThat(endlessConsumer.exitStatus()) + .isThrownBy(() -> driverController.exitStatus()); + assertThat(driverController.exitStatus()) .describedAs("Consumer should have exited abnormally") .containsInstanceOf(RuntimeException.class); @@ -356,7 +355,7 @@ abstract class GenericApplicationTests recordHandler.seenOffsets.put(tp, offset - 1); }); - endlessConsumer.start(); + driverController.start(); } @AfterEach @@ -364,7 +363,7 @@ abstract class GenericApplicationTests { try { - endlessConsumer.stop(); + driverController.stop(); } catch (Exception e) { @@ -394,7 +393,7 @@ abstract class GenericApplicationTests } @Bean(destroyMethod = "close") - public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) { return factory.createConsumer(); } -- 2.20.1