From 5966e3b824a0303b02fd59c693ae35c3ededa111 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Sat, 16 Apr 2022 12:03:19 +0200 Subject: [PATCH] =?utf8?q?Springify:=20Die=20Fehlerbehandlung=20funktionie?= =?utf8?q?rt=20unabh=C3=A4ngig=20vom=20Batch-Listener?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Es ist nicht wie zunächst vermutet nötig, einen Batch-Listener zu verwenden, um das gewünschte Stop-World-Verhalten für Deserialisierungs- Fehler zu erreichen. * Den Batch-Listener wieder entfernt - der Test bleibt unverändert GRÜN. * Der von Spring Kafka automatisch erzeugte Listener-Container verwendet dann auch automatisch den als Bean definierten Error-Handler. --- .../juplo/kafka/ApplicationConfiguration.java | 19 ------------ .../java/de/juplo/kafka/EndlessConsumer.java | 31 +++++++------------ 2 files changed, 12 insertions(+), 38 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 12b6990..fd4ff28 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,11 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler; -import org.springframework.kafka.listener.CommonErrorHandler; import java.util.function.Consumer; @@ -26,21 +22,6 @@ public class ApplicationConfiguration }; } - @Bean - public KafkaListenerContainerFactory batchFactory( - ConsumerFactory consumerFactory, - CommonErrorHandler errorHandler) - { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - - factory.setConsumerFactory(consumerFactory); - factory.setCommonErrorHandler(errorHandler); - factory.setBatchListener(true); - - return factory; - } - @Bean public CommonContainerStoppingErrorHandler errorHandler() { diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 5e76865..888805f 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -9,7 +9,6 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.stereotype.Component; -import java.util.List; import java.util.function.Consumer; @@ -31,28 +30,22 @@ public class EndlessConsumer id = "${consumer.client-id}", idIsGroup = false, topics = "${consumer.topic}", - containerFactory = "batchFactory", autoStartup = "false") - public void receive(List> records) + public void receive(ConsumerRecord record) { - // Do something with the data... - log.info("{} - Received {} messages", id, records.size()); - for (ConsumerRecord record : records) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); + log.info( + "{} - {}: {}/{} - {}={}", + id, + record.offset(), + record.topic(), + record.partition(), + record.key(), + record.value() + ); - handler.accept(record); + handler.accept(record); - consumed++; - } + consumed++; } -- 2.20.1