From 9d1e359d897bbac39ab50adbb44a9af67b1c5121 Mon Sep 17 00:00:00 2001 From: Kai Moritz Date: Fri, 15 Apr 2022 12:55:30 +0200 Subject: [PATCH] =?utf8?q?Springify:=20GR=C3=9CN=20-=20Unerwartetes=20Verh?= =?utf8?q?alten=20lag=20an=20Konfigurations-Fehler?= MIME-Version: 1.0 Content-Type: text/plain; charset=utf8 Content-Transfer-Encoding: 8bit * Der `KafkaMessageListenerContainer` hatte sich nicht wegen einem fehlenden `ErrorHandlingDeserializer` trotz des konfigurierten `CommonContainerStoppingErrorHandler` in der Endlosschleife verfangen, so wie es die Fehlermeldung suggeriert hatte. * Der Fehler lag eigentlich daran, dass der Error-Handler zwar erzeugt, aber nicht der wegen der Batch-Verarbeitung manuell erzeugten `KafkaListenerContainerFactory` übergeben wurde. --- src/main/java/de/juplo/kafka/ApplicationConfiguration.java | 6 +++++- src/main/resources/application.yml | 4 +--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java index 4c74eb2..12b6990 100644 --- a/src/main/java/de/juplo/kafka/ApplicationConfiguration.java +++ b/src/main/java/de/juplo/kafka/ApplicationConfiguration.java @@ -8,6 +8,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.config.KafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler; +import org.springframework.kafka.listener.CommonErrorHandler; import java.util.function.Consumer; @@ -26,12 +27,15 @@ public class ApplicationConfiguration } @Bean - public KafkaListenerContainerFactory batchFactory(ConsumerFactory consumerFactory) + public KafkaListenerContainerFactory batchFactory( + ConsumerFactory consumerFactory, + CommonErrorHandler errorHandler) { ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); + factory.setCommonErrorHandler(errorHandler); factory.setBatchListener(true); return factory; diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml index afc074c..1cb6212 100644 --- a/src/main/resources/application.yml +++ b/src/main/resources/application.yml @@ -31,9 +31,7 @@ spring: client-id: ${consumer.client-id} auto-offset-reset: ${consumer.auto-offset-reset} group-id: ${consumer.group-id} - value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer - properties: - spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.LongDeserializer + value-deserializer: org.apache.kafka.common.serialization.LongDeserializer logging: level: root: INFO -- 2.20.1