Springify: Die Fehlerbehandlung funktioniert unabhängig vom Batch-Listener
authorKai Moritz <kai@juplo.de>
Sat, 16 Apr 2022 10:03:19 +0000 (12:03 +0200)
committerKai Moritz <kai@juplo.de>
Sat, 16 Apr 2022 10:03:19 +0000 (12:03 +0200)
* Es ist nicht wie zunächst vermutet nötig, einen Batch-Listener zu
  verwenden, um das gewünschte Stop-World-Verhalten für Deserialisierungs-
  Fehler zu erreichen.
* Den Batch-Listener wieder entfernt - der Test bleibt unverändert GRÜN.
* Der von Spring Kafka automatisch erzeugte Listener-Container verwendet
  dann auch automatisch den als Bean definierten Error-Handler.

src/main/java/de/juplo/kafka/ApplicationConfiguration.java
src/main/java/de/juplo/kafka/EndlessConsumer.java

index 12b6990..fd4ff28 100644 (file)
@@ -4,11 +4,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.springframework.boot.context.properties.EnableConfigurationProperties;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
-import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
-import org.springframework.kafka.config.KafkaListenerContainerFactory;
-import org.springframework.kafka.core.ConsumerFactory;
 import org.springframework.kafka.listener.CommonContainerStoppingErrorHandler;
-import org.springframework.kafka.listener.CommonErrorHandler;
 
 import java.util.function.Consumer;
 
@@ -26,21 +22,6 @@ public class ApplicationConfiguration
     };
   }
 
-  @Bean
-  public KafkaListenerContainerFactory<?> batchFactory(
-      ConsumerFactory<String, Long> consumerFactory,
-      CommonErrorHandler errorHandler)
-  {
-    ConcurrentKafkaListenerContainerFactory<String, Long> factory =
-        new ConcurrentKafkaListenerContainerFactory<>();
-
-    factory.setConsumerFactory(consumerFactory);
-    factory.setCommonErrorHandler(errorHandler);
-    factory.setBatchListener(true);
-
-    return factory;
-  }
-
   @Bean
   public CommonContainerStoppingErrorHandler errorHandler()
   {
index 5e76865..888805f 100644 (file)
@@ -9,7 +9,6 @@ import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
 import org.springframework.stereotype.Component;
 
-import java.util.List;
 import java.util.function.Consumer;
 
 
@@ -31,28 +30,22 @@ public class EndlessConsumer<K, V>
       id = "${consumer.client-id}",
       idIsGroup = false,
       topics = "${consumer.topic}",
-      containerFactory = "batchFactory",
       autoStartup = "false")
-  public void receive(List<ConsumerRecord<K, V>> records)
+  public void receive(ConsumerRecord<K, V> record)
   {
-    // Do something with the data...
-    log.info("{} - Received {} messages", id, records.size());
-    for (ConsumerRecord<K, V> record : records)
-    {
-      log.info(
-          "{} - {}: {}/{} - {}={}",
-          id,
-          record.offset(),
-          record.topic(),
-          record.partition(),
-          record.key(),
-          record.value()
-      );
+    log.info(
+        "{} - {}: {}/{} - {}={}",
+        id,
+        record.offset(),
+        record.topic(),
+        record.partition(),
+        record.key(),
+        record.value()
+    );
 
-      handler.accept(record);
+    handler.accept(record);
 
-      consumed++;
-    }
+    consumed++;
   }