From: Kai Moritz Date: Sat, 16 Apr 2022 10:03:19 +0000 (+0200) Subject: Springify: Die Fehlerbehandlung funktioniert unabhängig vom Batch-Listener X-Git-Url: http://juplo.de/gitweb/?a=commitdiff_plain;h=5966e3b824a0303b02fd59c693ae35c3ededa111;p=demos%2Fkafka%2Ftraining Springify: Die Fehlerbehandlung funktioniert unabhängig vom Batch-Listener * Es ist nicht wie zunächst vermutet nötig, einen Batch-Listener zu verwenden, um das gewünschte Stop-World-Verhalten für Deserialisierungs- Fehler zu erreichen. * Den Batch-Listener wieder entfernt - der Test bleibt unverändert GRÜN. * Der von Spring Kafka automatisch erzeugte Listener-Container verwendet dann auch automatisch den als Bean definierten Error-Handler. --- diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 12b6990..fd4ff28 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -4,11 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; -import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; -import org.springframework.kafka.config.KafkaListenerContainerFactory; -import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler; -import org.springframework.kafka.listener.CommonErrorHandler; import java.util.function.Consumer; @@ -26,21 +22,6 @@ public class ApplicationConfiguration }; } - @Bean - public KafkaListenerContainerFactory batchFactory( - ConsumerFactory consumerFactory, - CommonErrorHandler errorHandler) - { - ConcurrentKafkaListenerContainerFactory factory = - new ConcurrentKafkaListenerContainerFactory<>(); - - factory.setConsumerFactory(consumerFactory); - factory.setCommonErrorHandler(errorHandler); - factory.setBatchListener(true); - - return factory; - } - @Bean public CommonContainerStoppingErrorHandler errorHandler() { diff --git a/src/main/java/de/juplo/kafka/EndlessConsumer.java b/src/main/java/de/juplo/kafka/EndlessConsumer.java index 5e76865..888805f 100644 --- a/src/main/java/de/juplo/kafka/EndlessConsumer.java +++ b/src/main/java/de/juplo/kafka/EndlessConsumer.java @@ -9,7 +9,6 @@ import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.config.KafkaListenerEndpointRegistry; import org.springframework.stereotype.Component; -import java.util.List; import java.util.function.Consumer; @@ -31,28 +30,22 @@ public class EndlessConsumer id = "${consumer.client-id}", idIsGroup = false, topics = "${consumer.topic}", - containerFactory = "batchFactory", autoStartup = "false") - public void receive(List> records) + public void receive(ConsumerRecord record) { - // Do something with the data... - log.info("{} - Received {} messages", id, records.size()); - for (ConsumerRecord record : records) - { - log.info( - "{} - {}: {}/{} - {}={}", - id, - record.offset(), - record.topic(), - record.partition(), - record.key(), - record.value() - ); + log.info( + "{} - {}: {}/{} - {}={}", + id, + record.offset(), + record.topic(), + record.partition(), + record.key(), + record.value() + ); - handler.accept(record); + handler.accept(record); - consumed++; - } + consumed++; }