Springify: GRÜN - Unerwartetes Verhalten lag an Konfigurations-Fehler
authorKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 10:55:30 +0000 (12:55 +0200)
committerKai Moritz <kai@juplo.de>
Fri, 15 Apr 2022 12:23:33 +0000 (14:23 +0200)
* Der `KafkaMessageListenerContainer` hatte sich nicht wegen einem
  fehlenden `ErrorHandlingDeserializer` trotz des konfigurierten
  `CommonContainerStoppingErrorHandler` in der Endlosschleife verfangen,
  so wie es die Fehlermeldung suggeriert hatte.
* Der Fehler lag eigentlich daran, dass der Error-Handler zwar erzeugt,
  aber nicht der wegen der Batch-Verarbeitung manuell erzeugten
  `KafkaListenerContainerFactory` übergeben wurde.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/resources/application.yml

index 4c74eb2..12b6990 100644 (file)
@@ -8,6 +8,7 @@ import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
 import org.springframework.kafka.config.KafkaListenerContainerFactory;
 import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
+import org.springframework.kafka.listener.CommonErrorHandler;
 
 import java.util.function.Consumer;
 
@@ -26,12 +27,15 @@ public class ApplicationConfiguration
   }
 
   @Bean
-  public KafkaListenerContainerFactory<?> batchFactory(ConsumerFactory<String, Long> consumerFactory)
+  public KafkaListenerContainerFactory<?> batchFactory(
+      ConsumerFactory<String, Long> consumerFactory,
+      CommonErrorHandler errorHandler)
   {
     ConcurrentKafkaListenerContainerFactory<String, Long> factory =
         new ConcurrentKafkaListenerContainerFactory<>();
 
     factory.setConsumerFactory(consumerFactory);
+    factory.setCommonErrorHandler(errorHandler);
     factory.setBatchListener(true);
 
     return factory;
index afc074c..1cb6212 100644 (file)
@@ -31,9 +31,7 @@ spring:
       client-id: ${consumer.client-id}
       auto-offset-reset: ${consumer.auto-offset-reset}
       group-id: ${consumer.group-id}
-      value-deserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
-      properties:
-        spring.deserializer.value.delegate.class: org.apache.kafka.common.serialization.LongDeserializer
+      value-deserializer: org.apache.kafka.common.serialization.LongDeserializer
 logging:
   level:
     root: INFO