From f095f71a104fcde025a63f87ba75eb5cb3136656 Mon Sep 17 00:00:00 2001
From: Kai Moritz
Date: Sun, 4 Sep 2022 19:30:29 +0200
Subject: [PATCH] Switched over to `@KafkaHandler`
MIME-Version: 1.0
Content-Type: text/plain; charset=utf8
Content-Transfer-Encoding: 8bit

* Activated the autoconfiguration via the `@EnableKafka` annotation.
* Since the Spring Kafka autoconfiguration now kicks in, the configuration
  becomes simpler (an illustrative sketch of the wiring it performs can be
  found after the patch):
** Spring Kafka automatically creates the `MessageListenerContainer` that is
   needed to hook up the method annotated with `@KafkaHandler`, and supplies
   it with a matching `KafkaConsumer`, so the latter no longer has to be
   created explicitly.
** The scheduler is created and managed by Spring Kafka, so an
   `ExecutorService` no longer has to be created and shut down explicitly.
** The rebalance listener is hooked up automatically; the
   `ApplicationRebalanceListener` does, however, have to derive from the
   appropriate Spring class so that the autoconfiguration can find it.
* In order to simulate with the `MessageListenerContainer` the default
  behaviour of the `KafkaConsumer` that the test case expects, a customised
  `ErrorHandler` had to be implemented.
* The code for the exception handling and for closing the `KafkaConsumer` in
  `EndlessConsumer` is gone.
* The start/stop code in `EndlessConsumer` can simply call the corresponding
  methods of the `MessageListenerContainer`. To do so, however, it first has
  to look up the matching instance in a registry via the client-id (a usage
  sketch follows after the patch).
* The test case had to be adapted to the autoconfiguration:
** The `KafkaAutoConfiguration` has to be pulled in explicitly here.
** Since the test also needs a `KafkaConsumer`, the factory that is no longer
   needed by the application now has to be provided explicitly for the test.
---
 README.sh | 23 ++-
 src/main/java/de/juplo/kafka/Application.java | 27 ---
 .../juplo/kafka/ApplicationConfiguration.java | 35 +---
 .../juplo/kafka/ApplicationErrorHandler.java | 70 +++++++
 .../kafka/ApplicationRebalanceListener.java | 3 +-
 .../java/de/juplo/kafka/EndlessConsumer.java | 195 +++---------------
 .../juplo/kafka/GenericApplicationTests.java | 18 +-
 7 files changed, 151 insertions(+), 220 deletions(-)
 create mode 100644 src/main/java/de/juplo/kafka/ApplicationErrorHandler.java

diff --git a/README.sh b/README.sh index 07e36d7..a2d813d 100755 --- a/README.sh +++ b/README.sh @@ -76,7 +76,7 @@ http :8092/results/klaus | jq .[].sum | uniq docker-compose stop adder-1 until [ $(http --check-status :8092/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done -until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-2..."; sleep 1; done +until [ $(http --check-status :8092/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-2..."; sleep 1; done echo "Resultate für adder-2" http -v --pretty none -S :8092/results @@ -87,4 +87,25 @@ http :8092/results/peter | jq .[].sum | uniq echo "Resultate für klaus von adder-2" http :8092/results/klaus | jq .[].sum | uniq +docker-compose kill -s 9 adder-2 +docker-compose start adder-1 docker-compose kill -s 9 peter klaus +while ! 
[[ $(http 0:8091/actuator/health 2> /dev/null) =~ "UP" ]]; do echo "Waiting for adder-1..."; sleep 1; done +until [ $(http --check-status :8091/results/peter 2> /dev/null) ]; do echo "Waiting for some results for peter to show up on adder-1..."; sleep 1; done +until [ $(http --check-status :8091/results/klaus 2> /dev/null) ]; do echo "Waiting for some results for klaus to show up on adder-1..."; sleep 1; done + +echo "Resultate für adder-1" +http -v --pretty none -S :8091/results +echo + +echo "Resultate für peter von adder-1" +http :8091/results/peter | jq .[].sum | uniq +echo "Resultate für klaus von adder-1" +http :8091/results/klaus | jq .[].sum | uniq + +sleep 5 + +echo "Resultate für peter von adder-1" +http :8091/results/peter | jq .[].sum | uniq +echo "Resultate für klaus von adder-1" +http :8091/results/klaus | jq .[].sum | uniq diff --git a/src/main/java/de/juplo/kafka/Application.java b/src/main/java/de/juplo/kafka/Application.java index 76c2520..a4d9aeb 100644 --- a/src/main/java/de/juplo/kafka/Application.java +++ b/src/main/java/de/juplo/kafka/Application.java @@ -8,8 +8,6 @@ import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import javax.annotation.PreDestroy; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.TimeUnit; @SpringBootApplication @@ -18,8 +16,6 @@ public class Application implements ApplicationRunner { @Autowired EndlessConsumer endlessConsumer; - @Autowired - ExecutorService executor; @Override @@ -45,29 +41,6 @@ public class Application implements ApplicationRunner { log.error("Unexpected exception while stopping EndlessConsumer: {}", e); } - - try - { - log.info("Shutting down the ExecutorService."); - executor.shutdown(); - log.info("Waiting 5 seconds for the ExecutorService to terminate..."); - executor.awaitTermination(5, TimeUnit.SECONDS); - } - catch (InterruptedException e) - { - log.error("Exception while waiting for the termination of the ExecutorService: {}", e); - } - finally - { - if (!executor.isTerminated()) - { - log.warn("Forcing shutdown of ExecutorService!"); - executor - .shutdownNow() - .forEach(runnable -> log.warn("Unprocessed task: {}", runnable.getClass().getSimpleName())); - } - log.info("Shutdow of ExecutorService finished"); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 08c827c..f8bf857 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -1,16 +1,13 @@ package de.juplo.kafka; -import org.apache.kafka.clients.consumer.Consumer; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.Optional; -import org.springframework.kafka.core.ConsumerFactory; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; @Configuration @@ -49,34 +46,24 @@ public class ApplicationConfiguration kafkaProperties.getClientId()); } + @Bean + public ApplicationErrorHandler applicationErrorHandler() + { + return new ApplicationErrorHandler(); + } + @Bean public EndlessConsumer endlessConsumer( - Consumer kafkaConsumer, - ExecutorService executor, - 
ApplicationRebalanceListener rebalanceListener, RecordHandler recordHandler, + ApplicationErrorHandler errorHandler, KafkaProperties kafkaProperties, - ApplicationProperties applicationProperties) + KafkaListenerEndpointRegistry endpointRegistry) { return new EndlessConsumer<>( - executor, kafkaProperties.getClientId(), - applicationProperties.getTopic(), - kafkaConsumer, - rebalanceListener, + endpointRegistry, + errorHandler, recordHandler); } - - @Bean - public ExecutorService executor() - { - return Executors.newSingleThreadExecutor(); - } - - @Bean(destroyMethod = "close") - public Consumer kafkaConsumer(ConsumerFactory factory) - { - return factory.createConsumer(); - } } diff --git a/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java new file mode 100644 index 0000000..6e15717 --- /dev/null +++ b/src/main/java/de/juplo/kafka/ApplicationErrorHandler.java @@ -0,0 +1,70 @@ +package de.juplo.kafka; + +import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.springframework.kafka.listener.CommonErrorHandler; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.util.Assert; + +import java.util.Optional; + + +@Slf4j +public class ApplicationErrorHandler implements CommonErrorHandler +{ + private Exception exception; + private boolean ack = true; + + + @Override + public void handleOtherException( + Exception thrownException, + Consumer consumer, + MessageListenerContainer container, + boolean batchListener) + { + Assert.isTrue(batchListener, getClass().getName() + " is only applicable for Batch-Listeners"); + rememberExceptionAndStopContainer(thrownException, container); + } + + @Override + public void handleBatch( + Exception thrownException, + ConsumerRecords data, + Consumer consumer, + MessageListenerContainer container, + Runnable invokeListener) + { + // Do not commit the polled offsets on a logic-error + ack = false; + rememberExceptionAndStopContainer(thrownException, container); + } + + private void rememberExceptionAndStopContainer( + Exception exception, + MessageListenerContainer container) + { + log.error("{}, stopping container {} abnormally", exception, container); + this.exception = exception; + container.stopAbnormally(() -> log.info("{} is stopped", container)); + } + + @Override + public boolean isAckAfterHandle() + { + return ack; + } + + + public Optional getException() + { + return Optional.ofNullable(exception); + } + + public void clearState() + { + this.exception = null; + this.ack = true; + } +} diff --git a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java index 0bfee67..ba15227 100644 --- a/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java +++ b/src/main/java/de/juplo/kafka/ApplicationRebalanceListener.java @@ -4,13 +4,14 @@ import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.common.TopicPartition; +import org.springframework.kafka.listener.ConsumerAwareRebalanceListener; import java.util.*; @RequiredArgsConstructor @Slf4j -public class ApplicationRebalanceListener implements ConsumerRebalanceListener +public class ApplicationRebalanceListener implements ConsumerAwareRebalanceListener { private final ApplicationRecordHandler recordHandler; private final 
AdderResults adderResults; diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 00678c4..d3d11ae 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -2,55 +2,36 @@ package de.juplo.kafka; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; -import org.apache.kafka.clients.consumer.*; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.errors.RecordDeserializationException; -import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; -import javax.annotation.PreDestroy; -import java.time.Duration; -import java.util.*; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; +import java.util.List; +import java.util.Optional; -@Slf4j @RequiredArgsConstructor -public class EndlessConsumer implements Runnable +@Slf4j +public class EndlessConsumer { - private final ExecutorService executor; private final String id; - private final String topic; - private final Consumer consumer; - private final ConsumerRebalanceListener rebalanceListener; + private final KafkaListenerEndpointRegistry registry; + private final ApplicationErrorHandler errorHandler; private final RecordHandler recordHandler; - private final Lock lock = new ReentrantLock(); - private final Condition condition = lock.newCondition(); - private boolean running = false; - private Exception exception; private long consumed = 0; - - @Override - public void run() + @KafkaListener( + id = "${spring.kafka.client-id}", + idIsGroup = false, + topics = "${sumup.adder.topic}", + batch = "true", + autoStartup = "false") + public void accept(List> records) { - try - { - log.info("{} - Subscribing to topic {}", id, topic); - consumer.subscribe(Arrays.asList(topic), rebalanceListener); - - while (true) - { - ConsumerRecords records = - consumer.poll(Duration.ofSeconds(1)); - // Do something with the data... - log.info("{} - Received {} messages", id, records.count()); + log.info("{} - Received {} messages", id, records.size()); for (ConsumerRecord record : records) { log.info( @@ -67,146 +48,38 @@ public class EndlessConsumer implements Runnable consumed++; } - } - } - catch(WakeupException e) - { - log.info("{} - RIIING! 
Request to stop consumption - commiting current offsets!", id); - consumer.commitSync(); - shutdown(); - } - catch(RecordDeserializationException e) - { - TopicPartition tp = e.topicPartition(); - long offset = e.offset(); - log.error( - "{} - Could not deserialize message on topic {} with offset={}: {}", - id, - tp, - offset, - e.getCause().toString()); - - consumer.commitSync(); - shutdown(e); - } - catch(Exception e) - { - log.error("{} - Unexpected error: {}", id, e.toString(), e); - shutdown(e); - } - finally - { - log.info("{} - Consumer-Thread exiting", id); - } - } - - private void shutdown() - { - shutdown(null); - } - - private void shutdown(Exception e) - { - lock.lock(); - try - { - try - { - log.info("{} - Unsubscribing from topic {}", id, topic); - consumer.unsubscribe(); - } - catch (Exception ue) - { - log.error( - "{} - Error while unsubscribing from topic {}: {}", - id, - topic, - ue.toString()); - } - finally - { - running = false; - exception = e; - condition.signal(); - } - } - finally - { - lock.unlock(); - } } public void start() { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("Consumer instance " + id + " is already running!"); - - log.info("{} - Starting - consumed {} messages before", id, consumed); - running = true; - exception = null; - executor.submit(this); - } - finally - { - lock.unlock(); - } - } + if (running()) + throw new IllegalStateException("Consumer instance " + id + " is already running!"); - public synchronized void stop() throws InterruptedException - { - lock.lock(); - try - { - if (!running) - throw new IllegalStateException("Consumer instance " + id + " is not running!"); - - log.info("{} - Stopping", id); - consumer.wakeup(); - condition.await(); - log.info("{} - Stopped - consumed {} messages so far", id, consumed); - } - finally - { - lock.unlock(); - } + log.info("{} - Starting - consumed {} messages before", id, consumed); + errorHandler.clearState(); + registry.getListenerContainer(id).start(); } - @PreDestroy - public void destroy() throws ExecutionException, InterruptedException + public void stop() { - log.info("{} - Destroy!", id); - log.info("{}: Consumed {} messages in total, exiting!", id, consumed); + if (!running()) + throw new IllegalStateException("Consumer instance " + id + " is not running!"); + + log.info("{} - Stopping", id); + registry.getListenerContainer(id).stop(); + log.info("{} - Stopped - consumed {} messages so far", id, consumed); } public boolean running() { - lock.lock(); - try - { - return running; - } - finally - { - lock.unlock(); - } + return registry.getListenerContainer(id).isRunning(); } public Optional exitStatus() { - lock.lock(); - try - { - if (running) - throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); - - return Optional.ofNullable(exception); - } - finally - { - lock.unlock(); - } + if (running()) + throw new IllegalStateException("No exit-status available: Consumer instance " + id + " is running!"); + + return errorHandler.getException(); } } diff --git a/src/test/java/de/juplo/kafka/GenericApplicationTests.java b/src/test/java/de/juplo/kafka/GenericApplicationTests.java index 937b40f..753debe 100644 --- a/src/test/java/de/juplo/kafka/GenericApplicationTests.java +++ b/src/test/java/de/juplo/kafka/GenericApplicationTests.java @@ -2,6 +2,7 @@ package de.juplo.kafka; import com.mongodb.client.MongoClient; import lombok.extern.slf4j.Slf4j; +import org.apache.kafka.clients.consumer.ConsumerRecord; import 
org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; @@ -12,7 +13,6 @@ import org.apache.kafka.common.utils.Bytes; import org.junit.jupiter.api.*; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; -import org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.autoconfigure.mongo.MongoProperties; import org.springframework.boot.test.autoconfigure.data.mongo.AutoConfigureDataMongo; @@ -20,6 +20,8 @@ import org.springframework.boot.test.context.ConfigDataApplicationContextInitial import org.springframework.boot.test.context.TestConfiguration; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Import; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.test.context.EmbeddedKafka; import org.springframework.test.context.TestPropertySource; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -37,11 +39,7 @@ import static org.assertj.core.api.Assertions.*; import static org.awaitility.Awaitility.*; -@SpringJUnitConfig( - initializers = ConfigDataApplicationContextInitializer.class, - classes = { - KafkaAutoConfiguration.class, - ApplicationTests.Configuration.class }) +@SpringJUnitConfig(initializers = ConfigDataApplicationContextInitializer.class) @TestPropertySource( properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}", @@ -69,6 +67,8 @@ abstract class GenericApplicationTests @Autowired MongoProperties mongoProperties; @Autowired + KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + @Autowired TestRecordHandler recordHandler; @Autowired EndlessConsumer endlessConsumer; @@ -392,5 +392,11 @@ abstract class GenericApplicationTests { return new TestRecordHandler(applicationRecordHandler); } + + @Bean(destroyMethod = "close") + public org.apache.kafka.clients.consumer.Consumer kafkaConsumer(ConsumerFactory factory) + { + return factory.createConsumer(); + } } } -- 2.20.1
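
The following is an illustrative sketch only and not part of the patch. It shows roughly the wiring that Spring Boot's autoconfiguration performs implicitly once `@EnableKafka` is active: it builds the listener-container factory from the `spring.kafka.*` properties and attaches a unique `CommonErrorHandler` bean (here the `ApplicationErrorHandler`) to it. The class name and the generic types in the sketch are assumptions.

package de.juplo.kafka;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical manual wiring; with the autoconfiguration in place none of
// this is needed, it only illustrates where the pieces plug in.
@Configuration
@EnableKafka
public class ManualListenerWiringSketch
{
  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Object> kafkaListenerContainerFactory(
      ConsumerFactory<String, Object> consumerFactory,
      ApplicationErrorHandler errorHandler)
  {
    ConcurrentKafkaListenerContainerFactory<String, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
    // The factory creates the MessageListenerContainer for the annotated
    // listener method and supplies it with a KafkaConsumer
    factory.setConsumerFactory(consumerFactory);
    // The listener method receives a List of records, so the container has
    // to run in batch mode
    factory.setBatchListener(true);
    // The autoconfiguration picks up a unique CommonErrorHandler bean
    // automatically; wired by hand it has to be set explicitly
    factory.setCommonErrorHandler(errorHandler);
    return factory;
  }
}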
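
Likewise purely illustrative (the surrounding class and method are assumptions, generic parameters are omitted): a usage sketch of the reworked `EndlessConsumer`, whose start/stop handling now only delegates to the `MessageListenerContainer` registered under the client-id and whose exit status is taken from the `ApplicationErrorHandler`.

package de.juplo.kafka;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

// Hypothetical caller, for example a REST controller or the ApplicationRunner
@RequiredArgsConstructor
@Slf4j
public class ConsumerDriverSketch
{
  private final EndlessConsumer endlessConsumer;

  public void restartIfStopped()
  {
    // running() asks the MessageListenerContainer looked up in the
    // KafkaListenerEndpointRegistry whether it is still running
    if (endlessConsumer.running())
      return;

    // exitStatus() surfaces the exception the ApplicationErrorHandler
    // remembered before stopping the container abnormally
    endlessConsumer.exitStatus().ifPresent(e ->
        log.warn("Previous run failed: {}", e.toString()));

    // start() clears the error handler's state and starts the container again
    endlessConsumer.start();
  }
}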