From 19c132c9bf0aa3deb27a558160f8d056cf271cb0 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sun, 4 Sep 2022 19:30:29 +0200 Subject: [PATCH] WIP inklu:Exit Status --- src/main/java/de/juplo/kafka/Application.java | 27 --- .../juplo/kafka/ApplicationConfiguration.java | 35 +--- .../kafka/ApplicationRebalanceListener.java | 3 +- .../java/de/juplo/kafka/EndlessConsumer.java | 195 +++--------------- .../juplo/kafka/GenericApplicationTests.java | 12 +- 5 files changed, 53 insertions(+), 219 deletions(-) diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 76c2520..a4d9aeb 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -8,8 +8,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; @SpringBootApplication @@ -18,8 +16,6 @@ public class Application implements ApplicationRunner { @Autowired EndlessConsumer endlessConsumer; - @Autowired - ExecutorService executor; @Override @@ -45,29 +41,6 @@ public class Application implements ApplicationRunner { log.error("Unexpected exception while stopping EndlessConsumer: {}", e); } - - try - { - log.info("Shutting down the ExecutorService."); - executor.shutdown(); - log.info("Waiting 5 seconds for the ExecutorService to terminate..."); - executor.awaitTermination(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - log.error("Exception while waiting for the termination of the ExecutorService: {}", e); - } - finally - { - if (!executor.isTerminated()) - { - log.warn("Forcing shutdown of ExecutorService!"); - executor - .shutdownNow() - .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName())); - } - log.info("Shutdow of ExecutorService finished"); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 08c827c..f8bf857 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,16 +1,13 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.Consumer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Optional; -import org.springframework.kafka.core.ConsumerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; @Configuration @@ -49,34 +46,24 @@ public class ApplicationConfiguration kafkaProperties.getClientId()); } + @Bean + public ApplicationErrorHandler applicationErrorHandler() + { + return new ApplicationErrorHandler(); + } + @Bean public EndlessConsumer endlessConsumer( - Consumer kafkaConsumer, - ExecutorService executor, - ApplicationRebalanceListener rebalanceListener, RecordHandler recordHandler, + ApplicationErrorHandler errorHandler, KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) + KafkaListenerEndpointRegistry endpointRegistry) { return new EndlessConsumer<>( - executor, kafkaProperties.getClientId(), - applicationProperties.getTopic(), - kafkaConsumer, - rebalanceListener, + endpointRegistry, + errorHandler, recordHandler); } - - @Bean - public ExecutorService executor() - { - return Executors.newSingleThreadExecutor(); - } - - @Bean(destroyMethod = "close") - public Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 0bfee67..ba15227 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -4,13 +4,14 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener +public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListener { private final ApplicationRecordHandler recordHandler; private final AdderResults adderResults; diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 00678c4..9edc87b 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -2,55 +2,36 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.List; +import java.util.Optional; -@Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements Runnable +@Slf4j +public class EndlessConsumer { - private final ExecutorService executor; private final String id; - private final String topic; - private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; + private final KafkaListenerEndpointRegistry registry; + private final ApplicationErrorHandler errorHandler; private final RecordHandler recordHandler; - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private boolean running = false; - private Exception exception; private long consumed = 0; - - @Override - public void run() + @KafkaListener( + id = "${spring.kafka.client-id}", + idIsGroup = false, + topics = "${sumup.adder.topic}", + batch = "true", + autoStartup = "false") + public void accept(List> records) { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), rebalanceListener); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); + log.info("{} - Received {} messages", id, records.size()); for (ConsumerRecord record : records) { log.info( @@ -67,146 +48,38 @@ public class EndlessConsumer implements Runnable consumed++; } - } - } - catch(WakeupException e) - { - log.info("{} - RIIING! Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); - shutdown(); - } - catch(RecordDeserializationException e) - { - TopicPartition tp = e.topicPartition(); - long offset = e.offset(); - log.error( - "{} - Could not deserialize message on topic {} with offset={}: {}", - id, - tp, - offset, - e.getCause().toString()); - - consumer.commitSync(); - shutdown(e); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString(), e); - shutdown(e); - } - finally - { - log.info("{} - Consumer-Thread exiting", id); - } - } - - private void shutdown() - { - shutdown(null); - } - - private void shutdown(Exception e) - { - lock.lock(); - try - { - try - { - log.info("{} - Unsubscribing from topic {}", id, topic); - consumer.unsubscribe(); - } - catch (Exception ue) - { - log.error( - "{} - Error while unsubscribing from topic {}: {}", - id, - topic, - ue.toString()); - } - finally - { - running = false; - exception = e; - condition.signal(); - } - } - finally - { - lock.unlock(); - } } public void start() { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - running = true; - exception = null; - executor.submit(this); - } - finally - { - lock.unlock(); - } - } + if (registry.getListenerContainer(id).isChildRunning()) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - public synchronized void stop() throws InterruptedException - { - lock.lock(); - try - { - if (!running) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - condition.await(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - finally - { - lock.unlock(); - } + log.info("{} - Starting ListenerContainer", id); + errorHandler.clearState(); + registry.getListenerContainer(id).start(); } - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException + public void stop() { - log.info("{} - Destroy!", id); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + if (!registry.getListenerContainer(id).isChildRunning()) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping ListenerContainer", id); + registry.getListenerContainer(id).stop(); + log.info("{} - Stopped", id); } public boolean running() { - lock.lock(); - try - { - return running; - } - finally - { - lock.unlock(); - } + return registry.getListenerContainer(id).isRunning(); } public Optional exitStatus() { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return Optional.ofNullable(exception); - } - finally - { - lock.unlock(); - } + if (running()) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return errorHandler.getException(); } } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 937b40f..d54a5bb 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -12,7 +13,6 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; @@ -20,6 +20,8 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -37,11 +39,7 @@ import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; -@SpringJUnitConfig( - initializers = ConfigDataApplicationContextInitializer.class, - classes = { - KafkaAutoConfiguration.class, - ApplicationTests.Configuration.class }) +@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", @@ -69,6 +67,8 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired + KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + @Autowired TestRecordHandler recordHandler; @Autowired EndlessConsumer endlessConsumer; -- 2.20.1